virajjasani commented on code in PR #1903:
URL: https://github.com/apache/phoenix/pull/1903#discussion_r1637239697
##########
phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java:
##########
@@ -837,28 +843,51 @@ private boolean
isPartialUncoveredIndexMutation(PhoenixIndexMetaData indexMetaDa
return false;
}
/**
- * Retrieve the last committed data row state.
+ * Retrieve the data row state either from memory or disk. The rows are
locked by the caller.
*/
private void
getCurrentRowStates(ObserverContext<RegionCoprocessorEnvironment> c,
BatchMutateContext context) throws
IOException {
Set<KeyRange> keys = new HashSet<KeyRange>(context.rowsToLock.size());
for (ImmutableBytesPtr rowKeyPtr : context.rowsToLock) {
- PendingRow pendingRow = pendingRows.get(rowKeyPtr);
- if (pendingRow != null &&
pendingRow.getLastContext().getCurrentPhase() == BatchMutatePhase.PRE) {
- if (context.lastConcurrentBatchContext == null) {
- context.lastConcurrentBatchContext = new HashMap<>();
- }
- context.lastConcurrentBatchContext.put(rowKeyPtr,
pendingRow.getLastContext());
- if (context.maxPendingRowCount < pendingRow.getCount()) {
- context.maxPendingRowCount = pendingRow.getCount();
- }
- Put put =
pendingRow.getLastContext().getNextDataRowState(rowKeyPtr);
- if (put != null) {
- context.dataRowStates.put(rowKeyPtr, new Pair<Put,
Put>(put, new Put(put)));
- }
- }
- else {
+ PendingRow pendingRow = new PendingRow(rowKeyPtr, context);
+ // Add the data table rows in the mini batch to the per region
collection of pending
+ // rows. This will be used to detect concurrent updates
+ PendingRow existingPendingRow = pendingRows.putIfAbsent(rowKeyPtr,
pendingRow);
+ if (existingPendingRow == null) {
+ // There was no pending row for this row key. We need to
retrieve this row from disk
keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get(),
SortOrder.ASC));
+ } else {
+ // There is a pending row for this row key. We need to
retrieve the row from memory
+ BatchMutateContext lastContext =
existingPendingRow.getLastContext();
+ if (existingPendingRow.add(context)) {
+ BatchMutatePhase phase = lastContext.getCurrentPhase();
+ if (phase == BatchMutatePhase.PRE || phase ==
BatchMutatePhase.POST) {
+ assert phase == BatchMutatePhase.PRE
+ : "the phase of the last batch cannot be POST";
+ if (phase == BatchMutatePhase.PRE) {
+ if (context.lastConcurrentBatchContext == null) {
+ context.lastConcurrentBatchContext = new
HashMap<>();
+ }
+ context.lastConcurrentBatchContext.put(rowKeyPtr,
lastContext);
+ if (context.maxPendingRowCount <
existingPendingRow.getCount()) {
+ context.maxPendingRowCount =
existingPendingRow.getCount();
+ }
+ }
+ Put put = lastContext.getNextDataRowState(rowKeyPtr);
+ if (put != null) {
+ context.dataRowStates.put(rowKeyPtr, new
Pair<>(put, new Put(put)));
+ }
+ } else {
+ // The last batch for this row key failed. We cannot
use the memory state.
+ // So we need to retrieve this row from disk
+
keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get(), SortOrder.ASC));
+ }
+ } else {
Review Comment:
Alternatively, if we do want to throw an exception when the last batch phase
is `POST`, maybe something like this would be better?
Here, we handle only the `PRE` and `FAILED` cases and throw an exception for
the `POST` phase:
```
if (existingPendingRow.add(context)) {
BatchMutatePhase phase = lastContext.getCurrentPhase();
Preconditions.checkArgument(phase !=
BatchMutatePhase.POST,
"the phase of the last batch cannot be POST");
if (phase == BatchMutatePhase.PRE) {
if (context.lastConcurrentBatchContext == null) {
context.lastConcurrentBatchContext = new
HashMap<>();
}
context.lastConcurrentBatchContext.put(rowKeyPtr,
lastContext);
if (context.maxPendingRowCount <
existingPendingRow.getCount()) {
context.maxPendingRowCount =
existingPendingRow.getCount();
}
Put put = lastContext.getNextDataRowState(rowKeyPtr);
if (put != null) {
context.dataRowStates.put(rowKeyPtr, new
Pair<>(put, new Put(put)));
}
} else {
// The last batch for this row key failed. We cannot
use the memory state.
// So we need to retrieve this row from disk
keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get(), SortOrder.ASC));
}
} else {
...
...
```
`Preconditions.checkArgument(phase != BatchMutatePhase.POST, "the phase of
the last batch cannot be POST")` will throw an `IllegalArgumentException` if
the phase is unexpectedly `POST` rather than `PRE` or `FAILED`. Unlike the
`assert`, which is only evaluated when assertions are enabled (`-ea`), this
check always runs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]