This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 19119f9229 PHOENIX-7591 Concurrent updates to tables with indexes can 
cause data inconsistency
19119f9229 is described below

commit 19119f92295529860590eeaa24099766e48ba30c
Author: tkhurana <khurana.ta...@gmail.com>
AuthorDate: Thu May 22 10:55:14 2025 -0700

    PHOENIX-7591 Concurrent updates to tables with indexes can cause data 
inconsistency
---
 .../hbase/index/util/IndexManagementUtil.java      |   2 +-
 .../phoenix/hbase/index/IndexRegionObserver.java   |  13 ++-
 .../org/apache/phoenix/end2end/IndexToolIT.java    | 128 +++++++++++----------
 3 files changed, 81 insertions(+), 62 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
index 3d8c111d8e..3b03e7f51b 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
@@ -199,7 +199,7 @@ public class IndexManagementUtil {
         try {
             throw e;
         } catch (IOException | FatalIndexBuildingFailureException e1) {
-            LOGGER.info("Rethrowing " + e);
+            LOGGER.info("Rethrowing ", e);
             throw e1;
         }
         catch (Throwable e1) {
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index fee328cbfa..fed0bb274d 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -230,7 +230,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
         ignoreWritingDeleteColumnsToIndex = ignore;
     }
     public enum BatchMutatePhase {
-        PRE, POST, FAILED
+        INIT, PRE, POST, FAILED
     }
 
     // Hack to get around not being able to save any state between
@@ -245,7 +245,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
     * locks to serialize the access to the BatchMutateContext objects.
     */
     public static class BatchMutateContext {
-        private volatile BatchMutatePhase currentPhase = BatchMutatePhase.PRE;
+        private volatile BatchMutatePhase currentPhase = BatchMutatePhase.INIT;
         // The max of reference counts on the pending rows of this batch at 
the time this
         // batch arrives.
         private int maxPendingRowCount = 0;
@@ -1468,6 +1468,8 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
             return;
         }
         lockRows(context);
+        // acquired the locks, move to the next phase PRE
+        context.currentPhase = BatchMutatePhase.PRE;
         long onDupCheckTime = 0;
 
         if (context.hasAtomic || context.returnResult || context.hasGlobalIndex
@@ -1637,6 +1639,12 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
           return;
       }
       try {
+          // We add to pending rows only after we have locked all the rows in 
the batch
+          // If we are in the INIT phase that means we failed to acquire the 
locks before the
+          // PRE phase
+          if (context.getCurrentPhase() != BatchMutatePhase.INIT) {
+              removePendingRows(context);
+          }
           if (success) {
               context.currentPhase = BatchMutatePhase.POST;
               if ((context.hasAtomic || context.returnResult) && 
miniBatchOp.size() == 1) {
@@ -1659,7 +1667,6 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
               context.currentPhase = BatchMutatePhase.FAILED;
           }
           context.countDownAllLatches();
-          removePendingRows(context);
           if (context.indexUpdates != null) {
               context.indexUpdates.clear();
           }
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index df82cdee28..cef5d90433 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -1053,64 +1053,76 @@ public class IndexToolIT extends BaseTest {
         String schemaName = 
SchemaUtil.getSchemaNameFromFullName(fullTableName);
         String tableName = SchemaUtil.getTableNameFromFullName(fullTableName);
         String indexName = SchemaUtil.getTableNameFromFullName(fullIndexName);
-        // This checks the state of every raw index row without rebuilding any 
row
-        IndexTool indexTool = IndexToolIT.runIndexTool(false, schemaName, 
tableName,
-                indexName, null, 0, IndexTool.IndexVerifyType.ONLY);
-        LOGGER.info(indexTool.getJob().getCounters().toString());
-        TestUtil.dumpTable(conn, 
TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
-        assertEquals(0, 
indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
-        assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
-        assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
-        assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
-        assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
-        assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
-        assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue());
-
-        // This checks the state of an index row after it is repaired
-        long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, 
fullTableName, fullIndexName);
-        // We want to check the index rows again as they may be modified by 
the read repair
-        indexTool = IndexToolIT.runIndexTool(false, schemaName, tableName, 
indexName,
-                null, 0, IndexTool.IndexVerifyType.ONLY);
-        LOGGER.info(indexTool.getJob().getCounters().toString());
-
-        assertEquals(0, 
indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
-        assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
-        assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
-        assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
-        assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
-        // The index scrutiny run will trigger index repair on all unverified 
rows, and they will be repaired or
-        // deleted (since the age threshold is set to zero ms for these tests
-        PTable pIndexTable = 
conn.unwrap(PhoenixConnection.class).getTable(fullIndexName);
-        if (pIndexTable.getIndexType() != PTable.IndexType.UNCOVERED_GLOBAL) {
-            assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT).getValue());
-        }
-        assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
-        assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue());
-        // Now we rebuild the entire index table and expect that it is still 
good after the full rebuild
-        indexTool = IndexToolIT.runIndexTool(false, schemaName, tableName, 
indexName,
-                null, 0, IndexTool.IndexVerifyType.AFTER);
-        
assertEquals(indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_VALID_INDEX_ROW_COUNT).getValue(),
-                
indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
-        assertEquals(0, 
indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
-        assertEquals(0, 
indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
-        assertEquals(0, 
indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
-        assertEquals(0, 
indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
-        // Truncate, rebuild and verify the index table
-        TableName physicalTableName = 
TableName.valueOf(pIndexTable.getPhysicalName().getBytes());
-        PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
-        try (Admin admin = pConn.getQueryServices().getAdmin()) {
-            admin.disableTable(physicalTableName);
-            admin.truncateTable(physicalTableName, true);
+        try {
+            // This checks the state of every raw index row without rebuilding 
any row
+            IndexTool indexTool = IndexToolIT.runIndexTool(false, schemaName, 
tableName,
+                    indexName, null, 0, IndexTool.IndexVerifyType.ONLY);
+            TestUtil.dumpTable(conn, 
TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
+            Counters counters = indexTool.getJob().getCounters();
+            LOGGER.info(counters.toString());
+            assertEquals(0, 
counters.findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
counters.findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
counters.findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
counters.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
counters.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
counters.findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
counters.findCounter(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue());
+
+            // This checks the state of an index row after it is repaired
+            long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, 
fullTableName, fullIndexName);
+            // We want to check the index rows again as they may be modified 
by the read repair
+            indexTool = IndexToolIT.runIndexTool(false, schemaName, tableName, 
indexName,
+                    null, 0, IndexTool.IndexVerifyType.ONLY);
+            counters = indexTool.getJob().getCounters();
+            LOGGER.info(counters.toString());
+            assertEquals(0, 
counters.findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
counters.findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
counters.findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
counters.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
counters.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+            // The index scrutiny run will trigger index repair on all 
unverified rows, and they will be repaired or
+            // deleted (since the age threshold is set to zero ms for these 
tests
+            PTable pIndexTable = 
conn.unwrap(PhoenixConnection.class).getTable(fullIndexName);
+            if (pIndexTable.getIndexType() != 
PTable.IndexType.UNCOVERED_GLOBAL) {
+                assertEquals(0, 
counters.findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT).getValue());
+            }
+            assertEquals(0, 
counters.findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue());
+
+            // Now we rebuild the entire index table and expect that it is 
still good after the full rebuild
+            indexTool = IndexToolIT.runIndexTool(false, schemaName, tableName, 
indexName,
+                    null, 0, IndexTool.IndexVerifyType.AFTER);
+            counters = indexTool.getJob().getCounters();
+            LOGGER.info(counters.toString());
+            
assertEquals(counters.findCounter(AFTER_REBUILD_VALID_INDEX_ROW_COUNT).getValue(),
+                    counters.findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
counters.findCounter(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
counters.findCounter(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
counters.findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
counters.findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+            // Truncate, rebuild and verify the index table
+            TableName physicalTableName = 
TableName.valueOf(pIndexTable.getPhysicalName().getBytes());
+            PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+            try (Admin admin = pConn.getQueryServices().getAdmin()) {
+                admin.disableTable(physicalTableName);
+                admin.truncateTable(physicalTableName, true);
+            }
+            indexTool = IndexToolIT.runIndexTool(false, schemaName, tableName, 
indexName,
+                    null, 0, IndexTool.IndexVerifyType.AFTER);
+            counters = indexTool.getJob().getCounters();
+            LOGGER.info(counters.toString());
+            assertEquals(0, 
counters.findCounter(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
counters.findCounter(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
counters.findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
counters.findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+            
pConn.getQueryServices().clearTableRegionCache(TableName.valueOf(fullIndexName));
+            long actualRowCountAfterCompaction = 
IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
+            assertEquals(actualRowCount, actualRowCountAfterCompaction);
+            return actualRowCount;
+        } catch (AssertionError e) {
+            TestUtil.dumpTable(conn, TableName.valueOf(fullTableName));
+            TestUtil.dumpTable(conn, TableName.valueOf(fullIndexName));
+            throw e;
         }
-        indexTool = IndexToolIT.runIndexTool(false, schemaName, tableName, 
indexName,
-                null, 0, IndexTool.IndexVerifyType.AFTER);
-        assertEquals(0, 
indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
-        assertEquals(0, 
indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
-        assertEquals(0, 
indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
-        assertEquals(0, 
indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
-        
pConn.getQueryServices().clearTableRegionCache(TableName.valueOf(fullIndexName));
-        long actualRowCountAfterCompaction = 
IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
-        assertEquals(actualRowCount, actualRowCountAfterCompaction);
-        return actualRowCount;
     }
 }

Reply via email to