This is an automated email from the ASF dual-hosted git repository. tkhurana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new 19119f9229 PHOENIX-7591 Concurrent updates to tables with indexes can cause data inconsistency 19119f9229 is described below commit 19119f92295529860590eeaa24099766e48ba30c Author: tkhurana <khurana.ta...@gmail.com> AuthorDate: Thu May 22 10:55:14 2025 -0700 PHOENIX-7591 Concurrent updates to tables with indexes can cause data inconsistency --- .../hbase/index/util/IndexManagementUtil.java | 2 +- .../phoenix/hbase/index/IndexRegionObserver.java | 13 ++- .../org/apache/phoenix/end2end/IndexToolIT.java | 128 +++++++++++---------- 3 files changed, 81 insertions(+), 62 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java index 3d8c111d8e..3b03e7f51b 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java @@ -199,7 +199,7 @@ public class IndexManagementUtil { try { throw e; } catch (IOException | FatalIndexBuildingFailureException e1) { - LOGGER.info("Rethrowing " + e); + LOGGER.info("Rethrowing ", e); throw e1; } catch (Throwable e1) { diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index fee328cbfa..fed0bb274d 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -230,7 +230,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { ignoreWritingDeleteColumnsToIndex = ignore; } public enum BatchMutatePhase { - PRE, POST, FAILED + INIT, PRE, POST, FAILED } // Hack to get around not being able to save any state between @@ -245,7 +245,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { * locks to serialize the access to the BatchMutateContext objects. */ public static class BatchMutateContext { - private volatile BatchMutatePhase currentPhase = BatchMutatePhase.PRE; + private volatile BatchMutatePhase currentPhase = BatchMutatePhase.INIT; // The max of reference counts on the pending rows of this batch at the time this // batch arrives. private int maxPendingRowCount = 0; @@ -1468,6 +1468,8 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { return; } lockRows(context); + // acquired the locks, move to the next phase PRE + context.currentPhase = BatchMutatePhase.PRE; long onDupCheckTime = 0; if (context.hasAtomic || context.returnResult || context.hasGlobalIndex @@ -1637,6 +1639,12 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { return; } try { + // We add to pending rows only after we have locked all the rows in the batch + // If we are in the INIT phase that means we failed to acquire the locks before the + // PRE phase + if (context.getCurrentPhase() != BatchMutatePhase.INIT) { + removePendingRows(context); + } if (success) { context.currentPhase = BatchMutatePhase.POST; if ((context.hasAtomic || context.returnResult) && miniBatchOp.size() == 1) { @@ -1659,7 +1667,6 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { context.currentPhase = BatchMutatePhase.FAILED; } context.countDownAllLatches(); - removePendingRows(context); if (context.indexUpdates != null) { context.indexUpdates.clear(); } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java index df82cdee28..cef5d90433 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java @@ -1053,64 +1053,76 @@ public class IndexToolIT extends BaseTest { String schemaName = SchemaUtil.getSchemaNameFromFullName(fullTableName); String tableName = SchemaUtil.getTableNameFromFullName(fullTableName); String indexName = SchemaUtil.getTableNameFromFullName(fullIndexName); - // This checks the state of every raw index row without rebuilding any row - IndexTool indexTool = IndexToolIT.runIndexTool(false, schemaName, tableName, - indexName, null, 0, IndexTool.IndexVerifyType.ONLY); - LOGGER.info(indexTool.getJob().getCounters().toString()); - TestUtil.dumpTable(conn, TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME)); - assertEquals(0, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue()); - - // This checks the state of an index row after it is repaired - long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); - // We want to check the index rows again as they may be modified by the read repair - indexTool = IndexToolIT.runIndexTool(false, schemaName, tableName, indexName, - null, 0, IndexTool.IndexVerifyType.ONLY); - LOGGER.info(indexTool.getJob().getCounters().toString()); - - assertEquals(0, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue()); - // The index scrutiny run will trigger index repair on all unverified rows, and they will be repaired or - // deleted (since the age threshold is set to zero ms for these tests - PTable pIndexTable = conn.unwrap(PhoenixConnection.class).getTable(fullIndexName); - if (pIndexTable.getIndexType() != PTable.IndexType.UNCOVERED_GLOBAL) { - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT).getValue()); - } - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue()); - // Now we rebuild the entire index table and expect that it is still good after the full rebuild - indexTool = IndexToolIT.runIndexTool(false, schemaName, tableName, indexName, - null, 0, IndexTool.IndexVerifyType.AFTER); - assertEquals(indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_VALID_INDEX_ROW_COUNT).getValue(), - indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue()); - // Truncate, rebuild and verify the index table - TableName physicalTableName = TableName.valueOf(pIndexTable.getPhysicalName().getBytes()); - PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class); - try (Admin admin = pConn.getQueryServices().getAdmin()) { - admin.disableTable(physicalTableName); - admin.truncateTable(physicalTableName, true); + try { + // This checks the state of every raw index row without rebuilding any row + IndexTool indexTool = IndexToolIT.runIndexTool(false, schemaName, tableName, + indexName, null, 0, IndexTool.IndexVerifyType.ONLY); + TestUtil.dumpTable(conn, TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME)); + Counters counters = indexTool.getJob().getCounters(); + LOGGER.info(counters.toString()); + assertEquals(0, counters.findCounter(REBUILT_INDEX_ROW_COUNT).getValue()); + assertEquals(0, counters.findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); + assertEquals(0, counters.findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, counters.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, counters.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue()); + assertEquals(0, counters.findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue()); + assertEquals(0, counters.findCounter(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue()); + + // This checks the state of an index row after it is repaired + long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); + // We want to check the index rows again as they may be modified by the read repair + indexTool = IndexToolIT.runIndexTool(false, schemaName, tableName, indexName, + null, 0, IndexTool.IndexVerifyType.ONLY); + counters = indexTool.getJob().getCounters(); + LOGGER.info(counters.toString()); + assertEquals(0, counters.findCounter(REBUILT_INDEX_ROW_COUNT).getValue()); + assertEquals(0, counters.findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); + assertEquals(0, counters.findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, counters.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, counters.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue()); + // The index scrutiny run will trigger index repair on all unverified rows, and they will be repaired or + // deleted (since the age threshold is set to zero ms for these tests + PTable pIndexTable = conn.unwrap(PhoenixConnection.class).getTable(fullIndexName); + if (pIndexTable.getIndexType() != PTable.IndexType.UNCOVERED_GLOBAL) { + assertEquals(0, counters.findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT).getValue()); + } + assertEquals(0, counters.findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue()); + + // Now we rebuild the entire index table and expect that it is still good after the full rebuild + indexTool = IndexToolIT.runIndexTool(false, schemaName, tableName, indexName, + null, 0, IndexTool.IndexVerifyType.AFTER); + counters = indexTool.getJob().getCounters(); + LOGGER.info(counters.toString()); + assertEquals(counters.findCounter(AFTER_REBUILD_VALID_INDEX_ROW_COUNT).getValue(), + counters.findCounter(REBUILT_INDEX_ROW_COUNT).getValue()); + assertEquals(0, counters.findCounter(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); + assertEquals(0, counters.findCounter(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, counters.findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, counters.findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue()); + // Truncate, rebuild and verify the index table + TableName physicalTableName = TableName.valueOf(pIndexTable.getPhysicalName().getBytes()); + PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class); + try (Admin admin = pConn.getQueryServices().getAdmin()) { + admin.disableTable(physicalTableName); + admin.truncateTable(physicalTableName, true); + } + indexTool = IndexToolIT.runIndexTool(false, schemaName, tableName, indexName, + null, 0, IndexTool.IndexVerifyType.AFTER); + counters = indexTool.getJob().getCounters(); + LOGGER.info(counters.toString()); + assertEquals(0, counters.findCounter(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); + assertEquals(0, counters.findCounter(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, counters.findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, counters.findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue()); + pConn.getQueryServices().clearTableRegionCache(TableName.valueOf(fullIndexName)); + long actualRowCountAfterCompaction = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); + assertEquals(actualRowCount, actualRowCountAfterCompaction); + return actualRowCount; + } catch (AssertionError e) { + TestUtil.dumpTable(conn, TableName.valueOf(fullTableName)); + TestUtil.dumpTable(conn, TableName.valueOf(fullIndexName)); + throw e; } - indexTool = IndexToolIT.runIndexTool(false, schemaName, tableName, indexName, - null, 0, IndexTool.IndexVerifyType.AFTER); - assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue()); - pConn.getQueryServices().clearTableRegionCache(TableName.valueOf(fullIndexName)); - long actualRowCountAfterCompaction = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); - assertEquals(actualRowCount, actualRowCountAfterCompaction); - return actualRowCount; } }