gokceni commented on a change in pull request #701: PHOENIX-5709 Simplify index
update generation code for consistent global indexes
URL: https://github.com/apache/phoenix/pull/701#discussion_r376124587
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -409,91 +413,220 @@ private void populatePendingRows(BatchMutateContext
context) {
}
}
- private Collection<? extends Mutation>
groupMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
- long now, ReplayWrite
replayWrite) throws IOException {
- Map<ImmutableBytesPtr, MultiMutation> mutationsMap = new HashMap<>();
- boolean copyMutations = false;
- for (int i = 0; i < miniBatchOp.size(); i++) {
+ public static void setTimestamp(Mutation m, long ts) throws IOException {
+ for (List<Cell> cells : m.getFamilyCellMap().values()) {
+ for (Cell cell : cells) {
+ CellUtil.setTimestamp(cell, ts);
+ }
+ }
+ }
+
+ public static long getTimestamp(Mutation m) throws IOException {
+ for (List<Cell> cells : m.getFamilyCellMap().values()) {
+ for (Cell cell : cells) {
+ return cell.getTimestamp();
+ }
+ }
+ return 0;
+ }
+
+ private static void removeColumn(Put put, Cell deleteCell) {
+ byte[] family = CellUtil.cloneFamily(deleteCell);
+ List<Cell> cellList = put.getFamilyCellMap().get(family);
+ if (cellList == null) {
+ return;
+ }
+ Iterator<Cell> cellIterator = cellList.iterator();
+ while (cellIterator.hasNext()) {
+ Cell cell = cellIterator.next();
+ if (Bytes.compareTo(cell.getQualifierArray(),
cell.getQualifierOffset(), cell.getQualifierLength(),
+ deleteCell.getQualifierArray(),
deleteCell.getQualifierOffset(), deleteCell.getQualifierLength()) == 0) {
+ cellIterator.remove();
+ if (cellList.isEmpty()) {
+ put.getFamilyCellMap().remove(family);
+ }
+ return;
+ }
+ }
+ }
+
+
+ private void merge(Put current, Put previous) throws IOException {
+ for (List<Cell> cells : previous.getFamilyCellMap().values()) {
+ for (Cell cell : cells) {
+ if (!current.has(CellUtil.cloneFamily(cell),
CellUtil.cloneQualifier(cell))) {
+ current.add(cell);
+ }
+ }
+ }
+ }
+
+ private Put mergeNew(Put current, Put previous) throws IOException {
+ Put next = new Put(current);
+ for (List<Cell> cells : previous.getFamilyCellMap().values()) {
+ for (Cell cell : cells) {
+ if (!current.has(CellUtil.cloneFamily(cell),
CellUtil.cloneQualifier(cell))) {
+ next.add(cell);
+ }
+ }
+ }
+ return next;
+ }
+
+ /**
+ * When there are multiple put mutations on the data same row within the
same batch, this method merged them into
+ * one mutation.
+ */
+ private void mergePendingPutMutations(MiniBatchOperationInProgress<Mutation>
miniBatchOp,
+ Map<ImmutableBytesPtr, Integer>
pendingPuts,
+ long now) throws IOException {
+ for (Integer i = 0; i < miniBatchOp.size(); i++) {
if (miniBatchOp.getOperationStatus(i) == IGNORE) {
continue;
}
Mutation m = miniBatchOp.getOperation(i);
+ // skip this mutation if we aren't enabling indexing
if (this.builder.isEnabled(m)) {
- // Track whether or not we need to
- ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
- if (mutationsMap.containsKey(row)) {
- copyMutations = true;
- } else {
- mutationsMap.put(row, null);
+ // Unless we're replaying edits to rebuild the index, we update
the time stamp
+ // of the data table to prevent overlapping time stamps (which
prevents index
+ // inconsistencies as this case isn't handled correctly
currently).
+ setTimestamp(m, now);
+ if (m instanceof Put) {
+ ImmutableBytesPtr rowKeyPtr = new
ImmutableBytesPtr(m.getRow());
+ Integer opIndex = pendingPuts.get(rowKeyPtr);
+ pendingPuts.put(rowKeyPtr, i);
+ if (opIndex != null) {
+ merge((Put) m, (Put) miniBatchOp.getOperation(opIndex));
+ miniBatchOp.setOperationStatus(opIndex, NOWRITE);
+ }
}
}
}
- // early exit if it turns out we don't have any edits
- if (mutationsMap.isEmpty()) {
- return null;
- }
- // If we're copying the mutations
- Collection<Mutation> originalMutations;
- Collection<? extends Mutation> mutations;
- if (copyMutations) {
- originalMutations = null;
- mutations = mutationsMap.values();
- } else {
- originalMutations =
Lists.newArrayListWithExpectedSize(mutationsMap.size());
- mutations = originalMutations;
- }
-
- boolean resetTimeStamp = replayWrite == null;
+ }
- for (int i = 0; i < miniBatchOp.size(); i++) {
+ /**
+ * When there are delete and put mutations on the data same row within the
same batch, this method applies deletes
+ * on put mutations, that is effectively merge them.
+ */
+ private void
applyPendingDeleteMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+ BatchMutateContext context,
+ Map<ImmutableBytesPtr, Integer>
pendingPuts) throws IOException {
+ for (Integer i = 0; i < miniBatchOp.size(); i++) {
+ if (miniBatchOp.getOperationStatus(i) == IGNORE ||
miniBatchOp.getOperationStatus(i) == NOWRITE) {
+ continue;
+ }
Mutation m = miniBatchOp.getOperation(i);
- // skip this mutation if we aren't enabling indexing
- // unfortunately, we really should ask if the raw mutation (rather
than the combined mutation)
- // should be indexed, which means we need to expose another method
on the builder. Such is the
- // way optimization go though.
- if (miniBatchOp.getOperationStatus(i) != IGNORE &&
this.builder.isEnabled(m)) {
- if (resetTimeStamp) {
- // Unless we're replaying edits to rebuild the index, we
update the time stamp
- // of the data table to prevent overlapping time stamps
(which prevents index
- // inconsistencies as this case isn't handled correctly
currently).
+ if (!this.builder.isEnabled(m)) {
+ continue;
+ }
+ if (m instanceof Delete) {
+ ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(m.getRow());
+ Integer opIndex = pendingPuts.get(rowKeyPtr);
+ Put mergedRow = context.mergedRowStates.get(rowKeyPtr);
+ if (opIndex != null || mergedRow != null) {
+ Put put = (Put) miniBatchOp.getOperation(opIndex);
for (List<Cell> cells : m.getFamilyCellMap().values()) {
for (Cell cell : cells) {
- CellUtil.setTimestamp(cell, now);
+ switch (cell.getType()) {
+ case DeleteFamily:
+ case DeleteFamilyVersion:
+ if (put != null)
+
put.getFamilyCellMap().remove(CellUtil.cloneFamily(cell));
+ if (mergedRow != null)
+
mergedRow.getFamilyCellMap().remove(CellUtil.cloneFamily(cell));
+ break;
+ case DeleteColumn:
+ case Delete:
+ if (put != null)
+ removeColumn(put, cell);
+ if (mergedRow != null)
+ removeColumn(mergedRow, cell);
+ }
+ if (put != null && put.getFamilyCellMap().size() ==
0) {
+ pendingPuts.remove(rowKeyPtr);
+ miniBatchOp.setOperationStatus(opIndex, NOWRITE);
+ }
+ if (mergedRow != null &&
mergedRow.getFamilyCellMap().size() == 0) {
+ context.mergedRowStates.remove(rowKeyPtr);
+ }
}
}
}
- // No need to write the table mutations when we're rebuilding
- // the index as they're already written and just being replayed.
- if (replayWrite == ReplayWrite.INDEX_ONLY
- || replayWrite == ReplayWrite.REBUILD_INDEX_ONLY) {
- miniBatchOp.setOperationStatus(i, NOWRITE);
- }
+ }
+ }
+ }
- // Only copy mutations if we found duplicate rows
- // which only occurs when we're partially rebuilding
- // the index (since we'll potentially have both a
- // Put and a Delete mutation for the same row).
- if (copyMutations) {
- // Add the mutation to the batch set
-
- ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
- MultiMutation stored = mutationsMap.get(row);
- // we haven't seen this row before, so add it
- if (stored == null) {
- stored = new MultiMutation(row);
- mutationsMap.put(row, stored);
+ /**
+ * * Merges the mutations on the same row
+ */
+ private Collection<? extends Mutation>
mergeMutations(ObserverContext<RegionCoprocessorEnvironment> c,
+
MiniBatchOperationInProgress<Mutation> miniBatchOp,
+ BatchMutateContext
context,
+ long now, boolean
rebuild) throws IOException {
+ if (!rebuild) {
+ if (context.rowsToLock.size() == 0) {
+ return null;
+ }
+ Map<ImmutableBytesPtr, Integer> pendingPuts = new
HashMap<>(miniBatchOp.size());
+ mergePendingPutMutations(miniBatchOp, pendingPuts, now);
+ getCurrentRowStates(c, context);
+ context.mergedRowStates = new HashMap<ImmutableBytesPtr,
Put>(miniBatchOp.size());
+ // Merge pending put mutations with current row states into new
merged states
+ for (Integer i = 0; i < miniBatchOp.size(); i++) {
+ if (miniBatchOp.getOperationStatus(i) == IGNORE ||
miniBatchOp.getOperationStatus(i) == NOWRITE) {
+ continue;
+ }
+ Mutation m = miniBatchOp.getOperation(i);
+ if (this.builder.isEnabled(m)) {
+ if (m instanceof Put) {
+ ImmutableBytesPtr rowKeyPtr = new
ImmutableBytesPtr(m.getRow());
+ Put currentRow = context.currentRowStates.get(rowKeyPtr);
+ Put mergedRow;
+ if (currentRow != null) {
+ mergedRow = mergeNew((Put) m, currentRow);
+ }
+ else {
Review comment:
nit: move else to line 587
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services