swaroopak commented on a change in pull request #701: PHOENIX-5709 Simplify
index update generation code for consistent glo…
URL: https://github.com/apache/phoenix/pull/701#discussion_r383084324
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
##########
@@ -927,109 +1123,375 @@ private void verifyAndOrRebuildIndex() throws
IOException {
// For these options we start with verifying index rows
parallelizeIndexVerify(verificationPhaseResult);
nextVerificationResult.before.add(verificationPhaseResult);
- if (mutations.size() != verificationPhaseResult.getTotalCount()) {
- throw new DoNotRetryIOException(
- "mutations.size() !=
verificationPhaseResult.getTotalCount() at the before phase " +
- nextVerificationResult + "
dataKeyToDataPutMap.size() = " + dataKeyToDataPutMap.size());
- }
}
if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType ==
IndexTool.IndexVerifyType.BOTH) {
// For these options, we have identified the rows to be rebuilt
and now need to rebuild them
// At this point, dataKeyToDataPutMap includes mapping only for
the rows to be rebuilt
mutations.clear();
- for (Map.Entry<byte[], Put> entry: dataKeyToDataPutMap.entrySet())
{
- mutations.add(entry.getValue());
+
+ for (Map.Entry<byte[], Pair<Put, Delete>> entry:
dataKeyToMutationMap.entrySet()) {
+ if (entry.getValue().getFirst() != null) {
+ mutations.add(entry.getValue().getFirst());
+ }
+ if (entry.getValue().getSecond() != null) {
+ mutations.add(entry.getValue().getSecond());
+ }
}
rebuildIndexRows(mutations);
- nextVerificationResult.rebuiltIndexRowCount += mutations.size();
+ nextVerificationResult.rebuiltIndexRowCount +=
dataKeyToMutationMap.size();
isBeforeRebuilt = false;
}
if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType ==
IndexTool.IndexVerifyType.BOTH) {
// We have rebuilt index row and now we need to verify them
- indexKeyToDataPutMap.clear();
VerificationResult.PhaseResult verificationPhaseResult = new
VerificationResult.PhaseResult();
+ indexKeyToMutationMap.clear();
+ for (Map.Entry<byte[], Pair<Put, Delete>> entry:
dataKeyToMutationMap.entrySet()) {
+ prepareIndexMutations(entry.getValue().getFirst(),
entry.getValue().getSecond());
+ }
parallelizeIndexVerify(verificationPhaseResult);
nextVerificationResult.after.add(verificationPhaseResult);
- if (mutations.size() != verificationPhaseResult.getTotalCount()) {
- throw new DoNotRetryIOException(
- "mutations.size() !=
verificationPhaseResult.getTotalCount() at the after phase " +
- nextVerificationResult + "
dataKeyToDataPutMap.size() = " + dataKeyToDataPutMap.size());
- }
}
- indexKeyToDataPutMap.clear();
verificationResult.add(nextVerificationResult);
}
+ private boolean isColumnIncluded(Cell cell) {
+ byte[] family = CellUtil.cloneFamily(cell);
+ if (!familyMap.containsKey(family)) {
+ return false;
+ }
+ NavigableSet<byte[]> set = familyMap.get(family);
+ if (set == null || set.isEmpty()) {
+ return true;
+ }
+ byte[] qualifier = CellUtil.cloneQualifier(cell);
+ return set.contains(qualifier);
+ }
+
+ public static long getTimestamp(Mutation m) {
+ for (List<Cell> cells : m.getFamilyCellMap().values()) {
+ for (Cell cell : cells) {
+ return cell.getTimestamp();
+ }
+ }
+ throw new IllegalStateException("No cell found");
+ }
+
+
+ /**
+ * This is to reorder the mutations in ascending order by the tuple of
timestamp and mutation type where
+ * delete comes before put
+ */
+ public static final Comparator<Mutation> MUTATION_TS_COMPARATOR = new
Comparator<Mutation>() {
+ @Override
+ public int compare(Mutation o1, Mutation o2) {
+ long ts1 = getTimestamp(o1);
+ long ts2 = getTimestamp(o2);
+ if (ts1 < ts2) {
+ return -1;
+ }
+ if (ts1 > ts2) {
+ return 1;
+ }
+ if (o1 instanceof Delete && o2 instanceof Put) {
+ return -1;
+ }
+ if (o1 instanceof Put && o2 instanceof Delete) {
+ return 1;
+ }
+ return 0;
+ }
+ };
+
+ public static List<Mutation> getMutationsWithSameTS(Put put, Delete del) {
+ List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(2);
+ if (put != null) {
+ mutationList.add(put);
+ }
+ if (del != null) {
+ mutationList.add(del);
+ }
+ // Group the cells within a mutation based on their timestamps and
create a separate mutation for each group
+ mutationList = (List<Mutation>)
IndexManagementUtil.flattenMutationsByTimestamp(mutationList);
+ // Reorder the mutations on the same row so that delete comes before
put when they have the same timestamp
+ Collections.sort(mutationList, MUTATION_TS_COMPARATOR);
+ return mutationList;
+ }
+
+ private static Put prepareIndexPut(IndexMaintainer indexMaintainer,
ImmutableBytesPtr rowKeyPtr,
+ ValueGetter mergedRowVG, long ts,
boolean isRebuild,
+ byte[] regionStartKey, byte[]
regionEndKey)
+ throws IOException {
+ Put indexPut =
indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+ mergedRowVG, rowKeyPtr, ts, null, null);
+ if (indexPut == null) {
+ // No covered column. Just prepare an index row with the empty
column
+ byte[] indexRowKey = indexMaintainer.buildRowKey(mergedRowVG,
rowKeyPtr,
+ regionStartKey, regionEndKey, HConstants.LATEST_TIMESTAMP);
+ indexPut = new Put(indexRowKey);
+ }
+
indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ indexMaintainer.getEmptyKeyValueQualifier(), ts,
VERIFIED_BYTES);
+ return indexPut;
+ }
+
+ public static void removeColumn(Put put, Cell deleteCell) {
+ byte[] family = CellUtil.cloneFamily(deleteCell);
+ List<Cell> cellList = put.getFamilyCellMap().get(family);
+ if (cellList == null) {
+ return;
+ }
+ Iterator<Cell> cellIterator = cellList.iterator();
+ while (cellIterator.hasNext()) {
+ Cell cell = cellIterator.next();
+ if (CellUtil.matchingQualifier(cell, deleteCell)) {
+ cellIterator.remove();
+ if (cellList.isEmpty()) {
+ put.getFamilyCellMap().remove(family);
+ }
+ return;
+ }
+ }
+ }
+
+ public static void merge(Put current, Put previous) throws IOException {
+ for (List<Cell> cells : previous.getFamilyCellMap().values()) {
+ for (Cell cell : cells) {
+ if (!current.has(CellUtil.cloneFamily(cell),
CellUtil.cloneQualifier(cell))) {
+ current.add(cell);
+ }
+ }
+ }
+ }
+
+ public static Put mergeNew(Put current, Put previous) throws IOException {
+ Put next = new Put(current);
+ merge(next, previous);
+ return next;
+ }
+
+
+ public static Delete copyDeleteFamilyMarkers(Delete del) throws
IOException {
+ Delete newDel = new Delete(del.getRow());
+ for (List<Cell> cells : del.getFamilyCellMap().values()) {
+ for (Cell cell : cells) {
+ switch (cell.getType()) {
+ case DeleteFamily:
+ case DeleteFamilyVersion:
+ newDel.addDeleteMarker(cell);
+ }
+ }
+ }
+ return newDel;
+ }
+
+ public static Delete adjustGlobalIndexDeleteMutation(Delete del) throws
IOException {
+ // For a global index table, we are only interested if the current
data row is deleted or
+ // not. There is no need to apply column deletes to index rows since
index rows are always
+ // full rows and all the cells in an index row have the same timestamp
value. Because of
+ // this index rows versions do not share cells.
+ boolean deleteColumn = false;
+ boolean deleteFamily = false;
+ for (List<Cell> cells : del.getFamilyCellMap().values()) {
+ for (Cell cell : cells) {
+ switch (cell.getType()) {
+ case DeleteFamily:
+ case DeleteFamilyVersion:
+ deleteFamily = true;
+ break;
+ case DeleteColumn:
+ case Delete:
+ deleteColumn = true;
+ }
+ }
+ }
+ if (!deleteFamily) {
+ return null;
+ }
+ if (deleteColumn) {
+ del = copyDeleteFamilyMarkers(del);
+ }
+ return del;
+ }
+
+ private static void addIndexDeleteMutation(IndexMaintainer
indexMaintainer, Delete del, List<Mutation> mutationList,
+ long ts) throws IOException {
+ // The following unverified put is needed for inline verification to
treat regular updates and
+ // rebuild updates the same way
+ Put unverifiedPut = new Put(del.getRow());
+
unverifiedPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ indexMaintainer.getEmptyKeyValueQualifier(), ts,
UNVERIFIED_BYTES);
+ mutationList.add(unverifiedPut);
+ del = adjustGlobalIndexDeleteMutation(del);
+ mutationList.add(del);
+ }
+
+ public static Collection<Cell> getCollection(Mutation m) {
+ Collection<Cell> collection = new ArrayList<>();
+ for (List<Cell> cells : m.getFamilyCellMap().values()) {
+ collection.addAll(cells);
+ }
+ return collection;
+ }
+
+
+ /**
+ * Generate the index update for a data row from the mutation that are
obtained by merging the previous data row
+ * state with the pending row mutation for index rebuild. This method is
called only for global indexes.
+ * pendingMutations is a sorted list of data table mutations that are used
to replay index table mutations.
+ * This list is sorted in ascending order by the tuple of row key,
timestamp and mutation type where delete comes
+ * after put.
Review comment:
I'm a little confused here: MUTATION_TS_COMPARATOR orders deletes before puts,
but this comment says deletes come after puts?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services