kadirozde commented on a change in pull request #701: PHOENIX-5709 Simplify 
index update generation code for consistent glo…
URL: https://github.com/apache/phoenix/pull/701#discussion_r381633828
 
 

 ##########
 File path: 
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
 ##########
 @@ -1092,6 +1093,181 @@ private boolean isColumnIncluded(Cell cell) {
         return set.contains(qualifier);
     }
 
+    public static long getTimestamp(Mutation m) {
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                return cell.getTimestamp();
+            }
+        }
+        throw new IllegalStateException("No cell found");
+    }
+
+
+    /**
+     * This is to reorder the mutations in ascending order by the tuple of 
timestamp and mutation type where
+     * delete comes before put
+     */
+    public static final Comparator<Mutation> MUTATION_TS_COMPARATOR = new 
Comparator<Mutation>() {
+        @Override
+        public int compare(Mutation o1, Mutation o2) {
+            long ts1 = getTimestamp(o1);
+            long ts2 = getTimestamp(o2);
+            if (ts1 < ts2) {
+                return -1;
+            }
+            if (ts1 > ts2) {
+                return 1;
+            }
+            if (o1 instanceof Delete && o2 instanceof Put) {
+                return -1;
+            }
+            if (o1 instanceof Put && o2 instanceof Delete) {
+                return 1;
+            }
+            return 0;
+        }
+    };
+
+    public static List<Mutation> getMutationsWithSameTS(Put put, Delete del) {
+        List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(2);
+        if (put != null) {
+            mutationList.add(put);
+        }
+        if (del != null) {
+            mutationList.add(del);
+        }
+        // Group the cells within a mutation based on their timestamps and 
create a separate mutation for each group
+        mutationList = (List<Mutation>) 
IndexManagementUtil.flattenMutationsByTimestamp(mutationList);
+        // Reorder the mutations on the same row so that delete comes before 
put when they have the same timestamp
+        Collections.sort(mutationList, MUTATION_TS_COMPARATOR);
+        return mutationList;
+    }
+
+    private static Put prepareIndexPut(IndexMaintainer indexMaintainer, 
ImmutableBytesPtr rowKeyPtr,
+                                       ValueGetter mergedRowVG, long ts, 
boolean isRebuild,
+                                       byte[] regionStartKey, byte[] 
regionEndKey)
+            throws IOException {
+        Put indexPut = 
indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+                mergedRowVG, rowKeyPtr, ts, null, null);
+        if (indexPut == null) {
+            // No covered column. Just prepare an index row with the empty 
column
+            byte[] indexRowKey = indexMaintainer.buildRowKey(mergedRowVG, 
rowKeyPtr,
+                    regionStartKey, regionEndKey, HConstants.LATEST_TIMESTAMP);
+            indexPut = new Put(indexRowKey);
+        }
+        
indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                indexMaintainer.getEmptyKeyValueQualifier(), ts, 
VERIFIED_BYTES);
+        return indexPut;
+    }
+
+    public static void removeColumn(Put put, Cell deleteCell) {
+        byte[] family = CellUtil.cloneFamily(deleteCell);
+        List<Cell> cellList = put.getFamilyCellMap().get(family);
+        if (cellList == null) {
+            return;
+        }
+        Iterator<Cell> cellIterator = cellList.iterator();
+        while (cellIterator.hasNext()) {
+            Cell cell = cellIterator.next();
+            if (CellUtil.matchingQualifier(cell, deleteCell)) {
+                cellIterator.remove();
+                if (cellList.isEmpty()) {
+                    put.getFamilyCellMap().remove(family);
+                }
+                return;
+            }
+        }
+    }
+
+    public static void merge(Put current, Put previous) throws IOException {
+        for (List<Cell> cells : previous.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                if (!current.has(CellUtil.cloneFamily(cell), 
CellUtil.cloneQualifier(cell))) {
+                    current.add(cell);
+                }
+            }
+        }
+    }
+
+    public static Put mergeNew(Put current, Put previous) throws IOException {
+        Put next = new Put(current);
+        merge(next, previous);
+        return next;
+    }
+
+    /**
+     * Generate the index updates for a data row during index rebuild. This method is called
+     * only for global indexes. The data row's put and delete are split by timestamp (see
+     * getMutationsWithSameTS) and replayed in order, starting from an empty data row. Each put is
+     * merged on top of the current data row state to produce the next state, from which an index
+     * put is built. The replayed data mutations are sorted in ascending order by the tuple of
+     * timestamp and mutation type, where delete comes before put.
+     */
+    public static List<Mutation> 
prepareIndexMutationsForRebuild(IndexMaintainer indexMaintainer,
+                                                                 Put dataPut, 
Delete dataDel) throws IOException {
+        List<Mutation> dataMutations = getMutationsWithSameTS(dataPut, 
dataDel);
+        List<Mutation> indexMutations = 
Lists.newArrayListWithExpectedSize(dataMutations.size());
+        // The row key ptr of the data table row for which we will build index 
rows here
+        ImmutableBytesPtr rowKeyPtr = (dataPut != null) ? new 
ImmutableBytesPtr(dataPut.getRow()) :
+                new ImmutableBytesPtr(dataDel.getRow());
+        // Start with empty data table row
+        Put currentDataRow = null;
+        // The index row key corresponding to the current data row
+        byte[] IndexRowKeyForCurrentDataRow = null;
+        for (Mutation mutation : dataMutations) {
+            ValueGetter currentDataRowVG = (currentDataRow == null) ? null : 
new IndexRebuildRegionScanner.SimpleValueGetter(currentDataRow);
+            long ts = getTimestamp(mutation);
+            if (mutation instanceof Put) {
+                // Add this put on top of the current data row state to get 
the next data row state
+                Put nextDataRow = (currentDataRow == null) ? new 
Put((Put)mutation) : mergeNew((Put)mutation, currentDataRow);
+                ValueGetter nextDataRowVG = new 
IndexRebuildRegionScanner.SimpleValueGetter(nextDataRow);
+                Put indexPut = prepareIndexPut(indexMaintainer, rowKeyPtr, 
nextDataRowVG, ts, true, null, null);
+                indexMutations.add(indexPut);
+                // Delete the current index row if the new index key is 
different than the current one
+                if (IndexRowKeyForCurrentDataRow != null) {
 
 Review comment:
   I am not sure how the upper-case variable name ended up here; it probably
happened when I did a bulk rename. I will change it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to