kadirozde commented on a change in pull request #701: PHOENIX-5709 Simplify index update generation code for consistent glo…
URL: https://github.com/apache/phoenix/pull/701#discussion_r377787043
 
 

 ##########
 File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 ##########
 @@ -409,93 +416,262 @@ private void populatePendingRows(BatchMutateContext context) {
       }
   }
 
-  private Collection<? extends Mutation> groupMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
-                                                        long now, ReplayWrite replayWrite) throws IOException {
-      Map<ImmutableBytesPtr, MultiMutation> mutationsMap = new HashMap<>();
-      boolean copyMutations = false;
-      for (int i = 0; i < miniBatchOp.size(); i++) {
-          if (miniBatchOp.getOperationStatus(i) == IGNORE) {
-              continue;
+  public static void setTimestamp(Mutation m, long ts) throws IOException {
+      for (List<Cell> cells : m.getFamilyCellMap().values()) {
+          for (Cell cell : cells) {
+              CellUtil.setTimestamp(cell, ts);
           }
-          Mutation m = miniBatchOp.getOperation(i);
-          if (this.builder.isEnabled(m)) {
-              // Track whether or not we need to
-              ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
-              if (mutationsMap.containsKey(row)) {
-                  copyMutations = true;
-              } else {
-                  mutationsMap.put(row, null);
+      }
+  }
+
+    public static long getTimestamp(Mutation m) {
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                return cell.getTimestamp();
+            }
+        }
+        throw new IllegalStateException("No cell found");
+    }
+
+  private static void removeColumn(Put put, Cell deleteCell) {
+      byte[] family = CellUtil.cloneFamily(deleteCell);
+      List<Cell> cellList = put.getFamilyCellMap().get(family);
+      if (cellList == null) {
+          return;
+      }
+      Iterator<Cell> cellIterator = cellList.iterator();
+      while (cellIterator.hasNext()) {
+          Cell cell = cellIterator.next();
+          if (CellUtil.matchingQualifier(cell, deleteCell)) {
+              cellIterator.remove();
+              if (cellList.isEmpty()) {
+                  put.getFamilyCellMap().remove(family);
               }
+              return;
           }
       }
-      // early exit if it turns out we don't have any edits
-      if (mutationsMap.isEmpty()) {
-          return null;
-      }
-      // If we're copying the mutations
-      Collection<Mutation> originalMutations;
-      Collection<? extends Mutation> mutations;
-      if (copyMutations) {
-          originalMutations = null;
-          mutations = mutationsMap.values();
-      } else {
-          originalMutations = Lists.newArrayListWithExpectedSize(mutationsMap.size());
-          mutations = originalMutations;
+  }
+
+
+  private void merge(Put current, Put previous) throws IOException {
+      for (List<Cell> cells : previous.getFamilyCellMap().values()) {
+          for (Cell cell : cells) {
+              if (!current.has(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell))) {
+                  current.add(cell);
+              }
+          }
       }
+  }
 
-      boolean resetTimeStamp = replayWrite == null;
+  private Put mergeNew(Put current, Put previous) throws IOException {
+      Put next = new Put(current);
+      for (List<Cell> cells : previous.getFamilyCellMap().values()) {
+          for (Cell cell : cells) {
+              if (!current.has(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell))) {
+                  next.add(cell);
+              }
+          }
+      }
+      return next;
+  }
 
-      for (int i = 0; i < miniBatchOp.size(); i++) {
+  /**
+   * When there are multiple put mutations on the same data row within the same batch, this method merges them into
+   * one mutation.
+   */
+  private void mergePendingPutMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                        Map<ImmutableBytesPtr, Integer> pendingPuts,
+                                        long now) throws IOException {
+      for (Integer i = 0; i < miniBatchOp.size(); i++) {
+          if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+              continue;
+          }
           Mutation m = miniBatchOp.getOperation(i);
           // skip this mutation if we aren't enabling indexing
-          // unfortunately, we really should ask if the raw mutation (rather than the combined mutation)
-          // should be indexed, which means we need to expose another method on the builder. Such is the
-          // way optimization go though.
-          if (miniBatchOp.getOperationStatus(i) != IGNORE && this.builder.isEnabled(m)) {
-              if (resetTimeStamp) {
-                  // Unless we're replaying edits to rebuild the index, we update the time stamp
-                  // of the data table to prevent overlapping time stamps (which prevents index
-                  // inconsistencies as this case isn't handled correctly currently).
-                  for (List<Cell> cells : m.getFamilyCellMap().values()) {
-                      for (Cell cell : cells) {
-                          CellUtil.setTimestamp(cell, now);
-                      }
-                  }
-              }
-              // No need to write the table mutations when we're rebuilding
-              // the index as they're already written and just being replayed.
-              if (replayWrite == ReplayWrite.INDEX_ONLY
-                      || replayWrite == ReplayWrite.REBUILD_INDEX_ONLY) {
-                  miniBatchOp.setOperationStatus(i, NOWRITE);
+          if (!this.builder.isEnabled(m)) {
+              continue;
+          }
+          // Unless we're replaying edits to rebuild the index, we update the time stamp
+          // of the data table to prevent overlapping time stamps (which prevents index
+          // inconsistencies as this case isn't handled correctly currently).
+          setTimestamp(m, now);
+          if (m instanceof Put) {
+              ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(m.getRow());
+              Integer opIndex = pendingPuts.get(rowKeyPtr);
+              if (opIndex != null) {
+                  merge((Put) m, (Put) miniBatchOp.getOperation(opIndex));
+                  miniBatchOp.setOperationStatus(opIndex, NOWRITE);
+                  pendingPuts.remove(opIndex);
               }
+              pendingPuts.put(rowKeyPtr, i);
+          }
+      }
+  }
 
-              // Only copy mutations if we found duplicate rows
-              // which only occurs when we're partially rebuilding
-              // the index (since we'll potentially have both a
-              // Put and a Delete mutation for the same row).
-              if (copyMutations) {
-                  // Add the mutation to the batch set
-
-                  ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
-                  MultiMutation stored = mutationsMap.get(row);
-                  // we haven't seen this row before, so add it
-                  if (stored == null) {
-                      stored = new MultiMutation(row);
-                      mutationsMap.put(row, stored);
+  /**
+   * When there are delete and put mutations on the same data row within the same batch, this method applies deletes
+   * on put mutations, that is, effectively merges them.
+   */
+  private void applyPendingDeleteMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                           BatchMutateContext context,
+                                           Map<ImmutableBytesPtr, Integer> pendingPuts) throws IOException {
+      for (Integer i = 0; i < miniBatchOp.size(); i++) {
+          if (miniBatchOp.getOperationStatus(i) == IGNORE || miniBatchOp.getOperationStatus(i) == NOWRITE) {
+              continue;
+          }
+          Mutation m = miniBatchOp.getOperation(i);
+          if (!this.builder.isEnabled(m)) {
+              continue;
+          }
+          if (!(m instanceof Delete)) {
+              continue;
+          }
+          ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(m.getRow());
+          Integer opIndex = pendingPuts.get(rowKeyPtr);
+          Put mergedRow = context.mergedRowStates.get(rowKeyPtr);
+          if (opIndex == null && mergedRow == null) {
+              continue;
+          }
+          Put put = (opIndex != null) ? (Put) miniBatchOp.getOperation(opIndex) : null;
+          for (List<Cell> cells : m.getFamilyCellMap().values()) {
+              for (Cell cell : cells) {
+                  switch (cell.getType()) {
+                      case DeleteFamily:
+                      case DeleteFamilyVersion:
+                          if (put != null) {
+                              put.getFamilyCellMap().remove(CellUtil.cloneFamily(cell));
+                          }
+                          if (put != mergedRow) {
+                              mergedRow.getFamilyCellMap().remove(CellUtil.cloneFamily(cell));
+                          }
+                          break;
+                      case DeleteColumn:
+                      case Delete:
+                          if (put != null) {
+                              removeColumn(put, cell);
+                          }
+                          if (put != mergedRow) {
+                              removeColumn(mergedRow, cell);
+                          }
+                  }
+                  if (put != null && put.getFamilyCellMap().size() == 0) {
+                      pendingPuts.remove(rowKeyPtr);
+                      miniBatchOp.setOperationStatus(opIndex, NOWRITE);
+                  }
+                  if (mergedRow != null && mergedRow.getFamilyCellMap().size() == 0) {
+                      context.mergedRowStates.remove(rowKeyPtr);
                   }
-                  stored.addAll(m);
-              } else {
-                  originalMutations.add(m);
               }
           }
       }
+  }
 
-      if (copyMutations || replayWrite != null) {
-          mutations = IndexManagementUtil.flattenMutationsByTimestamp(mutations);
+  private void populateMutationList(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                    Collection<Mutation> mutationList, boolean isPut) {
+      for (int i = 0; i < miniBatchOp.size(); i++) {
+          Mutation m = miniBatchOp.getOperation(i);
+          if ((isPut && m instanceof Put) || (!isPut && m instanceof Delete)) {
+              if (miniBatchOp.getOperationStatus(i) != IGNORE &&
+                      miniBatchOp.getOperationStatus(i) != NOWRITE && this.builder.isEnabled(m)) {
+                  mutationList.add(m);
+              }
+          }
+      }
+  }
+
+  /**
+   * Merges the mutations on the same row
+   */
+  private Collection<? extends Mutation> mergeMutations(ObserverContext<RegionCoprocessorEnvironment> c,
+                                                        MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                                        BatchMutateContext context,
+                                                        long now) throws IOException {
+      if (context.rowsToLock.size() == 0) {
+          return null;
+      }
+      Map<ImmutableBytesPtr, Integer> pendingPuts = new HashMap<>(miniBatchOp.size());
+      mergePendingPutMutations(miniBatchOp, pendingPuts, now);
+      getCurrentRowStates(c, context);
+      context.mergedRowStates = new HashMap<ImmutableBytesPtr, Put>(miniBatchOp.size());
+      // Merge pending put mutations with current row states into new merged states
+      for (Integer i = 0; i < miniBatchOp.size(); i++) {
+          if (miniBatchOp.getOperationStatus(i) == IGNORE || miniBatchOp.getOperationStatus(i) == NOWRITE) {
+              continue;
+          }
+          Mutation m = miniBatchOp.getOperation(i);
+          if (!this.builder.isEnabled(m)) {
+              continue;
+          }
+          if (m instanceof Put) {
+              ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(m.getRow());
+              Put currentRow = context.currentRowStates.get(rowKeyPtr);
+              Put mergedRow = (currentRow != null) ? mergeNew((Put) m, currentRow) : (Put)m;
+              context.mergedRowStates.put(rowKeyPtr, mergedRow);
+          }
       }
-      return mutations;
+
+      // Apply delete mutations on the pending put mutations and merged row states
+      applyPendingDeleteMutations(miniBatchOp, context, pendingPuts);
+      Collection<Mutation> pendingMutations = Lists.newArrayListWithExpectedSize(miniBatchOp.size());
+      // Add put mutations into the collection first. This ordering of puts first and then deletes is important for
+      // obtaining correct index updates
+      populateMutationList(miniBatchOp, pendingMutations, true);
+      // Add delete mutations into the collection (after the put mutations if any)
+      populateMutationList(miniBatchOp, pendingMutations, false);
+      return pendingMutations;
   }
 
+  /**
+   * This is to reorder the mutations in ascending order by the tuple of row key, timestamp and mutation type where
+   * delete comes before put
+   */
+  public static final Comparator<Mutation> MUTATION_COMPARATOR = new Comparator<Mutation>() {
 
 Review comment:
   The ordering is very specific to the algorithm here. All I do is a simple sort here using Java's list sorting. I am not sure what else I could leverage.
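
   For illustration, a minimal sketch of a comparator that produces the ordering described above (ascending by row key, then timestamp, with Delete before Put). The class name MutationOrderingSketch, the field EXAMPLE_COMPARATOR, and the firstCellTimestamp helper are made up for this example and are not the patch's MUTATION_COMPARATOR; the helper simply mirrors the getTimestamp method added in the diff.

    import java.util.Comparator;
    import java.util.List;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.util.Bytes;

    // Hypothetical illustration only; not the PR's implementation.
    public final class MutationOrderingSketch {

        // Timestamp of the first cell in the mutation (all cells share one
        // timestamp after setTimestamp(m, now) has been applied).
        private static long firstCellTimestamp(Mutation m) {
            for (List<Cell> cells : m.getFamilyCellMap().values()) {
                for (Cell cell : cells) {
                    return cell.getTimestamp();
                }
            }
            throw new IllegalStateException("No cell found");
        }

        public static final Comparator<Mutation> EXAMPLE_COMPARATOR = new Comparator<Mutation>() {
            @Override
            public int compare(Mutation a, Mutation b) {
                // 1. Ascending by row key
                int cmp = Bytes.compareTo(a.getRow(), b.getRow());
                if (cmp != 0) {
                    return cmp;
                }
                // 2. Ascending by timestamp
                cmp = Long.compare(firstCellTimestamp(a), firstCellTimestamp(b));
                if (cmp != 0) {
                    return cmp;
                }
                // 3. Delete sorts before Put for the same row key and timestamp
                boolean aDelete = a instanceof Delete;
                boolean bDelete = b instanceof Delete;
                return aDelete == bDelete ? 0 : (aDelete ? -1 : 1);
            }
        };
    }

   A list of mutations could then be ordered with Collections.sort(mutations, MutationOrderingSketch.EXAMPLE_COMPARATOR).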

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
