This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 849841aa04 PHOENIX-7473 Eliminating index maintenance for CDC index 
(#2030)
849841aa04 is described below

commit 849841aa04d82bc3e06f770a492f9c94df528ff6
Author: Kadir Ozdemir <[email protected]>
AuthorDate: Thu Nov 28 00:45:12 2024 -0800

    PHOENIX-7473 Eliminating index maintenance for CDC index (#2030)
---
 .../coprocessor/CDCGlobalIndexRegionScanner.java   | 24 +++-----
 .../coprocessor/UncoveredIndexRegionScanner.java   | 14 +++--
 .../phoenix/hbase/index/IndexRegionObserver.java   | 64 ++++++++++++++--------
 3 files changed, 57 insertions(+), 45 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
index 4bde0727e9..4cbb4d6147 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
@@ -99,20 +99,12 @@ public class CDCGlobalIndexRegionScanner extends 
UncoveredGlobalIndexRegionScann
     protected boolean getNextCoveredIndexRow(List<Cell> result) throws 
IOException {
         if (indexRowIterator.hasNext()) {
             List<Cell> indexRow = indexRowIterator.next();
-            // firstCell: Picking the earliest cell in the index row so that
-            // timestamp of the cell and the row will be same.
-            Cell firstIndexCell = indexRow.get(indexRow.size() - 1);
-            byte[] indexRowKey = 
ImmutableBytesPtr.cloneCellRowIfNecessary(firstIndexCell);
+            Cell indexCell = indexRow.get(0);
+            byte[] indexRowKey = 
ImmutableBytesPtr.cloneCellRowIfNecessary(indexCell);
             ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
                     indexToDataRowKeyMap.get(indexRowKey));
             Result dataRow = dataRows.get(dataRowKey);
-            int phoenixRowTimestampFunctionOffset = 2 + 
(indexMaintainer.isMultiTenant() ? 1 : 0)
-                    + (indexMaintainer.getViewIndexId() != null ? 1 : 0);
-            ImmutableBytesPtr ptr = new ImmutableBytesPtr();
-            
indexMaintainer.getIndexRowKeySchema().iterator(firstIndexCell.getRowArray(),
-                    firstIndexCell.getRowOffset(), 
firstIndexCell.getRowLength(), ptr,
-                    phoenixRowTimestampFunctionOffset);
-            long changeTS = PLong.INSTANCE.getCodec().decodeLong(ptr, 
SortOrder.ASC);
+            long changeTS = indexCell.getTimestamp();
             TupleProjector dataTableProjector = 
cdcDataTableInfo.getDataTableProjector();
             Expression[] expressions = dataTableProjector != null ?
                     dataTableProjector.getExpressions() : null;
@@ -198,9 +190,9 @@ public class CDCGlobalIndexRegionScanner extends 
UncoveredGlobalIndexRegionScann
                         }
                     }
                     if (changeBuilder.isNonEmptyEvent()) {
-                        Result cdcRow = getCDCImage(indexRowKey, 
firstIndexCell);
+                        Result cdcRow = getCDCImage(indexRowKey, indexCell);
                         if (cdcRow != null && tupleProjector != null) {
-                            if (firstIndexCell.getType() == 
Cell.Type.DeleteFamily) {
+                            if (indexCell.getType() == Cell.Type.DeleteFamily) 
{
                                 // result is of type 
EncodedColumnQualiferCellsList for queries with
                                 // Order by clause. It fails when Delete 
Family cell is added to it
                                 // as it expects column qualifier bytes which 
is not available.
@@ -208,13 +200,13 @@ public class CDCGlobalIndexRegionScanner extends 
UncoveredGlobalIndexRegionScann
                                 
result.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
                                         .setRow(indexRowKey)
                                         
.setFamily(ImmutableBytesPtr.cloneCellFamilyIfNecessary(
-                                                firstIndexCell))
+                                                indexCell))
                                         
.setQualifier(indexMaintainer.getEmptyKeyValueQualifier())
-                                        
.setTimestamp(firstIndexCell.getTimestamp())
+                                        .setTimestamp(indexCell.getTimestamp())
                                         .setType(Cell.Type.Put)
                                         .setValue(EMPTY_BYTE_ARRAY).build());
                             } else {
-                                result.add(firstIndexCell);
+                                result.add(indexCell);
                             }
                             IndexUtil.addTupleAsOneCell(result, new 
ResultTuple(cdcRow),
                                     tupleProjector, ptr);
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
index aa0ac08c43..70977a545b 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
@@ -301,10 +301,13 @@ public abstract class UncoveredIndexRegionScanner extends 
BaseRegionScanner {
             put.add(cell);
         }
         if (indexMaintainer.isCDCIndex()) {
-            // A CDC index row key is PARTITION_ID() + PHOENIX_ROW_TIMESTAMP() 
+ data row key. The
-            // only necessary check is the row timestamp check since the data 
row key is extracted
-            // from the index row key and PARTITION_ID() changes during region 
splits and merges
-            if (IndexUtil.getMaxTimestamp(put) == indexTimestamp) {
+            // A CDC index row key is [view index id] + [tenant id] + 
PARTITION_ID()
+            // + PHOENIX_ROW_TIMESTAMP() + data row key. The only necessary 
check is the row
+            // timestamp check since the data row key is extracted from the 
index row key and
+            // PARTITION_ID() changes during region splits and merges so we 
cannot check it.
+            // If the scan is a raw scan which is expected to be the case for 
CDC scans,
+            // even the time check is not necessary
+            if (scan.isRaw() || IndexUtil.getMaxTimestamp(put) == 
indexTimestamp) {
                 return true;
             }
         } else if (indexMaintainer.checkIndexRow(indexRowKey, put)) {
@@ -324,7 +327,8 @@ public abstract class UncoveredIndexRegionScanner extends 
BaseRegionScanner {
             return true;
         }
         // This is not a valid index row
-        if (indexMaintainer.isAgedEnough(IndexUtil.getMaxTimestamp(put), 
ageThreshold)) {
+        if (indexMaintainer.isAgedEnough(IndexUtil.getMaxTimestamp(put), 
ageThreshold)
+                && !indexMaintainer.isCDCIndex()) {
             region.delete(indexMaintainer.createDelete(indexRowKey, 
IndexUtil.getMaxTimestamp(put),
                     false));
         }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 8a144f1245..1223c3a980 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -40,8 +40,10 @@ import java.util.Optional;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.expression.CaseExpression;
 import org.apache.phoenix.index.PhoenixIndexBuilderHelper;
@@ -85,7 +87,6 @@ import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
-import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.exception.DataExceedsCapacityException;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ExpressionType;
@@ -270,9 +271,9 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
       // passed MiniBatchOperationInProgress, like preWALAppend()
       private List<Mutation> originalMutations;
       private boolean hasAtomic;
-      private boolean hasDelete;
-      private boolean hasUncoveredIndex;
-      private boolean hasGlobalIndex;
+      private boolean hasRowDelete;
+      private boolean hasUncoveredIndex; // Has uncovered global indexes which 
are not CDC Indexes
+      private boolean hasGlobalIndex; // Covered global index
       private boolean hasLocalIndex;
       private boolean hasTransform;
       private boolean returnResult;
@@ -978,8 +979,8 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
     }
 
     /**
-     * Generate the index update for a data row from the mutation that are 
obtained by merging the previous data row
-     * state with the pending row mutation.
+     * Generate the index update for a data row from the mutation that are 
obtained by merging the
+     * previous data row state with the pending row mutation.
      */
     private void prepareIndexMutations(BatchMutateContext context, 
List<IndexMaintainer> maintainers, long ts)
             throws IOException {
@@ -1023,13 +1024,16 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
                             QueryConstants.UNVERIFIED_BYTES);
                     context.indexUpdates.put(hTableInterfaceReference,
                             new Pair<Mutation, byte[]>(indexPut, 
rowKeyPtr.get()));
-                    // Delete the current index row if the new index key is 
different than the current one
+                    // Delete the current index row if the new index key is 
different from the
+                    // current one and the index is not a CDC index
                     if (currentDataRowState != null) {
                         ValueGetter currentDataRowVG = new 
IndexUtil.SimpleValueGetter(currentDataRowState);
                         byte[] indexRowKeyForCurrentDataRow = indexMaintainer
                                 .buildRowKey(currentDataRowVG, rowKeyPtr, 
null, null,
                                         ts, encodedRegionName);
-                        if (Bytes.compareTo(indexPut.getRow(), 
indexRowKeyForCurrentDataRow) != 0) {
+                        if (!indexMaintainer.isCDCIndex()
+                                && Bytes.compareTo(indexPut.getRow(), 
indexRowKeyForCurrentDataRow)
+                                != 0) {
                             Mutation del = 
indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
                                     IndexMaintainer.DeleteType.ALL_VERSIONS, 
ts);
                             context.indexUpdates.put(hTableInterfaceReference,
@@ -1038,14 +1042,10 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
                     }
                 } else if (currentDataRowState != null
                         && 
indexMaintainer.shouldPrepareIndexMutations(currentDataRowState)) {
-                    context.indexUpdates.put(hTableInterfaceReference,
-                            new Pair<Mutation, 
byte[]>(getDeleteIndexMutation(currentDataRowState,
-                                    indexMaintainer, ts, rowKeyPtr, 
encodedRegionName),
-                                    rowKeyPtr.get()));
                     if (indexMaintainer.isCDCIndex()) {
-                        // CDC Index needs two delete markers one for deleting 
the index row, and
-                        // the other for referencing the data table delete 
mutation with the
-                        // right index row key, that is, the index row key 
starting with ts
+                        // CDC Index needs a delete marker for 
referencing the data table
+                        // delete mutation with the right index row key, that 
is, the index row key
+                        // starting with ts
                         Put cdcDataRowState = new 
Put(currentDataRowState.getRow());
                         
cdcDataRowState.addColumn(indexMaintainer.getDataEmptyKeyValueCF(),
                                 
indexMaintainer.getEmptyKeyValueQualifierForDataTable(), ts,
@@ -1054,6 +1054,11 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
                                 new Pair<Mutation, 
byte[]>(getDeleteIndexMutation(cdcDataRowState,
                                         indexMaintainer, ts, rowKeyPtr, 
encodedRegionName),
                                         rowKeyPtr.get()));
+                    } else {
+                        context.indexUpdates.put(hTableInterfaceReference,
+                                new Pair<Mutation, 
byte[]>(getDeleteIndexMutation(currentDataRowState,
+                                        indexMaintainer, ts, rowKeyPtr, 
encodedRegionName),
+                                        rowKeyPtr.get()));
                     }
                 }
             }
@@ -1061,10 +1066,11 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
     }
 
     /**
-     * This method prepares unverified index mutations which are applied to 
index tables before the data table is
-     * updated. In the three-phase update approach, in phase 1, the status of 
existing index rows is set to "unverified"
-     * (these rows will be deleted from the index table in phase 3), and/or 
new put mutations are added with the
-     * unverified status. In phase 2, data table mutations are applied. In 
phase 3, the status for an index table row is
+     * This method prepares unverified index mutations which are applied to 
index tables before
+     * the data table is updated. In the three-phase update approach, in phase 
1, the status of
+     * existing index rows is set to "unverified" (these rows will be deleted 
from the index table
+     * in phase 3), and/or new put mutations are added with the unverified 
status. In phase 2,
+     * data table mutations are applied. In phase 3, the status for an index 
table row is
      * either set to "verified" or the row is deleted.
      */
     private void preparePreIndexMutations(BatchMutateContext context,
@@ -1167,7 +1173,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
     }
 
     private void identifyMutationTypes(MiniBatchOperationInProgress<Mutation> 
miniBatchOp,
-                                              BatchMutateContext context) {
+                                              BatchMutateContext context) 
throws IOException {
         for (int i = 0; i < miniBatchOp.size(); i++) {
             Mutation m = miniBatchOp.getOperation(i);
             if (this.builder.returnResult(m) && miniBatchOp.size() == 1) {
@@ -1175,11 +1181,21 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
             }
             if (this.builder.isAtomicOp(m) || this.builder.returnResult(m)) {
                 context.hasAtomic = true;
-                if (context.hasDelete) {
+                if (context.hasRowDelete) {
                     return;
                 }
             } else if (m instanceof Delete) {
-                context.hasDelete = true;
+                CellScanner scanner = m.cellScanner();
+                if (m.isEmpty()) {
+                    context.hasRowDelete = true;
+                } else {
+                    while (scanner.advance()) {
+                        if (scanner.current().getType() == 
Cell.Type.DeleteFamily) {
+                            context.hasRowDelete = true;
+                            break;
+                        }
+                    }
+                }
             }
             if (context.hasAtomic || context.returnResult) {
                 return;
@@ -1296,7 +1312,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
         identifyMutationTypes(miniBatchOp, context);
         context.populateOriginalMutations(miniBatchOp);
 
-        if (context.hasDelete) {
+        if (context.hasRowDelete) {
             // Need to add cell tags to Delete Marker before we do any index 
processing
             // since we add tags to tables which doesn't have indexes also.
             ServerIndexUtil.setDeleteAttributes(miniBatchOp);
@@ -1319,7 +1335,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
             long start = EnvironmentEdgeManager.currentTimeMillis();
             context.dataRowStates = new HashMap<ImmutableBytesPtr, Pair<Put, 
Put>>(context.rowsToLock.size());
             if (context.hasGlobalIndex || context.hasTransform || 
context.hasAtomic ||
-                    context.returnResult || context.hasDelete || 
(context.hasUncoveredIndex &&
+                    context.returnResult || context.hasRowDelete || 
(context.hasUncoveredIndex &&
                     isPartialUncoveredIndexMutation(indexMetaData, 
miniBatchOp))) {
                 getCurrentRowStates(c, context);
             }

Reply via email to