kadirozde commented on a change in pull request #701: PHOENIX-5709 Simplify index update generation code for consistent glo…
URL: https://github.com/apache/phoenix/pull/701#discussion_r387921207
 
 

 ##########
 File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
 ##########
 @@ -927,109 +1123,375 @@ private void verifyAndOrRebuildIndex() throws 
IOException {
             // For these options we start with verifying index rows
             parallelizeIndexVerify(verificationPhaseResult);
             nextVerificationResult.before.add(verificationPhaseResult);
-            if (mutations.size() != verificationPhaseResult.getTotalCount()) {
-                throw new DoNotRetryIOException(
-                        "mutations.size() != 
verificationPhaseResult.getTotalCount() at the before phase " +
-                                nextVerificationResult + " 
dataKeyToDataPutMap.size() = " + dataKeyToDataPutMap.size());
-            }
         }
         if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == 
IndexTool.IndexVerifyType.BOTH) {
             // For these options, we have identified the rows to be rebuilt 
and now need to rebuild them
             // At this point, dataKeyToDataPutMap includes mapping only for 
the rows to be rebuilt
             mutations.clear();
-            for (Map.Entry<byte[], Put> entry: dataKeyToDataPutMap.entrySet()) 
{
-                mutations.add(entry.getValue());
+
+            for (Map.Entry<byte[], Pair<Put, Delete>> entry: 
dataKeyToMutationMap.entrySet()) {
+                if (entry.getValue().getFirst() != null) {
+                    mutations.add(entry.getValue().getFirst());
+                }
+                if (entry.getValue().getSecond() != null) {
+                    mutations.add(entry.getValue().getSecond());
+                }
             }
             rebuildIndexRows(mutations);
-            nextVerificationResult.rebuiltIndexRowCount += mutations.size();
+            nextVerificationResult.rebuiltIndexRowCount += 
dataKeyToMutationMap.size();
             isBeforeRebuilt = false;
         }
 
         if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == 
IndexTool.IndexVerifyType.BOTH) {
             // We have rebuilt index row and now we need to verify them
-            indexKeyToDataPutMap.clear();
             VerificationResult.PhaseResult verificationPhaseResult = new 
VerificationResult.PhaseResult();
+            indexKeyToMutationMap.clear();
+            for (Map.Entry<byte[], Pair<Put, Delete>> entry: 
dataKeyToMutationMap.entrySet()) {
+                prepareIndexMutations(entry.getValue().getFirst(), 
entry.getValue().getSecond());
+            }
             parallelizeIndexVerify(verificationPhaseResult);
             nextVerificationResult.after.add(verificationPhaseResult);
-            if (mutations.size() != verificationPhaseResult.getTotalCount()) {
-                throw new DoNotRetryIOException(
-                        "mutations.size() != 
verificationPhaseResult.getTotalCount() at the after phase " +
-                                nextVerificationResult + " 
dataKeyToDataPutMap.size() = " + dataKeyToDataPutMap.size());
-            }
         }
-        indexKeyToDataPutMap.clear();
         verificationResult.add(nextVerificationResult);
     }
 
+    private boolean isColumnIncluded(Cell cell) {
+        byte[] family = CellUtil.cloneFamily(cell);
+        if (!familyMap.containsKey(family)) {
+            return false;
+        }
+        NavigableSet<byte[]> set = familyMap.get(family);
+        if (set == null || set.isEmpty()) {
+            return true;
+        }
+        byte[] qualifier = CellUtil.cloneQualifier(cell);
+        return set.contains(qualifier);
+    }
+
+    public static long getTimestamp(Mutation m) {
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                return cell.getTimestamp();
+            }
+        }
+        throw new IllegalStateException("No cell found");
+    }
+
+
+    /**
+     * This is to reorder the mutations in ascending order by the tuple of 
timestamp and mutation type where
+     * delete comes before put
+     */
+    public static final Comparator<Mutation> MUTATION_TS_COMPARATOR = new 
Comparator<Mutation>() {
+        @Override
+        public int compare(Mutation o1, Mutation o2) {
+            long ts1 = getTimestamp(o1);
+            long ts2 = getTimestamp(o2);
+            if (ts1 < ts2) {
+                return -1;
+            }
+            if (ts1 > ts2) {
+                return 1;
+            }
+            if (o1 instanceof Delete && o2 instanceof Put) {
+                return -1;
+            }
+            if (o1 instanceof Put && o2 instanceof Delete) {
+                return 1;
+            }
+            return 0;
+        }
+    };
+
+    public static List<Mutation> getMutationsWithSameTS(Put put, Delete del) {
+        List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(2);
+        if (put != null) {
+            mutationList.add(put);
+        }
+        if (del != null) {
+            mutationList.add(del);
+        }
+        // Group the cells within a mutation based on their timestamps and 
create a separate mutation for each group
+        mutationList = (List<Mutation>) 
IndexManagementUtil.flattenMutationsByTimestamp(mutationList);
+        // Reorder the mutations on the same row so that delete comes before 
put when they have the same timestamp
+        Collections.sort(mutationList, MUTATION_TS_COMPARATOR);
+        return mutationList;
+    }
+
+    private static Put prepareIndexPut(IndexMaintainer indexMaintainer, 
ImmutableBytesPtr rowKeyPtr,
+                                       ValueGetter mergedRowVG, long ts, 
boolean isRebuild,
+                                       byte[] regionStartKey, byte[] 
regionEndKey)
+            throws IOException {
+        Put indexPut = 
indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+                mergedRowVG, rowKeyPtr, ts, null, null);
+        if (indexPut == null) {
+            // No covered column. Just prepare an index row with the empty 
column
+            byte[] indexRowKey = indexMaintainer.buildRowKey(mergedRowVG, 
rowKeyPtr,
+                    regionStartKey, regionEndKey, HConstants.LATEST_TIMESTAMP);
+            indexPut = new Put(indexRowKey);
+        }
+        
indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                indexMaintainer.getEmptyKeyValueQualifier(), ts, 
VERIFIED_BYTES);
+        return indexPut;
+    }
+
+    public static void removeColumn(Put put, Cell deleteCell) {
+        byte[] family = CellUtil.cloneFamily(deleteCell);
+        List<Cell> cellList = put.getFamilyCellMap().get(family);
+        if (cellList == null) {
+            return;
+        }
+        Iterator<Cell> cellIterator = cellList.iterator();
+        while (cellIterator.hasNext()) {
+            Cell cell = cellIterator.next();
+            if (CellUtil.matchingQualifier(cell, deleteCell)) {
+                cellIterator.remove();
+                if (cellList.isEmpty()) {
+                    put.getFamilyCellMap().remove(family);
+                }
+                return;
+            }
+        }
+    }
+
+    public static void merge(Put current, Put previous) throws IOException {
+        for (List<Cell> cells : previous.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                if (!current.has(CellUtil.cloneFamily(cell), 
CellUtil.cloneQualifier(cell))) {
+                    current.add(cell);
+                }
+            }
+        }
+    }
+
+    public static Put mergeNew(Put current, Put previous) throws IOException {
+        Put next = new Put(current);
+        merge(next, previous);
+        return next;
+    }
+
+
+    public static Delete copyDeleteFamilyMarkers(Delete del) throws 
IOException {
+        Delete newDel = new Delete(del.getRow());
+        for (List<Cell> cells : del.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                switch (cell.getType()) {
+                    case DeleteFamily:
+                    case DeleteFamilyVersion:
+                        newDel.addDeleteMarker(cell);
+                }
+            }
+        }
+        return newDel;
+    }
+
+    public static Delete adjustGlobalIndexDeleteMutation(Delete del) throws 
IOException {
+        // For a global index table, we are only interested if the current 
data row is deleted or
+        // not. There is no need to apply column deletes to index rows since 
index rows are always
+        // full rows and all the cells in an index row have the same timestamp 
value. Because of
+        // this index rows versions do not share cells.
+        boolean deleteColumn = false;
+        boolean deleteFamily = false;
+        for (List<Cell> cells : del.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                switch (cell.getType()) {
+                    case DeleteFamily:
+                    case DeleteFamilyVersion:
+                        deleteFamily = true;
+                        break;
+                    case DeleteColumn:
+                    case Delete:
+                        deleteColumn = true;
+                }
+            }
+        }
+        if (!deleteFamily) {
+            return null;
+        }
+        if (deleteColumn) {
+            del = copyDeleteFamilyMarkers(del);
+        }
+        return del;
+    }
+
+    private static void addIndexDeleteMutation(IndexMaintainer 
indexMaintainer, Delete del, List<Mutation> mutationList,
+                                               long ts) throws IOException {
+        // The following unverified put is needed for inline verification to 
treat regular updates and
+        // rebuild updates the same way
+        Put unverifiedPut = new Put(del.getRow());
+        
unverifiedPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                indexMaintainer.getEmptyKeyValueQualifier(), ts, 
UNVERIFIED_BYTES);
+        mutationList.add(unverifiedPut);
+        del = adjustGlobalIndexDeleteMutation(del);
+        mutationList.add(del);
+    }
+
+    public static Collection<Cell> getCollection(Mutation m) {
+        Collection<Cell> collection = new ArrayList<>();
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
+            collection.addAll(cells);
+        }
+        return collection;
+    }
+
+
+    /**
+     * Generate the index update for a data row from the mutation that are 
obtained by merging the previous data row
+     * state with the pending row mutation for index rebuild. This method is 
called only for global indexes.
+     * pendingMutations is a sorted list of data table mutations that are used 
to replay index table mutations.
+     * This list is sorted in ascending order by the tuple of row key, 
timestamp and mutation type where delete comes
+     * after put.
 
 Review comment:
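   I have corrected the comment on MUTATION_TS_COMPARATOR so that it matches the comparator's behavior: mutations are ordered by ascending timestamp, and when two mutations have the same timestamp, the delete comes before the put.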

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to