This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.2 by this push:
new f75f03e7cf PHOENIX-7245 NPE in Phoenix Coproc leading to Region Server crash (#1886)
f75f03e7cf is described below
commit f75f03e7cfedf8f34c84d1ff8f1d5bbb1ac9c3ea
Author: kadirozde <[email protected]>
AuthorDate: Wed May 1 13:02:25 2024 -0700
PHOENIX-7245 NPE in Phoenix Coproc leading to Region Server crash (#1886)
---
.../phoenix/hbase/index/IndexRegionObserver.java | 66 ++++++++++++++--------
.../end2end/ConcurrentMutationsExtendedIT.java | 11 +++-
2 files changed, 50 insertions(+), 27 deletions(-)
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 567f49ea35..18385766a2 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -210,7 +210,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
*/
public static class BatchMutateContext {
- private BatchMutatePhase currentPhase = BatchMutatePhase.PRE;
+ private volatile BatchMutatePhase currentPhase = BatchMutatePhase.PRE;
// The max of reference counts on the pending rows of this batch at the
time this batch arrives
private int maxPendingRowCount = 0;
private final int clientVersion;
@@ -273,12 +273,24 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
}
public CountDownLatch getCountDownLatch() {
- if (waitList == null) {
- waitList = new ArrayList<>();
+ synchronized (this) {
+ if (waitList == null) {
+ waitList = new ArrayList<>();
+ }
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ waitList.add(countDownLatch);
+ return countDownLatch;
+ }
+ }
+
+ public void countDownAllLatches() {
+ synchronized (this) {
+ if (waitList != null) {
+ for (CountDownLatch countDownLatch : waitList) {
+ countDownLatch.countDown();
+ }
+ }
}
- CountDownLatch countDownLatch = new CountDownLatch(1);
- waitList.add(countDownLatch);
- return countDownLatch;
}
public int getMaxPendingRowCount() {
@@ -1067,11 +1079,9 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
private void waitForPreviousConcurrentBatch(TableName table,
BatchMutateContext context)
throws Throwable {
- boolean done;
- BatchMutatePhase phase;
- done = true;
+ boolean done = true;
for (BatchMutateContext lastContext :
context.lastConcurrentBatchContext.values()) {
- phase = lastContext.getCurrentPhase();
+ BatchMutatePhase phase = lastContext.getCurrentPhase();
if (phase == BatchMutatePhase.PRE) {
CountDownLatch countDownLatch =
lastContext.getCountDownLatch();
@@ -1191,7 +1201,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
// Release the locks before making RPC calls for index updates
unlockRows(context);
// Do the first phase index updates
- doPre(c, context, miniBatchOp);
+ doPre(context);
// Acquire the locks again before letting the region proceed with
data table updates
lockRows(context);
if (context.lastConcurrentBatchContext != null) {
@@ -1277,9 +1287,13 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
}
/**
- * When this hook is called, all the rows in the batch context are locked. Because the rows
- * are locked, we can safely make updates to the context object and perform the necessary
- * cleanup.
+ * When this hook is called, all the rows in the batch context are locked if the batch of
+ * mutations is successful. Because the rows are locked, we can safely make updates to
+ * pending row states in memory and perform the necessary cleanup in that case.
+ *
+ * However, when the batch fails, then some of the rows may not be locked. In that case,
+ * we remove the pending row states from the concurrent hash map without updating them since
+ * pending rows states become invalid when a batch fails.
*/
@Override
public void
postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
@@ -1297,11 +1311,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
} else {
context.currentPhase = BatchMutatePhase.FAILED;
}
- if (context.waitList != null) {
- for (CountDownLatch countDownLatch : context.waitList) {
- countDownLatch.countDown();
- }
- }
+ context.countDownAllLatches();
removePendingRows(context);
if (context.indexUpdates != null) {
context.indexUpdates.clear();
@@ -1361,6 +1371,16 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
}
private void removePendingRows(BatchMutateContext context) {
+ if (context.currentPhase == BatchMutatePhase.FAILED) {
+ // This batch failed. All concurrent batches will fail too. So we can remove
+ // all rows of this batch from the memory as the in-memory row images are not valid
+ // anymore. Please note that when a batch fails, some of the rows may not have been
+ // locked and so it is not safe to update the pending row entries in that case.
+ for (ImmutableBytesPtr rowKey : context.rowsToLock) {
+ pendingRows.remove(rowKey);
+ }
+ return;
+ }
for (RowLock rowLock : context.rowLocks) {
ImmutableBytesPtr rowKey = rowLock.getRowKey();
PendingRow pendingRow = pendingRows.get(rowKey);
@@ -1373,10 +1393,10 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
}
}
- private void doPre(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context,
- MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
- long start = EnvironmentEdgeManager.currentTimeMillis();
+ private void doPre(BatchMutateContext context) throws IOException {
+ long start = 0;
try {
+ start = EnvironmentEdgeManager.currentTimeMillis();
if (failPreIndexUpdatesForTesting) {
throw new DoNotRetryIOException("Simulating the first (i.e.,
pre) index table write failure");
}
@@ -1394,8 +1414,6 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
lockRows(context);
rethrowIndexingException(e);
}
- throw new RuntimeException(
- "Somehow didn't complete the index update, but didn't return succesfully either!");
}
private void extractExpressionsAndColumns(DataInputStream input,
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index f02018f5b9..4bf5ffacc1 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -89,6 +89,10 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
Long.toString(0));
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
Integer.toString(MAX_LOOKBACK_AGE));
+ // The following sets the row lock wait duration to 10 ms to test the code path handling
+ // row lock timeouts. When there are concurrent mutations, the wait time can be
+ // much longer than 10 ms.
+ props.put("hbase.rowlock.wait.duration", "10");
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
@Parameterized.Parameters(
@@ -300,9 +304,9 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
@Test
public void testConcurrentUpserts() throws Exception {
- int nThreads = 4;
- final int batchSize = 200;
- final int nRows = 51;
+ int nThreads = 10;
+ final int batchSize = 20;
+ final int nRows = 100;
final int nIndexValues = 23;
final String tableName = generateUniqueName();
final String indexName = generateUniqueName();
@@ -333,6 +337,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
}
conn.commit();
} catch (SQLException e) {
+ System.out.println(e);
throw new RuntimeException(e);
} finally {
doneSignal.countDown();