This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.2 by this push:
new 1c0fdec2c4 PHOENIX-7328 Fix flapping
ConcurrentMutationsExtendedIT#testConcurrentUpserts (#1903)
1c0fdec2c4 is described below
commit 1c0fdec2c45d1f4ff782dd4e55694fa9a0c25f43
Author: Kadir Ozdemir <[email protected]>
AuthorDate: Fri Jun 14 11:04:28 2024 +0300
PHOENIX-7328 Fix flapping ConcurrentMutationsExtendedIT#testConcurrentUpserts
(#1903)
---
.../phoenix/hbase/index/IndexRegionObserver.java | 200 ++++++++++++---------
.../end2end/ConcurrentMutationsExtendedIT.java | 16 +-
2 files changed, 120 insertions(+), 96 deletions(-)
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 0517d2e27a..d34efb2673 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.index.PhoenixIndexBuilderHelper;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import
org.apache.phoenix.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.phoenix.thirdparty.com.google.common.collect.ListMultimap;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
@@ -150,36 +151,51 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
private static final OperationStatus NOWRITE = new
OperationStatus(OperationStatusCode.SUCCESS);
public static final String PHOENIX_APPEND_METADATA_TO_WAL =
"phoenix.append.metadata.to.wal";
public static final boolean DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL = false;
+ /**
+ * Class to represent pending data table rows
+ * */
+ private class PendingRow {
+ private int count;
+ private boolean usable;
+ private ImmutableBytesPtr rowKey;
+ private BatchMutateContext lastContext;
+
+ PendingRow(ImmutableBytesPtr rowKey, BatchMutateContext context) {
+ count = 1;
+ usable = true;
+ lastContext = context;
+ this.rowKey = rowKey;
+ }
- /**
- * Class to represent pending data table rows
- */
- private static class PendingRow {
- private int count;
- private BatchMutateContext lastContext;
-
- PendingRow(BatchMutateContext context) {
- count = 1;
- lastContext = context;
- }
-
- public void add(BatchMutateContext context) {
- count++;
- lastContext = context;
- }
+ public boolean add(BatchMutateContext context) {
+ synchronized (this) {
+ if (usable) {
+ count++;
+ lastContext = context;
+ return true;
+ }
+ }
+ return false;
+ }
- public void remove() {
- count--;
- }
+ public void remove() {
+ synchronized (this) {
+ count--;
+ if (count == 0) {
+ pendingRows.remove(rowKey);
+ usable = false;
+ }
+ }
+ }
- public int getCount() {
+ public int getCount() {
return count;
}
- public BatchMutateContext getLastContext() {
+ public BatchMutateContext getLastContext() {
return lastContext;
}
- }
+ }
private static boolean ignoreIndexRebuildForTesting = false;
private static boolean failPreIndexUpdatesForTesting = false;
@@ -276,6 +292,9 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
public CountDownLatch getCountDownLatch() {
synchronized (this) {
+ if (currentPhase != BatchMutatePhase.PRE) {
+ return null;
+ }
if (waitList == null) {
waitList = new ArrayList<>();
}
@@ -574,19 +593,6 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
context.rowLocks.clear();
}
- private void populatePendingRows(BatchMutateContext context) {
- for (RowLock rowLock : context.rowLocks) {
- ImmutableBytesPtr rowKey = rowLock.getRowKey();
- PendingRow pendingRow = pendingRows.get(rowKey);
- if (pendingRow == null) {
- pendingRows.put(rowKey, new PendingRow(context));
- } else {
- // m is a mutation on a row that has already a pending mutation
in progress from another batch
- pendingRow.add(context);
- }
- }
- }
-
private Collection<? extends Mutation>
groupMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
BatchMutateContext
context) throws IOException {
context.multiMutationMap = new HashMap<>();
@@ -837,28 +843,49 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
return false;
}
/**
- * Retrieve the last committed data row state.
+ * Retrieve the data row state either from memory or disk. The rows are
locked by the caller.
*/
private void
getCurrentRowStates(ObserverContext<RegionCoprocessorEnvironment> c,
BatchMutateContext context) throws
IOException {
Set<KeyRange> keys = new HashSet<KeyRange>(context.rowsToLock.size());
for (ImmutableBytesPtr rowKeyPtr : context.rowsToLock) {
- PendingRow pendingRow = pendingRows.get(rowKeyPtr);
- if (pendingRow != null &&
pendingRow.getLastContext().getCurrentPhase() == BatchMutatePhase.PRE) {
- if (context.lastConcurrentBatchContext == null) {
- context.lastConcurrentBatchContext = new HashMap<>();
- }
- context.lastConcurrentBatchContext.put(rowKeyPtr,
pendingRow.getLastContext());
- if (context.maxPendingRowCount < pendingRow.getCount()) {
- context.maxPendingRowCount = pendingRow.getCount();
- }
- Put put =
pendingRow.getLastContext().getNextDataRowState(rowKeyPtr);
- if (put != null) {
- context.dataRowStates.put(rowKeyPtr, new Pair<Put,
Put>(put, new Put(put)));
- }
- }
- else {
+ PendingRow pendingRow = new PendingRow(rowKeyPtr, context);
+ // Add the data table rows in the mini batch to the per region
collection of pending
+ // rows. This will be used to detect concurrent updates
+ PendingRow existingPendingRow = pendingRows.putIfAbsent(rowKeyPtr,
pendingRow);
+ if (existingPendingRow == null) {
+ // There was no pending row for this row key. We need to
retrieve this row from disk
keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get(),
SortOrder.ASC));
+ } else {
+ // There is a pending row for this row key. We need to
retrieve the row from memory
+ BatchMutateContext lastContext =
existingPendingRow.getLastContext();
+ if (existingPendingRow.add(context)) {
+ BatchMutatePhase phase = lastContext.getCurrentPhase();
+ Preconditions.checkArgument(phase != BatchMutatePhase.POST,
+ "the phase of the last batch cannot be POST");
+ if (phase == BatchMutatePhase.PRE) {
+ if (context.lastConcurrentBatchContext == null) {
+ context.lastConcurrentBatchContext = new
HashMap<>();
+ }
+ context.lastConcurrentBatchContext.put(rowKeyPtr,
lastContext);
+ if (context.maxPendingRowCount <
existingPendingRow.getCount()) {
+ context.maxPendingRowCount =
existingPendingRow.getCount();
+ }
+ Put put = lastContext.getNextDataRowState(rowKeyPtr);
+ if (put != null) {
+ context.dataRowStates.put(rowKeyPtr, new
Pair<>(put, new Put(put)));
+ }
+ } else {
+ // The last batch for this row key failed. We cannot
use the memory state.
+ // So we need to retrieve this row from disk
+
keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get(), SortOrder.ASC));
+ }
+ } else {
+ // The existing pending row is removed from the map. That
means there is no
+ // pending row for this row key anymore. We need to add
the new one to the map
+ pendingRows.put(rowKeyPtr, pendingRow);
+ keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get(),
SortOrder.ASC));
+ }
}
}
if (keys.isEmpty()) {
@@ -1103,41 +1130,51 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
}
}
+ /**
+ * Wait for the previous batches to complete. If any of the previous batches
fails then this
+ * batch will fail too and needs to be retried. The rows are locked by the
caller.
+ * @param table
+ * @param context
+ * @throws Throwable
+ */
private void waitForPreviousConcurrentBatch(TableName table,
BatchMutateContext context)
throws Throwable {
- boolean done = true;
for (BatchMutateContext lastContext :
context.lastConcurrentBatchContext.values()) {
BatchMutatePhase phase = lastContext.getCurrentPhase();
-
- if (phase == BatchMutatePhase.PRE) {
+ if (phase == BatchMutatePhase.FAILED) {
+ context.currentPhase = BatchMutatePhase.FAILED;
+ break;
+ } else if (phase == BatchMutatePhase.PRE) {
CountDownLatch countDownLatch =
lastContext.getCountDownLatch();
+ if (countDownLatch == null) {
+ // phase changed from PRE to either FAILED or POST
+ if (phase == BatchMutatePhase.FAILED) {
+ context.currentPhase = BatchMutatePhase.FAILED;
+ break;
+ }
+ continue;
+ }
// Release the locks so that the previous concurrent mutation
can go into the post phase
unlockRows(context);
// Wait for at most one concurrentMutationWaitDuration for
each level in the dependency tree of batches.
// lastContext.getMaxPendingRowCount() is the depth of the
subtree rooted at the batch pointed by lastContext
if (!countDownLatch.await((lastContext.getMaxPendingRowCount()
+ 1) * concurrentMutationWaitDuration,
TimeUnit.MILLISECONDS)) {
+ context.currentPhase = BatchMutatePhase.FAILED;
LOG.debug(String.format("latch timeout context %s last
%s", context, lastContext));
- done = false;
+ break;
}
- // Acquire the locks again before letting the region proceed
with data table updates
- lockRows(context);
- if (!done) {
- // previous concurrent batch did not complete so we have
to retry this batch
+ if (lastContext.getCurrentPhase() == BatchMutatePhase.FAILED) {
+ context.currentPhase = BatchMutatePhase.FAILED;
break;
- } else {
- // read the phase again to determine the status of
previous batch
- phase = lastContext.getCurrentPhase();
- LOG.debug(String.format("context %s last %s exit phase
%s", context, lastContext, phase));
}
- }
-
- if (phase == BatchMutatePhase.FAILED) {
- done = false;
- break;
+ // Acquire the locks again before letting the region proceed
with data table updates
+ lockRows(context);
+ LOG.debug(String.format("context %s last %s exit phase %s",
context, lastContext,
+ lastContext.getCurrentPhase()));
}
}
- if (!done) {
+ if (context.currentPhase == BatchMutatePhase.FAILED) {
// This batch needs to be retried since one of the previous
concurrent batches has not completed yet.
// Throwing an IOException will result in retries of this batch.
Removal of reference counts and
// locks for the rows of this batch will be done in
postBatchMutateIndispensably()
@@ -1209,9 +1246,6 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
if (context.hasGlobalIndex || context.hasUncoveredIndex ||
context.hasTransform) {
// Prepare next data rows states for pending mutations (for global
indexes)
prepareDataRowStates(c, miniBatchOp, context, now);
- // Add the table rows in the mini batch to the collection of
pending rows. This will be used to detect
- // concurrent updates
- populatePendingRows(context);
// early exit if it turns out we don't have any edits
long start = EnvironmentEdgeManager.currentTimeMillis();
preparePreIndexMutations(context, now, indexMetaData);
@@ -1266,6 +1300,10 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
RowLock rowLock = rowLockIterator.next();
ImmutableBytesPtr rowKey = rowLock.getRowKey();
if (row.equals(rowKey)) {
+ PendingRow pendingRow = pendingRows.get(rowKey);
+ if (pendingRow != null) {
+ pendingRow.remove();
+ }
rowLock.release();
rowLockIterator.remove();
context.rowsToLock.remove(row);
@@ -1363,7 +1401,6 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
doIndexWritesWithExceptions(context, true);
metricSource.updatePostIndexUpdateTime(dataTableName,
EnvironmentEdgeManager.currentTimeMillis() - start);
- return;
} catch (Throwable e) {
metricSource.updatePostIndexUpdateFailureTime(dataTableName,
EnvironmentEdgeManager.currentTimeMillis() - start);
@@ -1397,24 +1434,10 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
}
private void removePendingRows(BatchMutateContext context) {
- if (context.currentPhase == BatchMutatePhase.FAILED) {
- // This batch failed. All concurrent batches will fail too. So we
can remove
- // all rows of this batch from the memory as the in-memory row
images are not valid
- // anymore. Please note that when a batch fails, some of the rows
may not have been
- // locked and so it is not safe to update the pending row entries in
that case.
- for (ImmutableBytesPtr rowKey : context.rowsToLock) {
- pendingRows.remove(rowKey);
- }
- return;
- }
- for (RowLock rowLock : context.rowLocks) {
- ImmutableBytesPtr rowKey = rowLock.getRowKey();
+ for (ImmutableBytesPtr rowKey : context.rowsToLock) {
PendingRow pendingRow = pendingRows.get(rowKey);
if (pendingRow != null) {
pendingRow.remove();
- if (pendingRow.getCount() == 0) {
- pendingRows.remove(rowKey);
- }
}
}
}
@@ -1429,7 +1452,6 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
doIndexWritesWithExceptions(context, false);
metricSource.updatePreIndexUpdateTime(dataTableName,
EnvironmentEdgeManager.currentTimeMillis() - start);
- return;
} catch (Throwable e) {
metricSource.updatePreIndexUpdateFailureTime(dataTableName,
EnvironmentEdgeManager.currentTimeMillis() - start);
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index 95639d284a..278ccb05b5 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -87,14 +87,17 @@ public class ConcurrentMutationsExtendedIT extends
ParallelStatsDisabledIT {
}
@BeforeClass
public static synchronized void doSetup() throws Exception {
- Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(4);
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
Long.toString(0));
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
Integer.toString(MAX_LOOKBACK_AGE));
- // The following sets the row lock wait duration to 10 ms to test the
code path handling
+ // The following sets the row lock wait duration to 100 ms to test the
code path handling
// row lock timeouts. When there are concurrent mutations, the wait
time can be
- // much longer than 10 ms.
- props.put("hbase.rowlock.wait.duration", "10");
+ // much longer than 100 ms
+ props.put("hbase.rowlock.wait.duration", "100");
+ // The following sets the wait duration for the previous concurrent
batch to 10 ms to test
+ // the code path handling timeouts
+ props.put("phoenix.index.concurrent.wait.duration.ms", "10");
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
@Parameterized.Parameters(
@@ -308,8 +311,8 @@ public class ConcurrentMutationsExtendedIT extends
ParallelStatsDisabledIT {
@Test
public void testConcurrentUpserts() throws Exception {
int nThreads = 10;
- final int batchSize = 20;
- final int nRows = 100;
+ final int batchSize = 100;
+ final int nRows = 499;
final int nIndexValues = 23;
final String tableName = generateUniqueName();
final String indexName = generateUniqueName();
@@ -341,7 +344,6 @@ public class ConcurrentMutationsExtendedIT extends
ParallelStatsDisabledIT {
conn.commit();
} catch (SQLException e) {
System.out.println(e);
- throw new RuntimeException(e);
} finally {
doneSignal.countDown();
}