virajjasani commented on code in PR #1903:
URL: https://github.com/apache/phoenix/pull/1903#discussion_r1637222705
##########
phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java:
##########
@@ -1124,9 +1153,19 @@ private void waitForPreviousConcurrentBatch(TableName
table, BatchMutateContext
boolean done = true;
for (BatchMutateContext lastContext :
context.lastConcurrentBatchContext.values()) {
BatchMutatePhase phase = lastContext.getCurrentPhase();
-
- if (phase == BatchMutatePhase.PRE) {
+ if (phase == BatchMutatePhase.FAILED) {
+ done = false;
+ break;
+ } else if (phase == BatchMutatePhase.PRE) {
CountDownLatch countDownLatch =
lastContext.getCountDownLatch();
+ if (countDownLatch == null) {
+ // phase changed from PRE to either FAILED or POST
Review Comment:
How about we get rid of `phase` as a local variable and read the phase directly from the context each time?
```
if (lastContext.getCurrentPhase() == BatchMutatePhase.FAILED) {
done = false;
break;
} else if (lastContext.getCurrentPhase() ==
BatchMutatePhase.PRE) {
CountDownLatch countDownLatch =
lastContext.getCountDownLatch();
if (countDownLatch == null) {
// phase changed from PRE to either FAILED or POST
if (lastContext.getCurrentPhase() ==
BatchMutatePhase.FAILED) {
done = false;
break;
}
continue;
}
...
...
```
##########
phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java:
##########
@@ -1124,9 +1153,19 @@ private void waitForPreviousConcurrentBatch(TableName
table, BatchMutateContext
boolean done = true;
for (BatchMutateContext lastContext :
context.lastConcurrentBatchContext.values()) {
BatchMutatePhase phase = lastContext.getCurrentPhase();
-
- if (phase == BatchMutatePhase.PRE) {
+ if (phase == BatchMutatePhase.FAILED) {
+ done = false;
+ break;
+ } else if (phase == BatchMutatePhase.PRE) {
CountDownLatch countDownLatch =
lastContext.getCountDownLatch();
+ if (countDownLatch == null) {
+ // phase changed from PRE to either FAILED or POST
+ if (phase == BatchMutatePhase.FAILED) {
Review Comment:
Since `phase` is not updated after it is first read, it will always be `PRE`
here, so this check can never be true. Maybe we need to change this condition
to `if (lastContext.getCurrentPhase() == BatchMutatePhase.FAILED)`?
##########
phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java:
##########
@@ -837,28 +843,51 @@ private boolean
isPartialUncoveredIndexMutation(PhoenixIndexMetaData indexMetaDa
return false;
}
/**
- * Retrieve the last committed data row state.
+ * Retrieve the data row state either from memory or disk. The rows are
locked by the caller.
*/
private void
getCurrentRowStates(ObserverContext<RegionCoprocessorEnvironment> c,
BatchMutateContext context) throws
IOException {
Set<KeyRange> keys = new HashSet<KeyRange>(context.rowsToLock.size());
for (ImmutableBytesPtr rowKeyPtr : context.rowsToLock) {
- PendingRow pendingRow = pendingRows.get(rowKeyPtr);
- if (pendingRow != null &&
pendingRow.getLastContext().getCurrentPhase() == BatchMutatePhase.PRE) {
- if (context.lastConcurrentBatchContext == null) {
- context.lastConcurrentBatchContext = new HashMap<>();
- }
- context.lastConcurrentBatchContext.put(rowKeyPtr,
pendingRow.getLastContext());
- if (context.maxPendingRowCount < pendingRow.getCount()) {
- context.maxPendingRowCount = pendingRow.getCount();
- }
- Put put =
pendingRow.getLastContext().getNextDataRowState(rowKeyPtr);
- if (put != null) {
- context.dataRowStates.put(rowKeyPtr, new Pair<Put,
Put>(put, new Put(put)));
- }
- }
- else {
+ PendingRow pendingRow = new PendingRow(rowKeyPtr, context);
+ // Add the data table rows in the mini batch to the per region
collection of pending
+ // rows. This will be used to detect concurrent updates
+ PendingRow existingPendingRow = pendingRows.putIfAbsent(rowKeyPtr,
pendingRow);
+ if (existingPendingRow == null) {
+ // There was no pending row for this row key. We need to
retrieve this row from disk
keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get(),
SortOrder.ASC));
+ } else {
+ // There is a pending row for this row key. We need to
retrieve the row from memory
+ BatchMutateContext lastContext =
existingPendingRow.getLastContext();
+ if (existingPendingRow.add(context)) {
+ BatchMutatePhase phase = lastContext.getCurrentPhase();
+ if (phase == BatchMutatePhase.PRE || phase ==
BatchMutatePhase.POST) {
+ assert phase == BatchMutatePhase.PRE
+ : "the phase of the last batch cannot be POST";
+ if (phase == BatchMutatePhase.PRE) {
+ if (context.lastConcurrentBatchContext == null) {
+ context.lastConcurrentBatchContext = new
HashMap<>();
+ }
+ context.lastConcurrentBatchContext.put(rowKeyPtr,
lastContext);
+ if (context.maxPendingRowCount <
existingPendingRow.getCount()) {
+ context.maxPendingRowCount =
existingPendingRow.getCount();
+ }
+ }
+ Put put = lastContext.getNextDataRowState(rowKeyPtr);
+ if (put != null) {
+ context.dataRowStates.put(rowKeyPtr, new
Pair<>(put, new Put(put)));
+ }
+ } else {
+ // The last batch for this row key failed. We cannot
use the memory state.
+ // So we need to retrieve this row from disk
+
keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get(), SortOrder.ASC));
+ }
+ } else {
Review Comment:
Can we simplify this to the following?
```
if (existingPendingRow.add(context)) {
BatchMutatePhase phase = lastContext.getCurrentPhase();
if (phase == BatchMutatePhase.PRE) {
if (context.lastConcurrentBatchContext == null) {
context.lastConcurrentBatchContext = new
HashMap<>();
}
context.lastConcurrentBatchContext.put(rowKeyPtr,
lastContext);
if (context.maxPendingRowCount <
existingPendingRow.getCount()) {
context.maxPendingRowCount =
existingPendingRow.getCount();
}
Put put = lastContext.getNextDataRowState(rowKeyPtr);
if (put != null) {
context.dataRowStates.put(rowKeyPtr, new
Pair<>(put, new Put(put)));
}
} else {
// The last batch for this row key failed. We cannot
use the memory state.
// So we need to retrieve this row from disk
keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get(), SortOrder.ASC));
}
} else {
...
...
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]