This is an automated email from the ASF dual-hosted git repository.

larsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new dbc98ac  Local indexes get out of sync after changes for global 
consistent indexes.
dbc98ac is described below

commit dbc98acd1f09d4d8c360a84f9d126b4e03a73fe0
Author: Lars <la...@apache.org>
AuthorDate: Sat Aug 22 10:50:53 2020 -0700

    Local indexes get out of sync after changes for global consistent indexes.
---
 .../apache/phoenix/end2end/index/LocalIndexIT.java | 33 ++++++++++
 .../phoenix/hbase/index/IndexRegionObserver.java   | 70 ++++++++++++----------
 2 files changed, 71 insertions(+), 32 deletions(-)

diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index 724da6e..0965ce1 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -103,6 +103,39 @@ public class LocalIndexIT extends BaseLocalIndexIT {
     }
 
     @Test
+    public void testLocalIndexConsistency() throws Exception {
+        if (isNamespaceMapped) {
+            return;
+        }
+        String tableName = schemaName + "." + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+
+        Connection conn = getConnection();
+        conn.setAutoCommit(true);
+
+        conn.createStatement().execute("CREATE TABLE " + tableName + " (pk 
INTEGER PRIMARY KEY, v1 FLOAT) SPLIT ON (2000)");
+        conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + " 
ON " + tableName + "(v1)");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " 
VALUES(rand() * 4000, rand())");
+
+        ResultSet rs;
+        for (int i=0; i<15; i++) {
+            conn.createStatement().execute("UPSERT INTO " + tableName + " 
SELECT rand() * 4000, rand() FROM " + tableName);
+
+            rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + 
tableName);
+            rs.next();
+            int indexCount = rs.getInt(1);
+            rs.close();
+
+            rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ 
COUNT(*) FROM " + tableName);
+            rs.next();
+            int tableCount = rs.getInt(1);
+            rs.close();
+
+            assertEquals(indexCount, tableCount);
+        }
+    }
+
+    @Test
     public void testUseUncoveredLocalIndexWithPrefix() throws Exception {
         String tableName = schemaName + "." + generateUniqueName();
         String indexName = "IDX_" + generateUniqueName();
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index bfeadcb..2d0cf51 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -702,7 +702,7 @@ public class IndexRegionObserver implements RegionObserver, 
RegionCoprocessor {
      * unverified status. In phase 2, data table mutations are applied. In 
phase 3, the status for an index table row is
      * either set to "verified" or the row is deleted.
      */
-    private void 
preparePreIndexMutations(ObserverContext<RegionCoprocessorEnvironment> c,
+    private boolean 
preparePreIndexMutations(ObserverContext<RegionCoprocessorEnvironment> c,
                                           
MiniBatchOperationInProgress<Mutation> miniBatchOp,
                                           BatchMutateContext context,
                                           Collection<? extends Mutation> 
pendingMutations,
@@ -716,13 +716,6 @@ public class IndexRegionObserver implements 
RegionObserver, RegionCoprocessor {
                 current = NullSpan.INSTANCE;
             }
             current.addTimelineAnnotation("Built index updates, doing 
preStep");
-            // Handle local index updates
-            for (IndexMaintainer indexMaintainer : maintainers) {
-                if (indexMaintainer.isLocalIndex()) {
-                    handleLocalIndexUpdates(c, miniBatchOp, pendingMutations, 
indexMetaData);
-                    break;
-                }
-            }
             // The rest of this method is for handling global index updates
             context.indexUpdates = 
ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create();
             prepareIndexMutations(context, maintainers, now);
@@ -730,6 +723,9 @@ public class IndexRegionObserver implements RegionObserver, 
RegionCoprocessor {
             context.preIndexUpdates = 
ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
             int updateCount = 0;
             for (IndexMaintainer indexMaintainer : maintainers) {
+                if (indexMaintainer.isLocalIndex()) {
+                    continue;
+                }
                 updateCount++;
                 byte[] emptyCF = 
indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
                 byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
@@ -751,6 +747,7 @@ public class IndexRegionObserver implements RegionObserver, 
RegionCoprocessor {
                 }
             }
             TracingUtils.addAnnotation(current, "index update count", 
updateCount);
+            return updateCount != 0;
         }
     }
 
@@ -796,11 +793,22 @@ public class IndexRegionObserver implements 
RegionObserver, RegionCoprocessor {
         return true;
     }
 
-    private void preparePostIndexMutations(BatchMutateContext context, long 
now, PhoenixIndexMetaData indexMetaData,
-                                           String tableName)
+    private void 
preparePostIndexMutations(ObserverContext<RegionCoprocessorEnvironment> c,
+                                           
MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                           BatchMutateContext context,
+                                           Collection<? extends Mutation> 
pendingMutations,
+                                           long now,
+                                           PhoenixIndexMetaData indexMetaData)
             throws Throwable {
         context.postIndexUpdates = 
ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
         List<IndexMaintainer> maintainers = 
indexMetaData.getIndexMaintainers();
+        // Handle local index updates
+        for (IndexMaintainer indexMaintainer : maintainers) {
+            if (indexMaintainer.isLocalIndex()) {
+                handleLocalIndexUpdates(c, miniBatchOp, pendingMutations, 
indexMetaData);
+                break;
+            }
+        }
         // Check if we need to skip post index update for any of the rows
         for (IndexMaintainer indexMaintainer : maintainers) {
             byte[] emptyCF = 
indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
@@ -837,6 +845,7 @@ public class IndexRegionObserver implements RegionObserver, 
RegionCoprocessor {
                             rowLock.release();
                         }
                         context.rowLocks.clear();
+                        String tableName = 
c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
                         throw new IOException("One of the concurrent mutations 
does not have all indexed columns. " +
                                 "The batch needs to be retried " + tableName);
                     }
@@ -927,30 +936,27 @@ public class IndexRegionObserver implements 
RegionObserver, RegionCoprocessor {
             return;
         }
         long start = EnvironmentEdgeManager.currentTimeMillis();
-        preparePreIndexMutations(c, miniBatchOp, context, mutations, now, 
indexMetaData);
+        boolean hasGlobalIndex = preparePreIndexMutations(c, miniBatchOp, 
context, mutations, now, indexMetaData);
         
metricSource.updateIndexPrepareTime(EnvironmentEdgeManager.currentTimeMillis() 
- start);
-        // Sleep for one millisecond if we have prepared the index updates in 
less than 1 ms. The sleep is necessary to
-        // get different timestamps for concurrent batches that share common 
rows. It is very rare that the index updates
-        // can be prepared in less than one millisecond
-        if (!context.rowLocks.isEmpty() && now == 
EnvironmentEdgeManager.currentTimeMillis()) {
-            Thread.sleep(1);
-            LOG.debug("slept 1ms for " + 
c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
-        }
-        // Release the locks before making RPC calls for index updates
-        for (RowLock rowLock : context.rowLocks) {
-            rowLock.release();
-        }
-        // Do the first phase index updates
-        doPre(c, context, miniBatchOp);
-        // Acquire the locks again before letting the region proceed with data 
table updates
-        List<RowLock> rowLocks = 
Lists.newArrayListWithExpectedSize(context.rowLocks.size());
-        for (RowLock rowLock : context.rowLocks) {
-            rowLocks.add(lockManager.lockRow(rowLock.getRowKey(), 
rowLockWaitDuration));
+        if (hasGlobalIndex) {
+            // Sleep for one millisecond if we have prepared the index updates 
in less than 1 ms. The sleep is necessary to
+            // get different timestamps for concurrent batches that share 
common rows. It is very rare that the index updates
+            // can be prepared in less than one millisecond
+            if (!context.rowLocks.isEmpty() && now == 
EnvironmentEdgeManager.currentTimeMillis()) {
+                Thread.sleep(1);
+                LOG.debug("slept 1ms for " + 
c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+            }
+            // Release the locks before making RPC calls for index updates
+            for (RowLock rowLock : context.rowLocks) {
+                rowLock.release();
+            }
+            // Do the first phase index updates
+            doPre(c, context, miniBatchOp);
+            // Acquire the locks again before letting the region proceed with 
data table updates
+            context.rowLocks.clear();
+            lockRows(context);
         }
-        context.rowLocks.clear();
-        context.rowLocks = rowLocks;
-        preparePostIndexMutations(context, now, indexMetaData,
-                
c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+        preparePostIndexMutations(c, miniBatchOp, context, mutations, now, 
indexMetaData);
         if (failDataTableUpdatesForTesting) {
             throw new DoNotRetryIOException("Simulating the data table write 
failure");
         }

Reply via email to