This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push:
new 49bfb36755 PHOENIX-7245 NPE in Phoenix Coproc leading to Region Server
crash (#1887)
49bfb36755 is described below
commit 49bfb36755ace643fbc6a42415a6fe64e3984719
Author: kadirozde <[email protected]>
AuthorDate: Mon May 6 09:16:18 2024 -0700
PHOENIX-7245 NPE in Phoenix Coproc leading to Region Server crash (#1887)
---
.../end2end/ConcurrentMutationsExtendedIT.java | 13 ++--
.../phoenix/hbase/index/IndexRegionObserver.java | 82 ++++++++++++++--------
2 files changed, 60 insertions(+), 35 deletions(-)
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index 48768ead05..2d7eb3767a 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -76,10 +76,14 @@ public class ConcurrentMutationsExtendedIT extends
ParallelStatsDisabledIT {
@BeforeClass
public static synchronized void doSetup() throws Exception {
- Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(3);
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
Long.toString(0));
props.put(CompatBaseScannerRegionObserver.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
Integer.toString(MAX_LOOKBACK_AGE));
+        // The following sets the row lock wait duration to 10 ms to test the code path handling
+        // row lock timeouts. When there are concurrent mutations, the wait time can be
+        // much longer than 10 ms.
+ props.put("hbase.rowlock.wait.duration", "10");
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
@@ -280,9 +284,9 @@ public class ConcurrentMutationsExtendedIT extends
ParallelStatsDisabledIT {
@Test
public void testConcurrentUpserts() throws Exception {
- int nThreads = 4;
- final int batchSize = 200;
- final int nRows = 51;
+ int nThreads = 10;
+ final int batchSize = 20;
+ final int nRows = 100;
final int nIndexValues = 23;
final String tableName = generateUniqueName();
final String indexName = generateUniqueName();
@@ -312,6 +316,7 @@ public class ConcurrentMutationsExtendedIT extends
ParallelStatsDisabledIT {
}
conn.commit();
} catch (SQLException e) {
+ System.out.println(e);
throw new RuntimeException(e);
} finally {
doneSignal.countDown();
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index f1f03d12af..d6666d191f 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -189,7 +189,7 @@ public class IndexRegionObserver extends
CompatIndexRegionObserver implements Re
*/
public static class BatchMutateContext {
- private BatchMutatePhase currentPhase = BatchMutatePhase.PRE;
+ private volatile BatchMutatePhase currentPhase = BatchMutatePhase.PRE;
        // The max of reference counts on the pending rows of this batch at the time this batch arrives
private int maxPendingRowCount = 0;
private final int clientVersion;
@@ -246,12 +246,24 @@ public class IndexRegionObserver extends
CompatIndexRegionObserver implements Re
}
public CountDownLatch getCountDownLatch() {
- if (waitList == null) {
- waitList = new ArrayList<>();
+ synchronized (this) {
+ if (waitList == null) {
+ waitList = new ArrayList<>();
+ }
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ waitList.add(countDownLatch);
+ return countDownLatch;
+ }
+ }
+
+ public void countDownAllLatches() {
+ synchronized (this) {
+ if (waitList != null) {
+ for (CountDownLatch countDownLatch : waitList) {
+ countDownLatch.countDown();
+ }
+ }
}
- CountDownLatch countDownLatch = new CountDownLatch(1);
- waitList.add(countDownLatch);
- return countDownLatch;
}
public int getMaxPendingRowCount() {
@@ -898,8 +910,6 @@ public class IndexRegionObserver extends
CompatIndexRegionObserver implements Re
}
}
}
- removePendingRows(context);
- context.indexUpdates.clear();
}
private static boolean hasGlobalIndex(PhoenixIndexMetaData indexMetaData) {
@@ -922,11 +932,9 @@ public class IndexRegionObserver extends
CompatIndexRegionObserver implements Re
private void waitForPreviousConcurrentBatch(TableName table,
BatchMutateContext context)
throws Throwable {
- boolean done;
- BatchMutatePhase phase;
- done = true;
+ boolean done = true;
for (BatchMutateContext lastContext :
context.lastConcurrentBatchContext.values()) {
- phase = lastContext.getCurrentPhase();
+ BatchMutatePhase phase = lastContext.getCurrentPhase();
if (phase == BatchMutatePhase.FAILED) {
done = false;
break;
@@ -948,14 +956,8 @@ public class IndexRegionObserver extends
CompatIndexRegionObserver implements Re
}
if (!done) {
            // This batch needs to be retried since one of the previous concurrent batches has not completed yet.
-            // Throwing an IOException will result in retries of this batch. Before throwing exception,
-            // we need to remove reference counts and locks for the rows of this batch
-            removePendingRows(context);
-            context.indexUpdates.clear();
-            for (RowLock rowLock : context.rowLocks) {
-                rowLock.release();
-            }
-            context.rowLocks.clear();
+            // Throwing an IOException will result in retries of this batch. Removal of reference counts and
+            // locks for the rows of this batch will be done in postBatchMutateIndispensably()
            throw new IOException("One of the previous concurrent mutations has not completed. " +
                    "The batch needs to be retried " + table.getNameAsString());
}
@@ -1048,6 +1050,15 @@ public class IndexRegionObserver extends
CompatIndexRegionObserver implements Re
}
}
+    /**
+     * When this hook is called, all the rows in the batch context are locked if the batch of
+     * mutations is successful. Because the rows are locked, we can safely make updates to
+     * pending row states in memory and perform the necessary cleanup in that case.
+     *
+     * However, when the batch fails, then some of the rows may not be locked. In that case,
+     * we remove the pending row states from the concurrent hash map without updating them since
+     * pending rows states become invalid when a batch fails.
+     */
@Override
public void
postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean
success) throws IOException {
@@ -1064,10 +1075,10 @@ public class IndexRegionObserver extends
CompatIndexRegionObserver implements Re
} else {
context.currentPhase = BatchMutatePhase.FAILED;
}
- if (context.waitList != null) {
- for (CountDownLatch countDownLatch : context.waitList) {
- countDownLatch.countDown();
- }
+ context.countDownAllLatches();
+ removePendingRows(context);
+ if (context.indexUpdates != null) {
+ context.indexUpdates.clear();
}
unlockRows(context);
this.builder.batchCompleted(miniBatchOp);
@@ -1124,6 +1135,16 @@ public class IndexRegionObserver extends
CompatIndexRegionObserver implements Re
}
private void removePendingRows(BatchMutateContext context) {
+        if (context.currentPhase == BatchMutatePhase.FAILED) {
+            // This batch failed. All concurrent batches will fail too. So we can remove
+            // all rows of this batch from the memory as the in-memory row images are not valid
+            // anymore. Please note that when a batch fails, some of the rows may not have been
+            // locked and so it is not safe to update the pending row entries in that case.
+ for (ImmutableBytesPtr rowKey : context.rowsToLock) {
+ pendingRows.remove(rowKey);
+ }
+ return;
+ }
for (RowLock rowLock : context.rowLocks) {
ImmutableBytesPtr rowKey = rowLock.getRowKey();
PendingRow pendingRow = pendingRows.get(rowKey);
@@ -1138,8 +1159,9 @@ public class IndexRegionObserver extends
CompatIndexRegionObserver implements Re
private void doPre(ObserverContext<RegionCoprocessorEnvironment> c,
BatchMutateContext context,
MiniBatchOperationInProgress<Mutation> miniBatchOp)
throws IOException {
- long start = EnvironmentEdgeManager.currentTimeMillis();
+ long start = 0;
try {
+ start = EnvironmentEdgeManager.currentTimeMillis();
if (failPreIndexUpdatesForTesting) {
throw new DoNotRetryIOException("Simulating the first (i.e.,
pre) index table write failure");
}
@@ -1151,14 +1173,12 @@ public class IndexRegionObserver extends
CompatIndexRegionObserver implements Re
metricSource.updatePreIndexUpdateFailureTime(dataTableName,
EnvironmentEdgeManager.currentTimeMillis() - start);
metricSource.incrementPreIndexUpdateFailures(dataTableName);
-            // Remove all locks as they are already unlocked. There is no need to unlock them again later when
-            // postBatchMutateIndispensably() is called
-            removePendingRows(context);
-            context.rowLocks.clear();
+            // Re-acquire all locks since we released them before making index updates
+            // Removal of reference counts and locks for the rows of this batch will be
+            // done in postBatchMutateIndispensably()
+ lockRows(context);
rethrowIndexingException(e);
}
- throw new RuntimeException(
- "Somehow didn't complete the index update, but didn't return
succesfully either!");
}
/**