This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 501ae7ee3c PHOENIX-7379 Improve handling of concurrent index mutations
with the … (#1951)
501ae7ee3c is described below
commit 501ae7ee3c66e4c6dc08374938e6595e355197dc
Author: Kadir Ozdemir <[email protected]>
AuthorDate: Wed Aug 14 23:23:37 2024 -0700
PHOENIX-7379 Improve handling of concurrent index mutations with the …
(#1951)
---
.../phoenix/hbase/index/IndexRegionObserver.java | 87 ++++++++++++++++------
.../end2end/ConcurrentMutationsExtendedIT.java | 19 +++--
2 files changed, 77 insertions(+), 29 deletions(-)
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 317841fb13..b2e3b5ff9d 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
@@ -366,7 +365,8 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
private boolean shouldWALAppend = DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL;
private boolean isNamespaceEnabled = false;
private boolean useBloomFilter = false;
-
+ private long lastTimestamp = 0;
+ private List<Set<ImmutableBytesPtr>> batchesWithLastTimestamp = new
ArrayList<>();
private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
private static final int DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS =
100;
@@ -1044,7 +1044,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
* either set to "verified" or the row is deleted.
*/
private void preparePreIndexMutations(BatchMutateContext context,
- long now,
+ long batchTimestamp,
PhoenixIndexMetaData indexMetaData)
throws Throwable {
List<IndexMaintainer> maintainers =
indexMetaData.getIndexMaintainers();
// get the current span, or just use a null-span to avoid a bunch of
if statements
@@ -1056,7 +1056,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
current.addTimelineAnnotation("Built index updates, doing
preStep");
// The rest of this method is for handling global index updates
context.indexUpdates =
ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create();
- prepareIndexMutations(context, maintainers, now);
+ prepareIndexMutations(context, maintainers, batchTimestamp);
context.preIndexUpdates =
ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
int updateCount = 0;
@@ -1076,7 +1076,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
// Set the status of the index row to "unverified"
Put unverifiedPut = new Put(m.getRow());
unverifiedPut.addColumn(
- emptyCF, emptyCQ, now,
QueryConstants.UNVERIFIED_BYTES);
+ emptyCF, emptyCQ, batchTimestamp,
QueryConstants.UNVERIFIED_BYTES);
// This will be done before the data table row is
updated (i.e., in the first write phase)
context.preIndexUpdates.put(hTableInterfaceReference,
unverifiedPut);
}
@@ -1100,7 +1100,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
}
private void preparePostIndexMutations(BatchMutateContext context,
- long now,
+ long batchTimestamp,
PhoenixIndexMetaData indexMetaData)
{
context.postIndexUpdates =
ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
List<IndexMaintainer> maintainers =
indexMetaData.getIndexMaintainers();
@@ -1116,7 +1116,8 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
if (!indexMaintainer.isUncovered()) {
Put verifiedPut = new Put(m.getRow());
// Set the status of the index row to "verified"
- verifiedPut.addColumn(emptyCF, emptyCQ, now,
QueryConstants.VERIFIED_BYTES);
+ verifiedPut.addColumn(emptyCF, emptyCQ, batchTimestamp,
+ QueryConstants.VERIFIED_BYTES);
context.postIndexUpdates.put(hTableInterfaceReference,
verifiedPut);
}
} else {
@@ -1211,6 +1212,53 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
}
}
+ private boolean shouldSleep(BatchMutateContext context) {
+ for (ImmutableBytesPtr ptr : context.rowsToLock) {
+ for (Set set : batchesWithLastTimestamp) {
+ if (set.contains(ptr)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+ private long getBatchTimestamp(BatchMutateContext context, TableName table)
+ throws InterruptedException {
+ synchronized (this) {
+ long ts = EnvironmentEdgeManager.currentTimeMillis();
+ if (ts != lastTimestamp) {
+ // The timestamp for this batch will be different from the
last batch processed.
+ lastTimestamp = ts;
+ batchesWithLastTimestamp.clear();
+ batchesWithLastTimestamp.add(context.rowsToLock);
+ return ts;
+ } else {
+ if (!shouldSleep(context)) {
+                    // There is no need to sleep as the last batches with the
same timestamp
+                    // do not have a common row with this batch
+ batchesWithLastTimestamp.add(context.rowsToLock);
+ return ts;
+ }
+ }
+ }
+ // Sleep for one millisecond. The sleep is necessary to get different
timestamps
+ // for concurrent batches that share common rows.
+ Thread.sleep(1);
+ LOG.debug("slept 1ms for " + table.getNameAsString());
+ synchronized (this) {
+ long ts = EnvironmentEdgeManager.currentTimeMillis();
+ if (ts != lastTimestamp) {
+ // The timestamp for this batch will be different from the
last batch processed.
+ lastTimestamp = ts;
+ batchesWithLastTimestamp.clear();
+ }
+            // We do not have to check again if we need to sleep again since
we got the next
+            // timestamp while holding the row locks. This means there cannot
be a new
+            // mutation with the same row attempting to get the same timestamp
+ batchesWithLastTimestamp.add(context.rowsToLock);
+ return ts;
+ }
+ }
public void
preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c,
miniBatchOp);
@@ -1226,7 +1274,8 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
ServerIndexUtil.setDeleteAttributes(miniBatchOp);
}
- // Exclusively lock all rows to do consistent writes over multiple
tables (i.e., the data and its index tables)
+ // Exclusively lock all rows to do consistent writes over multiple
tables
+ // (i.e., the data and its index tables)
populateRowsToLock(miniBatchOp, context);
// early exit if it turns out we don't have any update for indexes
if (context.rowsToLock.isEmpty()) {
@@ -1265,27 +1314,19 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
}
}
- long now = EnvironmentEdgeManager.currentTimeMillis();
- // Update the timestamps of the data table mutations to prevent
overlapping timestamps (which prevents index
- // inconsistencies as this case isn't handled correctly currently).
- setTimestamps(miniBatchOp, builder, now);
-
TableName table =
c.getEnvironment().getRegion().getRegionInfo().getTable();
+ long batchTimestamp = getBatchTimestamp(context, table);
+ // Update the timestamps of the data table mutations to prevent
overlapping timestamps
+ // (which prevents index inconsistencies as this case is not handled).
+ setTimestamps(miniBatchOp, builder, batchTimestamp);
if (context.hasGlobalIndex || context.hasUncoveredIndex ||
context.hasTransform) {
// Prepare next data rows states for pending mutations (for global
indexes)
- prepareDataRowStates(c, miniBatchOp, context, now);
+ prepareDataRowStates(c, miniBatchOp, context, batchTimestamp);
// early exit if it turns out we don't have any edits
long start = EnvironmentEdgeManager.currentTimeMillis();
- preparePreIndexMutations(context, now, indexMetaData);
+ preparePreIndexMutations(context, batchTimestamp, indexMetaData);
metricSource.updateIndexPrepareTime(dataTableName,
EnvironmentEdgeManager.currentTimeMillis() - start);
- // Sleep for one millisecond if we have prepared the index updates
in less than 1 ms. The sleep is necessary to
- // get different timestamps for concurrent batches that share
common rows. It is very rare that the index updates
- // can be prepared in less than one millisecond
- if (!context.rowLocks.isEmpty() && now ==
EnvironmentEdgeManager.currentTimeMillis()) {
- Thread.sleep(1);
- LOG.debug("slept 1ms for " + table.getNameAsString());
- }
// Release the locks before making RPC calls for index updates
unlockRows(context);
// Do the first phase index updates
@@ -1295,7 +1336,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
if (context.lastConcurrentBatchContext != null) {
waitForPreviousConcurrentBatch(table, context);
}
- preparePostIndexMutations(context, now, indexMetaData);
+ preparePostIndexMutations(context, batchTimestamp, indexMetaData);
}
if (context.hasLocalIndex) {
// Group all the updates for a single row into a single update to
be processed (for local indexes)
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index 278ccb05b5..12dd00fd15 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.end2end;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
-import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
@@ -42,6 +41,8 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
@@ -68,12 +69,15 @@ import static
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEF
import static
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT;
import static
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT;
import static
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
@Category(NeedsOwnMiniClusterTest.class)
@RunWith(Parameterized.class)
public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
-
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ConcurrentMutationsExtendedIT.class);
private final boolean uncovered;
private static final Random RAND = new Random(5);
private static final String MVCC_LOCK_TEST_TABLE_PREFIX = "MVCCLOCKTEST_";
@@ -111,7 +115,7 @@ public class ConcurrentMutationsExtendedIT extends
ParallelStatsDisabledIT {
// This checks the state of every raw index row without rebuilding any
row
IndexTool indexTool = IndexToolIT.runIndexTool(false, "", tableName,
indexName, null, 0, IndexTool.IndexVerifyType.ONLY);
- System.out.println(indexTool.getJob().getCounters());
+ LOGGER.info(indexTool.getJob().getCounters().toString());
TestUtil.dumpTable(conn,
TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
assertEquals(0,
indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
assertEquals(0,
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
@@ -126,7 +130,7 @@ public class ConcurrentMutationsExtendedIT extends
ParallelStatsDisabledIT {
// We want to check the index rows again as they may be modified by
the read repair
indexTool = IndexToolIT.runIndexTool(false, "", tableName, indexName,
null, 0, IndexTool.IndexVerifyType.ONLY);
- System.out.println(indexTool.getJob().getCounters());
+ LOGGER.info(indexTool.getJob().getCounters().toString());
assertEquals(0,
indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
assertEquals(0,
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
@@ -324,6 +328,7 @@ public class ConcurrentMutationsExtendedIT extends
ParallelStatsDisabledIT {
+ tableName + "(v1)" + (uncovered ? "" : "INCLUDE(v2, v3)"));
final CountDownLatch doneSignal = new CountDownLatch(nThreads);
Runnable[] runnables = new Runnable[nThreads];
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
for (int i = 0; i < nThreads; i++) {
runnables[i] = new Runnable() {
@@ -343,7 +348,7 @@ public class ConcurrentMutationsExtendedIT extends
ParallelStatsDisabledIT {
}
conn.commit();
} catch (SQLException e) {
- System.out.println(e);
+ LOGGER.warn("Exception during upsert : " + e);
} finally {
doneSignal.countDown();
}
@@ -357,6 +362,8 @@ public class ConcurrentMutationsExtendedIT extends
ParallelStatsDisabledIT {
}
assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS));
+ LOGGER.info("Total upsert time in ms : "
+ + (EnvironmentEdgeManager.currentTimeMillis() - startTime));
long actualRowCount = verifyIndexTable(tableName, indexName, conn);
assertEquals(nRows, actualRowCount);
}