PHOENIX-4544 Update statistics inconsistent behavior

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/62f3493c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/62f3493c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/62f3493c

Branch: refs/heads/4.14-cdh5.11
Commit: 62f3493c39e74395c32fd1ba5a73e67be325d25f
Parents: 3a1e943
Author: Ankit Singhal <ankitsingha...@gmail.com>
Authored: Thu Jun 7 19:23:56 2018 +0100
Committer: Pedro Boado <pbo...@apache.org>
Committed: Wed Oct 17 20:08:43 2018 +0100

----------------------------------------------------------------------
 .../StatisticsCollectionRunTrackerIT.java       | 32 ++++++++------
 .../UngroupedAggregateRegionObserver.java       |  4 +-
 .../apache/phoenix/schema/MetaDataClient.java   | 10 ++++-
 .../stats/StatisticsCollectionRunTracker.java   | 45 +++++++++++++++++---
 .../java/org/apache/phoenix/util/ByteUtil.java  | 16 ++++++-
 5 files changed, 85 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/62f3493c/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java
index cf475f9..a643383 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java
@@ -25,6 +25,8 @@ import static org.junit.Assert.assertTrue;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -61,12 +63,15 @@ public class StatisticsCollectionRunTrackerIT extends ParallelStatsEnabledIT {
         StatisticsCollectionRunTracker tracker =
                 StatisticsCollectionRunTracker.getInstance(new Configuration());
         // assert that the region wasn't added to the tracker
-        assertTrue(tracker.addUpdateStatsCommandRegion(regionInfo));
+        assertTrue(tracker.addUpdateStatsCommandRegion(regionInfo, new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("0")))));
+        assertTrue(tracker.addUpdateStatsCommandRegion(regionInfo, new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("L#0")))));
         // assert that removing the region from the tracker works
-        assertTrue(tracker.removeUpdateStatsCommandRegion(regionInfo));
+        assertTrue(tracker.removeUpdateStatsCommandRegion(regionInfo, new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("0")))));
+        assertTrue(tracker.removeUpdateStatsCommandRegion(regionInfo, new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("L#0")))));
         runUpdateStats(tableName);
         // assert that after update stats is complete, tracker isn't tracking the region any more
-        assertFalse(tracker.removeUpdateStatsCommandRegion(regionInfo));
+        assertFalse(tracker.removeUpdateStatsCommandRegion(regionInfo, new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("0")))));
+        assertFalse(tracker.removeUpdateStatsCommandRegion(regionInfo, new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("L#0")))));;
     }
     
     @Test
@@ -103,25 +108,27 @@ public class StatisticsCollectionRunTrackerIT extends ParallelStatsEnabledIT {
         HRegionInfo regionInfo = createTableAndGetRegion(tableName);
         // simulate stats collection via major compaction by marking the region as compacting in the tracker
         markRegionAsCompacting(regionInfo);
-        Assert.assertEquals("Row count didn't match", COMPACTION_UPDATE_STATS_ROW_COUNT, runUpdateStats(tableName));
-        StatisticsCollectionRunTracker tracker =
+        // there will be no update for local index and a table , so checking 2 * COMPACTION_UPDATE_STATS_ROW_COUNT
+        Assert.assertEquals("Row count didn't match", COMPACTION_UPDATE_STATS_ROW_COUNT * 2, runUpdateStats(tableName));
                 StatisticsCollectionRunTracker.getInstance(new Configuration());
         // assert that the tracker state was cleared.
-        assertFalse(tracker.removeUpdateStatsCommandRegion(regionInfo));
+        HashSet<byte[]> familyMap = new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("0")));
+        assertFalse(tracker.removeUpdateStatsCommandRegion(regionInfo, familyMap));
     }
     
     @Test
     public void testUpdateStatsPreventsAnotherUpdateStatsFromRunning() throws Exception {
         String tableName = fullTableName;
         HRegionInfo regionInfo = createTableAndGetRegion(tableName);
-        markRunningUpdateStats(regionInfo);
-        Assert.assertEquals("Row count didn't match", CONCURRENT_UPDATE_STATS_ROW_COUNT,
-            runUpdateStats(tableName));
+        HashSet<byte[]> familyMap = new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("0")));
+        markRunningUpdateStats(regionInfo,familyMap);
+        //there will be no update for a table but local index should succeed, so checking 2 * COMPACTION_UPDATE_STATS_ROW_COUNT
+        assertTrue("Local index stats are not updated!", CONCURRENT_UPDATE_STATS_ROW_COUNT < runUpdateStats(tableName));
         
         // assert that running the concurrent and race-losing update stats didn't clear the region
         // from the tracker. If the method returned true it means the tracker was still tracking
         // the region. Slightly counter-intuitive, yes.
-        assertTrue(tracker.removeUpdateStatsCommandRegion(regionInfo));
+        assertTrue(tracker.removeUpdateStatsCommandRegion(regionInfo,familyMap));
     }
     
     private void markRegionAsCompacting(HRegionInfo regionInfo) {
@@ -130,10 +137,10 @@ public class StatisticsCollectionRunTrackerIT extends ParallelStatsEnabledIT {
         tracker.addCompactingRegion(regionInfo);
     }
 
-    private void markRunningUpdateStats(HRegionInfo regionInfo) {
+    private void markRunningUpdateStats(HRegionInfo regionInfo, HashSet<byte[]> familyMap) {
         StatisticsCollectionRunTracker tracker =
                 StatisticsCollectionRunTracker.getInstance(new Configuration());
-        tracker.addUpdateStatsCommandRegion(regionInfo);
+        tracker.addUpdateStatsCommandRegion(regionInfo, familyMap);
     }
 
     private HRegionInfo createTableAndGetRegion(String tableName) throws Exception {
@@ -141,6 +148,7 @@ public class StatisticsCollectionRunTrackerIT extends ParallelStatsEnabledIT {
         String ddl = "CREATE TABLE " + tableName + " (PK1 VARCHAR PRIMARY KEY, KV1 VARCHAR)";
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             conn.createStatement().execute(ddl);
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_IDX ON " + tableName + "(KV1)");
             PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class);
             try (HBaseAdmin admin = phxConn.getQueryServices().getAdmin()) {
                 List<HRegionInfo> tableRegions = admin.getTableRegions(tableNameBytes);

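A note for readers following the test changes above (not part of the patch): with the CREATE LOCAL INDEX added in createTableAndGetRegion, each region of the table carries two column families that UPDATE STATISTICS can target independently: "0", Phoenix's default data column family, and "L#0", the column family that local index rows are stored in (Phoenix exposes both as constants in QueryConstants). A minimal, illustrative sketch of the two tracker keys the test builds:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.hadoop.hbase.util.Bytes;

    // Illustrative only: the two column-family sets the IT passes to
    // addUpdateStatsCommandRegion()/removeUpdateStatsCommandRegion() for one region.
    class TrackerKeySketch {
        static final Set<byte[]> DATA_FAMILY =
                new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("0")));    // default data column family
        static final Set<byte[]> LOCAL_INDEX_FAMILY =
                new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("L#0")));  // local index column family
    }
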
http://git-wip-us.apache.org/repos/asf/phoenix/blob/62f3493c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 14213f4..c325d70 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -1129,7 +1129,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         long rowCount = 0; // in case of async, we report 0 as number of rows updated
         StatisticsCollectionRunTracker statsRunTracker =
                 StatisticsCollectionRunTracker.getInstance(config);
-        boolean runUpdateStats = statsRunTracker.addUpdateStatsCommandRegion(region.getRegionInfo());
+        boolean runUpdateStats = statsRunTracker.addUpdateStatsCommandRegion(region.getRegionInfo(),scan.getFamilyMap().keySet());
         if (runUpdateStats) {
             if (!async) {
                 rowCount = callable.call();
@@ -1248,7 +1249,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                     }
                 } finally {
                     try {
-                        StatisticsCollectionRunTracker.getInstance(config).removeUpdateStatsCommandRegion(region.getRegionInfo());
+                        StatisticsCollectionRunTracker.getInstance(config).removeUpdateStatsCommandRegion(region.getRegionInfo(), scan.getFamilyMap().keySet());
                        statsCollector.close();
                     } finally {
                         try {

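To make the coprocessor change easier to follow, here is a simplified sketch of the guard it implements (illustrative only; the class and method names and the Callable parameter are stand-ins, and the real code in UngroupedAggregateRegionObserver also handles the async path and major compactions). The tracker entry is now keyed by the region plus the column families carried by the UPDATE STATISTICS scan, so a run against the data family no longer blocks a concurrent run against the local index family of the same region:

    import java.util.Set;
    import java.util.concurrent.Callable;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.regionserver.Region;
    import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;

    // Simplified guard around one statistics collection run (sketch, not the real observer code).
    class StatsRunGuardSketch {
        long collectIfNotAlreadyRunning(Configuration config, Region region, Scan scan,
                                        Callable<Long> collector) throws Exception {
            Set<byte[]> families = scan.getFamilyMap().keySet();
            StatisticsCollectionRunTracker tracker = StatisticsCollectionRunTracker.getInstance(config);
            // Key = (region, column families of this scan): a data-family run and a
            // local-index-family run on the same region may proceed concurrently.
            if (!tracker.addUpdateStatsCommandRegion(region.getRegionInfo(), families)) {
                return 0L; // an identical run (same region and family set) is already in flight
            }
            try {
                return collector.call(); // the real observer scans the region and persists stats here
            } finally {
                tracker.removeUpdateStatsCommandRegion(region.getRegionInfo(), families);
            }
        }
    }
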
http://git-wip-us.apache.org/repos/asf/phoenix/blob/62f3493c/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 9979ae7..21391f3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1168,7 +1168,15 @@ public class MetaDataClient {
                 // If the table is a view, then we will end up calling update stats
                 // here for all the view indexes on it. We take care of local indexes later.
                 if (index.getIndexType() != IndexType.LOCAL) {
-                    rowCount += updateStatisticsInternal(table.getPhysicalName(), index, updateStatisticsStmt.getProps(), true);
+                    if (index.getIndexType() != IndexType.LOCAL) {
+                        if (table.getType() != PTableType.VIEW) {
+                            rowCount += updateStatisticsInternal(index.getPhysicalName(), index,
+                                    updateStatisticsStmt.getProps(), true);
+                        } else {
+                            rowCount += updateStatisticsInternal(table.getPhysicalName(), index,
+                                    updateStatisticsStmt.getProps(), true);
+                        }
+                    }
                 }
             }
             /*

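The MetaDataClient change is easier to read as a decision about which physical table an index's statistics run should scan. A sketch of that decision under the patch (statsTarget() is not a real Phoenix method, just an illustration; local indexes are skipped by the surrounding guard and handled separately later):

    import org.apache.phoenix.schema.PName;
    import org.apache.phoenix.schema.PTable;
    import org.apache.phoenix.schema.PTableType;

    // Sketch only: physical table targeted when updating statistics for a non-local index.
    class StatsTargetSketch {
        static PName statsTarget(PTable table, PTable index) {
            return table.getType() != PTableType.VIEW
                    ? index.getPhysicalName()   // global index on a table: scan the index's own physical table
                    : table.getPhysicalName();  // index on a view: keep using the base physical name, as before
        }
    }
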
http://git-wip-us.apache.org/repos/asf/phoenix/blob/62f3493c/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java
index 560fc0a..c312b9e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.ByteUtil;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -38,8 +39,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  */
 public class StatisticsCollectionRunTracker {
     private static volatile StatisticsCollectionRunTracker INSTANCE;
-    private final Set<HRegionInfo> updateStatsRegions = Collections
-            .newSetFromMap(new ConcurrentHashMap<HRegionInfo, Boolean>());
+    private final Set<ColumnFamilyRegionInfo> updateStatsRegions = Collections
+            .newSetFromMap(new ConcurrentHashMap<ColumnFamilyRegionInfo, Boolean>());
     private final Set<HRegionInfo> compactingRegions = Collections
             .newSetFromMap(new ConcurrentHashMap<HRegionInfo, Boolean>());
     private final ExecutorService executor;
@@ -103,16 +104,16 @@ public class StatisticsCollectionRunTracker {
      * @param regionInfo for the region to run UPDATE STATISTICS command on.
      * @return true if UPDATE STATISTICS wasn't already running on the region, false otherwise.
      */
-    public boolean addUpdateStatsCommandRegion(HRegionInfo regionInfo) {
-        return updateStatsRegions.add(regionInfo);
+    public boolean addUpdateStatsCommandRegion(HRegionInfo regionInfo, Set<byte[]> familySet) {
+        return updateStatsRegions.add(new ColumnFamilyRegionInfo(regionInfo,familySet));
     }
 
     /**
      * @param regionInfo for the region to mark as not running UPDATE STATISTICS command on.
      * @return true if UPDATE STATISTICS was running on the region, false otherwise.
      */
-    public boolean removeUpdateStatsCommandRegion(HRegionInfo regionInfo) {
-        return updateStatsRegions.remove(regionInfo);
+    public boolean removeUpdateStatsCommandRegion(HRegionInfo regionInfo, Set<byte[]> familySet) {
+        return updateStatsRegions.remove(new ColumnFamilyRegionInfo(regionInfo,familySet));
     }
 
     /**
@@ -124,4 +125,36 @@ public class StatisticsCollectionRunTracker {
         return executor.submit(c);
     }
 
+    class ColumnFamilyRegionInfo {
+        private HRegionInfo regionInfo;
+        private Set<byte[]> familySet;
+
+        public ColumnFamilyRegionInfo(HRegionInfo regionInfo, Set<byte[]> familySet) {
+            this.regionInfo = regionInfo;
+            this.familySet = familySet;
+        }
+
+        public HRegionInfo getRegionInfo() {
+            return regionInfo;
+        }
+
+        public Set<byte[]> getFamilySet() {
+            return familySet;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == this) { return true; }
+            if (!(obj instanceof ColumnFamilyRegionInfo)) { return false; }
+
+            ColumnFamilyRegionInfo c = (ColumnFamilyRegionInfo)obj;
+            return c.getRegionInfo().equals(this.regionInfo) && ByteUtil.match(this.familySet, c.getFamilySet());
+        }
+
+        @Override
+        public int hashCode() {
+            return this.getRegionInfo().hashCode();
+        }
+    }
+
 }

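Not part of the patch, but a note on the new key class: hashCode() deliberately uses only the region, which keeps the equals()/hashCode() contract (objects equal under equals() always share a region, hence a hash) while equals() also compares the column-family sets by content via ByteUtil.match(). Entries for the same region but different family sets therefore coexist in the concurrent set and are told apart by equals(). A small standalone sketch of that behavior, with a String region id standing in for HRegionInfo:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    // Stand-in for ColumnFamilyRegionInfo: region id plus family set, compared by content.
    final class RegionFamiliesKey {
        final String regionId;        // stand-in for HRegionInfo
        final Set<byte[]> families;

        RegionFamiliesKey(String regionId, Set<byte[]> families) {
            this.regionId = regionId;
            this.families = families;
        }

        @Override public boolean equals(Object o) {
            if (o == this) return true;
            if (!(o instanceof RegionFamiliesKey)) return false;
            RegionFamiliesKey k = (RegionFamiliesKey) o;
            if (!k.regionId.equals(regionId) || k.families.size() != families.size()) return false;
            for (byte[] f : families) {               // content comparison, like ByteUtil.match()
                boolean found = false;
                for (byte[] g : k.families) {
                    if (Arrays.equals(f, g)) { found = true; break; }
                }
                if (!found) return false;
            }
            return true;
        }

        @Override public int hashCode() { return regionId.hashCode(); } // region only, as in the patch
    }

    public class TrackerKeyDemo {
        public static void main(String[] args) {
            Set<RegionFamiliesKey> running = Collections.newSetFromMap(new ConcurrentHashMap<RegionFamiliesKey, Boolean>());
            Set<byte[]> data = new HashSet<byte[]>(Arrays.asList("0".getBytes()));
            Set<byte[]> localIndex = new HashSet<byte[]>(Arrays.asList("L#0".getBytes()));
            System.out.println(running.add(new RegionFamiliesKey("region-1", data)));        // true
            System.out.println(running.add(new RegionFamiliesKey("region-1", localIndex)));  // true: different family set
            System.out.println(running.add(new RegionFamiliesKey("region-1", data)));        // false: same run already tracked
        }
    }
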
http://git-wip-us.apache.org/repos/asf/phoenix/blob/62f3493c/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
index d11f3a2..5a2b624 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
@@ -24,8 +24,10 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -579,7 +581,7 @@ public class ByteUtil {
         }
     }
     
-    public static boolean contains(List<byte[]> keys, byte[] key) {
+    public static boolean contains(Collection<byte[]> keys, byte[] key) {
         for (byte[] k : keys) {
             if (Arrays.equals(k, key)) { return true; }
         }
@@ -592,4 +594,16 @@ public class ByteUtil {
         }
         return false;
     }
+
+    public static boolean match(Set<byte[]> keys, Set<byte[]> keys2) {
+        if (keys == keys2) return true;
+        if (keys == null || keys2 == null) return false;
+
+        int size = keys.size();
+        if (keys2.size() != size) return false;
+        for (byte[] k : keys) {
+            if (!contains(keys2, k)) { return false; }
+        }
+        return true;
+    }
 }

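Finally, a note on why ByteUtil.match() is needed at all (illustration, not Phoenix code): Java byte[] inherits identity-based equals() and hashCode(), so two HashSet<byte[]> instances holding equal bytes are not equal under Set.equals(); match() walks both sets with Arrays.equals() instead:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class ByteArraySetPitfall {
        public static void main(String[] args) {
            Set<byte[]> a = new HashSet<byte[]>(Arrays.asList("0".getBytes(), "L#0".getBytes()));
            Set<byte[]> b = new HashSet<byte[]>(Arrays.asList("0".getBytes(), "L#0".getBytes()));
            // byte[] compares by reference, so equal contents are not enough:
            System.out.println(a.equals(b));                 // false
            System.out.println(a.contains("0".getBytes()));  // false, for the same reason
            // ByteUtil.match(a, b) compares element contents with Arrays.equals() and returns true here.
        }
    }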