kadirozde commented on a change in pull request #701: PHOENIX-5709 Simplify
index update generation code for consistent glo…
URL: https://github.com/apache/phoenix/pull/701#discussion_r377787043
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -409,93 +416,262 @@ private void populatePendingRows(BatchMutateContext
context) {
}
}
- private Collection<? extends Mutation>
groupMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
- long now, ReplayWrite
replayWrite) throws IOException {
- Map<ImmutableBytesPtr, MultiMutation> mutationsMap = new HashMap<>();
- boolean copyMutations = false;
- for (int i = 0; i < miniBatchOp.size(); i++) {
- if (miniBatchOp.getOperationStatus(i) == IGNORE) {
- continue;
+ public static void setTimestamp(Mutation m, long ts) throws IOException {
+ for (List<Cell> cells : m.getFamilyCellMap().values()) {
+ for (Cell cell : cells) {
+ CellUtil.setTimestamp(cell, ts);
}
- Mutation m = miniBatchOp.getOperation(i);
- if (this.builder.isEnabled(m)) {
- // Track whether or not we need to
- ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
- if (mutationsMap.containsKey(row)) {
- copyMutations = true;
- } else {
- mutationsMap.put(row, null);
+ }
+ }
+
+ public static long getTimestamp(Mutation m) {
+ for (List<Cell> cells : m.getFamilyCellMap().values()) {
+ for (Cell cell : cells) {
+ return cell.getTimestamp();
+ }
+ }
+ throw new IllegalStateException("No cell found");
+ }
+
+ private static void removeColumn(Put put, Cell deleteCell) {
+ byte[] family = CellUtil.cloneFamily(deleteCell);
+ List<Cell> cellList = put.getFamilyCellMap().get(family);
+ if (cellList == null) {
+ return;
+ }
+ Iterator<Cell> cellIterator = cellList.iterator();
+ while (cellIterator.hasNext()) {
+ Cell cell = cellIterator.next();
+ if (CellUtil.matchingQualifier(cell, deleteCell)) {
+ cellIterator.remove();
+ if (cellList.isEmpty()) {
+ put.getFamilyCellMap().remove(family);
}
+ return;
}
}
- // early exit if it turns out we don't have any edits
- if (mutationsMap.isEmpty()) {
- return null;
- }
- // If we're copying the mutations
- Collection<Mutation> originalMutations;
- Collection<? extends Mutation> mutations;
- if (copyMutations) {
- originalMutations = null;
- mutations = mutationsMap.values();
- } else {
- originalMutations =
Lists.newArrayListWithExpectedSize(mutationsMap.size());
- mutations = originalMutations;
+ }
+
+
+ private void merge(Put current, Put previous) throws IOException {
+ for (List<Cell> cells : previous.getFamilyCellMap().values()) {
+ for (Cell cell : cells) {
+ if (!current.has(CellUtil.cloneFamily(cell),
CellUtil.cloneQualifier(cell))) {
+ current.add(cell);
+ }
+ }
}
+ }
- boolean resetTimeStamp = replayWrite == null;
+ private Put mergeNew(Put current, Put previous) throws IOException {
+ Put next = new Put(current);
+ for (List<Cell> cells : previous.getFamilyCellMap().values()) {
+ for (Cell cell : cells) {
+ if (!current.has(CellUtil.cloneFamily(cell),
CellUtil.cloneQualifier(cell))) {
+ next.add(cell);
+ }
+ }
+ }
+ return next;
+ }
- for (int i = 0; i < miniBatchOp.size(); i++) {
+ /**
+ * When there are multiple put mutations on the data same row within the
same batch, this method merges them into
+ * one mutation.
+ */
+ private void mergePendingPutMutations(MiniBatchOperationInProgress<Mutation>
miniBatchOp,
+ Map<ImmutableBytesPtr, Integer>
pendingPuts,
+ long now) throws IOException {
+ for (Integer i = 0; i < miniBatchOp.size(); i++) {
+ if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+ continue;
+ }
Mutation m = miniBatchOp.getOperation(i);
// skip this mutation if we aren't enabling indexing
- // unfortunately, we really should ask if the raw mutation (rather
than the combined mutation)
- // should be indexed, which means we need to expose another method
on the builder. Such is the
- // way optimization go though.
- if (miniBatchOp.getOperationStatus(i) != IGNORE &&
this.builder.isEnabled(m)) {
- if (resetTimeStamp) {
- // Unless we're replaying edits to rebuild the index, we
update the time stamp
- // of the data table to prevent overlapping time stamps
(which prevents index
- // inconsistencies as this case isn't handled correctly
currently).
- for (List<Cell> cells : m.getFamilyCellMap().values()) {
- for (Cell cell : cells) {
- CellUtil.setTimestamp(cell, now);
- }
- }
- }
- // No need to write the table mutations when we're rebuilding
- // the index as they're already written and just being replayed.
- if (replayWrite == ReplayWrite.INDEX_ONLY
- || replayWrite == ReplayWrite.REBUILD_INDEX_ONLY) {
- miniBatchOp.setOperationStatus(i, NOWRITE);
+ if (!this.builder.isEnabled(m)) {
+ continue;
+ }
+ // Unless we're replaying edits to rebuild the index, we update the
time stamp
+ // of the data table to prevent overlapping time stamps (which
prevents index
+ // inconsistencies as this case isn't handled correctly currently).
+ setTimestamp(m, now);
+ if (m instanceof Put) {
+ ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(m.getRow());
+ Integer opIndex = pendingPuts.get(rowKeyPtr);
+ if (opIndex != null) {
+ merge((Put) m, (Put) miniBatchOp.getOperation(opIndex));
+ miniBatchOp.setOperationStatus(opIndex, NOWRITE);
+ pendingPuts.remove(opIndex);
}
+ pendingPuts.put(rowKeyPtr, i);
+ }
+ }
+ }
- // Only copy mutations if we found duplicate rows
- // which only occurs when we're partially rebuilding
- // the index (since we'll potentially have both a
- // Put and a Delete mutation for the same row).
- if (copyMutations) {
- // Add the mutation to the batch set
-
- ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
- MultiMutation stored = mutationsMap.get(row);
- // we haven't seen this row before, so add it
- if (stored == null) {
- stored = new MultiMutation(row);
- mutationsMap.put(row, stored);
+ /**
+ * When there are delete and put mutations on the data same row within the
same batch, this method applies deletes
+ * on put mutations, that is, effectively merges them.
+ */
+ private void
applyPendingDeleteMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+ BatchMutateContext context,
+ Map<ImmutableBytesPtr, Integer>
pendingPuts) throws IOException {
+ for (Integer i = 0; i < miniBatchOp.size(); i++) {
+ if (miniBatchOp.getOperationStatus(i) == IGNORE ||
miniBatchOp.getOperationStatus(i) == NOWRITE) {
+ continue;
+ }
+ Mutation m = miniBatchOp.getOperation(i);
+ if (!this.builder.isEnabled(m)) {
+ continue;
+ }
+ if (!(m instanceof Delete)) {
+ continue;
+ }
+ ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(m.getRow());
+ Integer opIndex = pendingPuts.get(rowKeyPtr);
+ Put mergedRow = context.mergedRowStates.get(rowKeyPtr);
+ if (opIndex == null && mergedRow == null) {
+ continue;
+ }
+ Put put = (opIndex != null) ? (Put)
miniBatchOp.getOperation(opIndex) : null;
+ for (List<Cell> cells : m.getFamilyCellMap().values()) {
+ for (Cell cell : cells) {
+ switch (cell.getType()) {
+ case DeleteFamily:
+ case DeleteFamilyVersion:
+ if (put != null) {
+
put.getFamilyCellMap().remove(CellUtil.cloneFamily(cell));
+ }
+ if (put != mergedRow) {
+
mergedRow.getFamilyCellMap().remove(CellUtil.cloneFamily(cell));
+ }
+ break;
+ case DeleteColumn:
+ case Delete:
+ if (put != null) {
+ removeColumn(put, cell);
+ }
+ if (put != mergedRow) {
+ removeColumn(mergedRow, cell);
+ }
+ }
+ if (put != null && put.getFamilyCellMap().size() == 0) {
+ pendingPuts.remove(rowKeyPtr);
+ miniBatchOp.setOperationStatus(opIndex, NOWRITE);
+ }
+ if (mergedRow != null && mergedRow.getFamilyCellMap().size()
== 0) {
+ context.mergedRowStates.remove(rowKeyPtr);
}
- stored.addAll(m);
- } else {
- originalMutations.add(m);
}
}
}
+ }
- if (copyMutations || replayWrite != null) {
- mutations =
IndexManagementUtil.flattenMutationsByTimestamp(mutations);
+ private void populateMutationList(MiniBatchOperationInProgress<Mutation>
miniBatchOp,
+ Collection<Mutation> mutationList, boolean
isPut) {
+ for (int i = 0; i < miniBatchOp.size(); i++) {
+ Mutation m = miniBatchOp.getOperation(i);
+ if ((isPut && m instanceof Put) || (!isPut && m instanceof Delete)) {
+ if (miniBatchOp.getOperationStatus(i) != IGNORE &&
+ miniBatchOp.getOperationStatus(i) != NOWRITE &&
this.builder.isEnabled(m)) {
+ mutationList.add(m);
+ }
+ }
+ }
+ }
+
+ /**
+ * * Merges the mutations on the same row
+ */
+ private Collection<? extends Mutation>
mergeMutations(ObserverContext<RegionCoprocessorEnvironment> c,
+
MiniBatchOperationInProgress<Mutation> miniBatchOp,
+ BatchMutateContext
context,
+ long now) throws
IOException {
+ if (context.rowsToLock.size() == 0) {
+ return null;
+ }
+ Map<ImmutableBytesPtr, Integer> pendingPuts = new
HashMap<>(miniBatchOp.size());
+ mergePendingPutMutations(miniBatchOp, pendingPuts, now);
+ getCurrentRowStates(c, context);
+ context.mergedRowStates = new HashMap<ImmutableBytesPtr,
Put>(miniBatchOp.size());
+ // Merge pending put mutations with current row states into new merged
states
+ for (Integer i = 0; i < miniBatchOp.size(); i++) {
+ if (miniBatchOp.getOperationStatus(i) == IGNORE ||
miniBatchOp.getOperationStatus(i) == NOWRITE) {
+ continue;
+ }
+ Mutation m = miniBatchOp.getOperation(i);
+ if (!this.builder.isEnabled(m)) {
+ continue;
+ }
+ if (m instanceof Put) {
+ ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(m.getRow());
+ Put currentRow = context.currentRowStates.get(rowKeyPtr);
+ Put mergedRow = (currentRow != null) ? mergeNew((Put) m,
currentRow) : (Put)m;
+ context.mergedRowStates.put(rowKeyPtr, mergedRow);
+ }
}
- return mutations;
+
+ // Apply delete mutations on the pending put mutations and merged row
states
+ applyPendingDeleteMutations(miniBatchOp, context, pendingPuts);
+ Collection<Mutation> pendingMutations =
Lists.newArrayListWithExpectedSize(miniBatchOp.size());
+ // Add put mutations into the collection first. This ordering of puts
first and then deletes is important for
+ // obtaining correct index updates
+ populateMutationList(miniBatchOp, pendingMutations, true);
+ // Add delete mutations into the collection (after the put mutations if
any)
+ populateMutationList(miniBatchOp, pendingMutations, false);
+ return pendingMutations;
}
+ /**
+ * This is to reorder the mutations in ascending order by the tuple of row
key, timestamp and mutation type where
+ * delete comes before put
+ */
+ public static final Comparator<Mutation> MUTATION_COMPARATOR = new
Comparator<Mutation>() {
Review comment:
The ordering is very specific to the algorithm here. All I do is a simple
sort using a Java sorted list. I am not sure what else I could leverage
here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services