virajjasani commented on code in PR #1903:
URL: https://github.com/apache/phoenix/pull/1903#discussion_r1637222705


##########
phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java:
##########
@@ -1124,9 +1153,19 @@ private void waitForPreviousConcurrentBatch(TableName table, BatchMutateContext
         boolean done = true;
         for (BatchMutateContext lastContext : context.lastConcurrentBatchContext.values()) {
             BatchMutatePhase phase = lastContext.getCurrentPhase();
-
-            if (phase == BatchMutatePhase.PRE) {
+            if (phase == BatchMutatePhase.FAILED) {
+                done = false;
+                break;
+            } else if (phase == BatchMutatePhase.PRE) {
                 CountDownLatch countDownLatch = lastContext.getCountDownLatch();
+                if (countDownLatch == null) {
+                    // phase changed from PRE to either FAILED or POST

Review Comment:
   How about we get rid of the `phase` local variable and read the phase from the context each time?
   ```
               if (lastContext.getCurrentPhase() == BatchMutatePhase.FAILED) {
                   done = false;
                   break;
               } else if (lastContext.getCurrentPhase() == BatchMutatePhase.PRE) {
                   CountDownLatch countDownLatch = lastContext.getCountDownLatch();
                   if (countDownLatch == null) {
                       // phase changed from PRE to either FAILED or POST
                       if (lastContext.getCurrentPhase() == BatchMutatePhase.FAILED) {
                           done = false;
                           break;
                       }
                       continue;
                   }
   ...
   ...
   ```



##########
phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java:
##########
@@ -1124,9 +1153,19 @@ private void waitForPreviousConcurrentBatch(TableName table, BatchMutateContext
         boolean done = true;
         for (BatchMutateContext lastContext : context.lastConcurrentBatchContext.values()) {
             BatchMutatePhase phase = lastContext.getCurrentPhase();
-
-            if (phase == BatchMutatePhase.PRE) {
+            if (phase == BatchMutatePhase.FAILED) {
+                done = false;
+                break;
+            } else if (phase == BatchMutatePhase.PRE) {
                 CountDownLatch countDownLatch = lastContext.getCountDownLatch();
+                if (countDownLatch == null) {
+                    // phase changed from PRE to either FAILED or POST
+                    if (phase == BatchMutatePhase.FAILED) {

Review Comment:
   Since `phase` is a local snapshot that is never updated, it will always be `PRE` here. Should this condition be `if (lastContext.getCurrentPhase() == BatchMutatePhase.FAILED)` instead?
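   To illustrate why re-reading matters, here is a minimal, self-contained sketch. `PhaseRecheckSketch`, `Ctx`, `finish` and `waitForPrevious` are hypothetical names, not Phoenix classes. The sketch assumes, as the `// phase changed from PRE to either FAILED or POST` comment suggests, that the latch is cleared only after the new phase has been written. Under that ordering, a `null` latch guarantees that a fresh `getCurrentPhase()` call sees `POST` or `FAILED`, while a `phase` local captured earlier still says `PRE`. It also waits without a timeout, which is a simplification.

   ```java
   import java.util.List;
   import java.util.concurrent.CountDownLatch;

   // Hypothetical, simplified model of the PRE -> POST/FAILED handoff; not Phoenix's classes.
   public class PhaseRecheckSketch {
       public enum Phase { PRE, POST, FAILED }

       public static final class Ctx {
           private volatile Phase phase = Phase.PRE;
           private volatile CountDownLatch latch = new CountDownLatch(1);

           public Phase getCurrentPhase() { return phase; }

           public CountDownLatch getCountDownLatch() { return latch; }

           // Assumption: the new phase is written before the latch is cleared and released,
           // so a reader that observes latch == null also observes the updated phase.
           public void finish(Phase next) {
               phase = next;
               CountDownLatch l = latch;
               latch = null;
               if (l != null) {
                   l.countDown();
               }
           }
       }

       // Waits (simplification: no timeout) for each earlier batch to leave PRE and
       // returns false as soon as one of them is found in FAILED.
       public static boolean waitForPrevious(List<Ctx> previous) {
           for (Ctx last : previous) {
               if (last.getCurrentPhase() == Phase.PRE) {
                   CountDownLatch l = last.getCountDownLatch();
                   if (l != null) {
                       try {
                           l.await();
                       } catch (InterruptedException e) {
                           Thread.currentThread().interrupt();
                           return false;
                       }
                   }
               }
               // Re-read the volatile field; a local copy taken before the wait would still say PRE.
               if (last.getCurrentPhase() == Phase.FAILED) {
                   return false;
               }
           }
           return true;
       }
   }
   ```

   With both fields `volatile`, the write of the phase happens-before the write that clears the latch, so a reader that sees `null` needs no extra locking. Swapping the two writes in `finish` would reintroduce the race: a reader could see a `null` latch while the phase still reads `PRE`.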



##########
phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java:
##########
@@ -837,28 +843,51 @@ private boolean isPartialUncoveredIndexMutation(PhoenixIndexMetaData indexMetaDa
         return false;
     }
     /**
-     * Retrieve the last committed data row state.
+     * Retrieve the data row state either from memory or disk. The rows are locked by the caller.
      */
     private void getCurrentRowStates(ObserverContext<RegionCoprocessorEnvironment> c,
                                      BatchMutateContext context) throws IOException {
         Set<KeyRange> keys = new HashSet<KeyRange>(context.rowsToLock.size());
         for (ImmutableBytesPtr rowKeyPtr : context.rowsToLock) {
-            PendingRow pendingRow = pendingRows.get(rowKeyPtr);
-            if (pendingRow != null && pendingRow.getLastContext().getCurrentPhase() == BatchMutatePhase.PRE) {
-                if (context.lastConcurrentBatchContext == null) {
-                    context.lastConcurrentBatchContext = new HashMap<>();
-                }
-                context.lastConcurrentBatchContext.put(rowKeyPtr, pendingRow.getLastContext());
-                if (context.maxPendingRowCount < pendingRow.getCount()) {
-                    context.maxPendingRowCount = pendingRow.getCount();
-                }
-                Put put = pendingRow.getLastContext().getNextDataRowState(rowKeyPtr);
-                if (put != null) {
-                    context.dataRowStates.put(rowKeyPtr, new Pair<Put, Put>(put, new Put(put)));
-                }
-            }
-            else {
+            PendingRow pendingRow = new PendingRow(rowKeyPtr, context);
+            // Add the data table rows in the mini batch to the per region collection of pending
+            // rows. This will be used to detect concurrent updates
+            PendingRow existingPendingRow = pendingRows.putIfAbsent(rowKeyPtr, pendingRow);
+            if (existingPendingRow == null) {
+                // There was no pending row for this row key. We need to retrieve this row from disk
                 keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get(), SortOrder.ASC));
+            } else {
+                // There is a pending row for this row key. We need to retrieve the row from memory
+                BatchMutateContext lastContext = existingPendingRow.getLastContext();
+                if (existingPendingRow.add(context)) {
+                    BatchMutatePhase phase = lastContext.getCurrentPhase();
+                    if (phase == BatchMutatePhase.PRE || phase == BatchMutatePhase.POST) {
+                        assert phase == BatchMutatePhase.PRE
+                                : "the phase of the last batch cannot be POST";
+                        if (phase == BatchMutatePhase.PRE) {
+                            if (context.lastConcurrentBatchContext == null) {
+                                context.lastConcurrentBatchContext = new HashMap<>();
+                            }
+                            context.lastConcurrentBatchContext.put(rowKeyPtr, lastContext);
+                            if (context.maxPendingRowCount < existingPendingRow.getCount()) {
+                                context.maxPendingRowCount = existingPendingRow.getCount();
+                            }
+                        }
+                        Put put = lastContext.getNextDataRowState(rowKeyPtr);
+                        if (put != null) {
+                            context.dataRowStates.put(rowKeyPtr, new Pair<>(put, new Put(put)));
+                        }
+                    } else {
+                        // The last batch for this row key failed. We cannot use the memory state.
+                        // So we need to retrieve this row from disk
+                        keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get(), SortOrder.ASC));
+                    }
+                } else {

Review Comment:
   Can we simplify this to the following?
   
   ```
                   if (existingPendingRow.add(context)) {
                       BatchMutatePhase phase = lastContext.getCurrentPhase();
                       if (phase == BatchMutatePhase.PRE) {
                           if (context.lastConcurrentBatchContext == null) {
                               context.lastConcurrentBatchContext = new HashMap<>();
                           }
                           context.lastConcurrentBatchContext.put(rowKeyPtr, lastContext);
                           if (context.maxPendingRowCount < existingPendingRow.getCount()) {
                               context.maxPendingRowCount = existingPendingRow.getCount();
                           }
                           Put put = lastContext.getNextDataRowState(rowKeyPtr);
                           if (put != null) {
                               context.dataRowStates.put(rowKeyPtr, new Pair<>(put, new Put(put)));
                           }
                       } else {
                           // The last batch for this row key failed. We cannot use the memory state.
                           // So we need to retrieve this row from disk
                           keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get(), SortOrder.ASC));
                       }
                   } else {
   ...
   ...
   ```
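   For reference, the decision the simplified branch encodes can be modeled in isolation. This is a hypothetical sketch: `RowStateSourceSketch`, `Batch`, `PendingRow` and `lookup` are illustrative names, row states are plain strings rather than `Put`s, the bookkeeping for waiting on the previous batch is omitted, and `PendingRow.add` here returns the previous batch instead of the `boolean` that the PR's `add` returns. A row with no pending entry, or whose previous batch is no longer in `PRE`, is queued for a disk read; otherwise the previous batch's next row state is served from memory.

   ```java
   import java.util.Map;
   import java.util.Set;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical model of the memory-vs-disk decision; names and types are illustrative only.
   public class RowStateSourceSketch {
       public enum Phase { PRE, POST, FAILED }

       public static final class Batch {
           private volatile Phase phase = Phase.PRE;
           private final Map<String, String> nextRowStates = new ConcurrentHashMap<>();

           public Phase getPhase() { return phase; }
           public void setPhase(Phase p) { phase = p; }
           public void putNextState(String rowKey, String state) { nextRowStates.put(rowKey, state); }
           public String getNextState(String rowKey) { return nextRowStates.get(rowKey); }
       }

       // Tracks the last batch that touched a row.
       static final class PendingRow {
           private Batch last;

           PendingRow(Batch first) { last = first; }

           // Simplification: registers `next` as the last batch and returns the previous one.
           synchronized Batch add(Batch next) {
               Batch previous = last;
               last = next;
               return previous;
           }
       }

       // Stand-in for the per-region pendingRows map.
       private final ConcurrentHashMap<String, PendingRow> pendingRows = new ConcurrentHashMap<>();

       // Returns the previous batch's in-memory next state (possibly null) when that batch is
       // still in PRE; otherwise adds rowKey to diskKeys and returns null.
       public String lookup(String rowKey, Batch current, Set<String> diskKeys) {
           PendingRow existing = pendingRows.putIfAbsent(rowKey, new PendingRow(current));
           if (existing == null) {
               diskKeys.add(rowKey); // no concurrent batch: read the committed row from disk
               return null;
           }
           Batch previous = existing.add(current);
           if (previous.getPhase() == Phase.PRE) {
               return previous.getNextState(rowKey);
           }
           diskKeys.add(rowKey); // previous batch left PRE (e.g. FAILED): memory state is unusable
           return null;
       }
   }
   ```

   One behavioral difference worth noting: the original code relies on `assert` to rule out `POST`, and Java assertions are disabled unless the JVM runs with `-ea`. With the simplification, an unexpected `POST` falls through to the disk read instead of reusing the in-memory state.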



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
