This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new cd043adc16 PHOENIX-7425 Partitioned CDC Index for eliminating salting 
(#2015)
cd043adc16 is described below

commit cd043adc163891d118691c251d40faf0aa998262
Author: Kadir Ozdemir <[email protected]>
AuthorDate: Sat Nov 9 00:35:07 2024 -0800

    PHOENIX-7425 Partitioned CDC Index for eliminating salting (#2015)
---
 .../org/apache/phoenix/compile/DeleteCompiler.java |   4 +
 .../org/apache/phoenix/compile/QueryCompiler.java  |   8 -
 .../apache/phoenix/exception/SQLExceptionCode.java |   3 +
 .../org/apache/phoenix/execute/MutationState.java  |  72 --------
 .../apache/phoenix/expression/ExpressionType.java  |   3 +-
 .../expression/function/PartitionIdFunction.java   | 105 +++++++++++
 .../org/apache/phoenix/index/IndexMaintainer.java  |  97 ++++++++--
 .../apache/phoenix/optimize/QueryOptimizer.java    |  13 +-
 .../apache/phoenix/parse/PartitionIdParseNode.java |  48 +++++
 .../org/apache/phoenix/schema/MetaDataClient.java  |  13 +-
 .../schema/transform/TransformMaintainer.java      |  13 +-
 .../main/java/org/apache/phoenix/util/CDCUtil.java |   2 +-
 .../java/org/apache/phoenix/util/ScanUtil.java     |   2 -
 .../coprocessor/BaseScannerRegionObserver.java     |  10 ++
 .../coprocessor/GlobalIndexRegionScanner.java      |  20 ++-
 .../coprocessor/IndexRepairRegionScanner.java      |   2 +-
 .../phoenix/coprocessor/IndexerRegionScanner.java  |   7 +-
 .../coprocessor/UncoveredIndexRegionScanner.java   |  13 +-
 .../phoenix/hbase/index/IndexRegionObserver.java   |  22 ++-
 .../java/org/apache/phoenix/end2end/CDCBaseIT.java |  54 ++++--
 .../apache/phoenix/end2end/CDCDefinitionIT.java    |  52 +-----
 .../org/apache/phoenix/end2end/CDCQueryIT.java     | 200 +++++++++++++++++----
 .../index/PrepareIndexMutationsForRebuildTest.java |  57 +++---
 .../phoenix/index/VerifySingleIndexRowTest.java    |   2 +-
 .../phoenix/parse/PartitionIdFunctionTest.java     |  63 +++++++
 25 files changed, 615 insertions(+), 270 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index a4d087288d..476d40dc26 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -90,6 +90,7 @@ import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 
@@ -1020,6 +1021,9 @@ public class DeleteCompiler {
     }
     
     private static boolean isMaintainedOnClient(PTable table) {
+        if (CDCUtil.isCDCIndex(table)) {
+            return false;
+        }
         // Test for not being local (rather than being GLOBAL) so that this 
doesn't fail
         // when tested with our projected table.
         return (table.getIndexType() != IndexType.LOCAL && 
(table.isTransactional() || table.isImmutableRows())) ||
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 2d15206789..ccc789146b 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -804,14 +804,6 @@ public class QueryCompiler {
                 selectNodes = tmpSelectNodes;
             }
             List<OrderByNode> orderByNodes = select.getOrderBy();
-            // For CDC queries, if no ORDER BY is specified, add default 
ordering.
-            if (orderByNodes.size() == 0) {
-                orderByNodes = Lists.newArrayListWithExpectedSize(1);
-                orderByNodes.add(NODE_FACTORY.orderBy(
-                        NODE_FACTORY.function(PhoenixRowTimestampFunction.NAME,
-                                Collections.emptyList()),
-                        false, SortOrder.getDefault() == SortOrder.ASC));
-            }
             select = NODE_FACTORY.select(select.getFrom(),
                     select.getHint(), select.isDistinct(), selectNodes, 
select.getWhere(),
                     select.getGroupBy(), select.getHaving(), orderByNodes, 
select.getLimit(),
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 48eff59658..d9aa522765 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -56,6 +56,7 @@ import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CHANGE_DETECTION_ENABLED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL;
 
 
@@ -379,6 +380,8 @@ public enum SQLExceptionCode {
             + " property is already defined in hierarchy for this entity"),
     VIEW_TTL_NOT_ENABLED(10961,"44A43", TTL +
             " property can not be set on views as phoenix.view.ttl.enabled is 
false"),
+    SALTING_NOT_ALLOWED_FOR_CDC(10962,"44A44", SALT_BUCKETS +
+            " property can not be set for CDC"),
 
     /** Sequence related */
     SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new 
Factory() {
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
index 6d3c9df480..c1bb46fc85 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -75,9 +75,6 @@ import 
org.apache.phoenix.coprocessorclient.MetaDataProtocol.MetaDataMutationRes
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.exception.StaleMetadataCacheException;
-import org.apache.phoenix.hbase.index.AbstractValueGetter;
-import org.apache.phoenix.hbase.index.ValueGetter;
-import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.exception.IndexWriteException;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMaintainer;
@@ -124,7 +121,6 @@ import 
org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilit
 import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.transaction.TransactionFactory.Provider;
 import org.apache.phoenix.util.ClientUtil;
-import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
@@ -639,61 +635,6 @@ public class MutationState implements SQLCloseable {
         return ptr;
     }
 
-    private List<Mutation> getCDCDeleteMutations(PTable table, PTable index,
-                                                 Long mutationTimestamp,
-                                                 List<Mutation> mutationList) 
throws
-            SQLException {
-        final ImmutableBytesPtr ptr = new ImmutableBytesPtr();
-        IndexMaintainer maintainer = index.getIndexMaintainer(table, 
connection);
-        List<Mutation> indexMutations = 
Lists.newArrayListWithExpectedSize(mutationList.size());
-        for (final Mutation mutation : mutationList) {
-            // Only generate extra row mutations for DELETE
-            if (mutation instanceof Delete) {
-                ptr.set(mutation.getRow());
-                ValueGetter getter = new AbstractValueGetter() {
-                    @Override
-                    public byte[] getRowKey() {
-                        return mutation.getRow();
-                    }
-                    @Override
-                    public ImmutableBytesWritable 
getLatestValue(ColumnReference ref, long ts) {
-                        // Always return null for our empty key value, as this 
will cause the index
-                        // maintainer to always treat this Put as a new row.
-                        if (IndexUtil.isEmptyKeyValue(table, ref)) {
-                            return null;
-                        }
-                        byte[] family = ref.getFamily();
-                        byte[] qualifier = ref.getQualifier();
-                        Map<byte[], List<Cell>> familyMap = 
mutation.getFamilyCellMap();
-                        List<Cell> kvs = familyMap.get(family);
-                        if (kvs == null) {
-                            return null;
-                        }
-                        for (Cell kv : kvs) {
-                            if (Bytes.compareTo(kv.getFamilyArray(), 
kv.getFamilyOffset(),
-                                    kv.getFamilyLength(), family, 0, 
family.length) == 0
-                                    && Bytes.compareTo(kv.getQualifierArray(),
-                                    kv.getQualifierOffset(), 
kv.getQualifierLength(),
-                                    qualifier, 0, qualifier.length) == 0) {
-                                ImmutableBytesPtr ptr = new 
ImmutableBytesPtr();
-                                
connection.getKeyValueBuilder().getValueAsPtr(kv, ptr);
-                                return ptr;
-                            }
-                        }
-                        return null;
-                    }
-                };
-                ImmutableBytesPtr key = new 
ImmutableBytesPtr(maintainer.buildRowKey(
-                        getter, ptr, null, null, mutationTimestamp));
-                PRow row = index.newRow(
-                        connection.getKeyValueBuilder(), mutationTimestamp, 
key, false);
-                row.delete();
-                indexMutations.addAll(row.toRowMutations());
-            }
-        }
-        return indexMutations;
-    }
-
     private Iterator<Pair<PTable, List<Mutation>>> addRowMutations(final 
TableRef tableRef,
             final MultiRowMutationState values, final long mutationTimestamp, 
final long serverTimestamp,
             boolean includeAllIndexes, final boolean sendAll) {
@@ -771,19 +712,6 @@ public class MutationState implements SQLCloseable {
                             }
                         }
                     }
-
-                    if (CDCUtil.isCDCIndex(index)) {
-                        List<Mutation> cdcMutations = getCDCDeleteMutations(
-                                table, index, mutationTimestamp, mutationList);
-                        if (cdcMutations.size() > 0) {
-                            if (indexMutations == null) {
-                                indexMutations = cdcMutations;
-                            } else {
-                                indexMutations.addAll(cdcMutations);
-                            }
-                        }
-                    }
-
                 } catch (SQLException | IOException e) {
                     throw new IllegalDataException(e);
                 }
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java
index d71c07185a..bc059d9642 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java
@@ -199,7 +199,8 @@ public enum ExpressionType {
     JsonModifyFunction(JsonModifyFunction.class),
     BsonConditionExpressionFunction(BsonConditionExpressionFunction.class),
     BsonUpdateExpressionFunction(BsonUpdateExpressionFunction.class),
-    BsonValueFunction(BsonValueFunction.class);
+    BsonValueFunction(BsonValueFunction.class),
+    PartitionIdFunction(PartitionIdFunction.class);
 
     ExpressionType(Class<? extends Expression> clazz) {
         this.clazz = clazz;
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/PartitionIdFunction.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/PartitionIdFunction.java
new file mode 100644
index 0000000000..25a2c4d0ce
--- /dev/null
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/PartitionIdFunction.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Determinism;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.parse.PartitionIdParseNode;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PChar;
+import org.apache.phoenix.schema.types.PDataType;
+
+import java.util.List;
+
+/**
+ * Function to return the partition id which is the encoded data table region 
name as the prefix
+ * of the CDC index row key. This function is used only with CDC Indexes
+ */
+@BuiltInFunction(name = PartitionIdFunction.NAME,
+        nodeClass= PartitionIdParseNode.class,
+        args = {})
+public class PartitionIdFunction extends ScalarFunction {
+    public static final String NAME = "PARTITION_ID";
+    public static final int PARTITION_ID_LENGTH = 32;
+
+    public PartitionIdFunction() {
+    }
+
+    /**
+     *  @param children none
+     *  {@link org.apache.phoenix.parse.PartitionIdParseNode#create create}
+     *  will return the partition id of a given CDC index row.
+     */
+    public PartitionIdFunction(List<Expression> children) {
+        super(children);
+        if (!children.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "PartitionIdFunction should not have any child expression"
+            );
+        }
+    }
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+    /**
+     * The evaluate method is called under the following conditions -
+     * 1. When PARTITION_ID() is evaluated in the projection list.
+     *
+     * 2. When PARTITION_ID() is evaluated in the backend as part of the where 
clause.
+     *
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (tuple == null) {
+            return false;
+        }
+        tuple.getKey(ptr);
+        if (ptr.getLength() < PARTITION_ID_LENGTH) {
+            return false;
+        }
+        // The partition id of a row is always the prefix of the row key
+        ptr.set(ptr.get(), 0, PARTITION_ID_LENGTH);
+        return true;
+    }
+
+    @Override
+    public PDataType getDataType() {
+        return PChar.INSTANCE;
+    }
+
+    @Override
+    public Integer getMaxLength() {
+        return PARTITION_ID_LENGTH;
+    }
+
+    @Override
+    public boolean isStateless() {
+        return false;
+    }
+
+    @Override
+    public Determinism getDeterminism() {
+        return Determinism.PER_ROW;
+    }
+
+}
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 97a0ca785c..c9fde90ca6 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -43,6 +43,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -67,6 +68,7 @@ import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.expression.SingleCellColumnExpression;
 import org.apache.phoenix.expression.SingleCellConstructorExpression;
+import org.apache.phoenix.expression.function.PartitionIdFunction;
 import org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor;
 import org.apache.phoenix.hbase.index.AbstractValueGetter;
 import org.apache.phoenix.hbase.index.ValueGetter;
@@ -183,7 +185,8 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             @Override
             public boolean apply(PTable index) {
                 return sendIndexMaintainer(index) && 
IndexUtil.isGlobalIndex(index)
-                        && dataTable.getImmutableStorageScheme() == 
index.getImmutableStorageScheme();
+                        && (dataTable.getImmutableStorageScheme() == 
index.getImmutableStorageScheme()
+                        && !CDCUtil.isCDCIndex(index));
             }
         });
     }
@@ -194,7 +197,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             public boolean apply(PTable index) {
                 return sendIndexMaintainer(index) && ((index.getIndexType() == 
IndexType.GLOBAL
                         && dataTable.getImmutableStorageScheme() != 
index.getImmutableStorageScheme())
-                        || index.getIndexType() == IndexType.LOCAL);
+                        || index.getIndexType() == IndexType.LOCAL || 
CDCUtil.isCDCIndex(index));
             }
         });
     }
@@ -706,10 +709,18 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         this.dataEncodingScheme = sc;
     }
 
-    public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable 
rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey, long ts)  {
+    public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable 
rowKeyPtr,
+            byte[] regionStartKey, byte[] regionEndKey, long ts)  {
+        return buildRowKey(valueGetter, rowKeyPtr, regionStartKey, 
regionEndKey, ts, null);
+    }
+    public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable 
rowKeyPtr,
+            byte[] regionStartKey, byte[] regionEndKey, long ts, byte[] 
encodedRegionName)  {
+        if (isCDCIndex && encodedRegionName == null) {
+            throw new IllegalArgumentException("Encoded region name is 
required for a CDC index");
+        }
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         boolean prependRegionStartKey = isLocalIndex && regionStartKey != null;
-        boolean isIndexSalted = !isLocalIndex && nIndexSaltBuckets > 0;
+        boolean isIndexSalted = !isLocalIndex && !isCDCIndex && 
nIndexSaltBuckets > 0;
         int prefixKeyLength =
                 prependRegionStartKey ? (regionStartKey.length != 0 ? 
regionStartKey.length
                         : regionEndKey.length) : 0; 
@@ -785,7 +796,18 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                        dataColumnType = expression.getDataType();
                        dataSortOrder = expression.getSortOrder();
                     isNullable = expression.isNullable();
-                       expression.evaluate(new ValueGetterTuple(valueGetter, 
ts), ptr);
+                    if (expression instanceof PartitionIdFunction) {
+                        if (i != 0) {
+                            throw new DoNotRetryIOException("PARTITION_ID() 
has to be the prefix "
+                            + "of the index row key!");
+                        } else if (!isCDCIndex) {
+                            throw new DoNotRetryIOException("PARTITION_ID() 
should be used only for"
+                                    + " CDC Indexes!");
+                        }
+                        ptr.set(encodedRegionName);
+                    } else {
+                        expression.evaluate(new ValueGetterTuple(valueGetter, 
ts), ptr);
+                    }
                 }
                 else {
                     Field field = dataRowKeySchema.getField(dataPkPosition[i]);
@@ -882,7 +904,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         int dataPosOffset = 0;
         int viewConstantsIndex = 0;
         try {
-            int indexPosOffset = !isLocalIndex && nIndexSaltBuckets > 0 ? 1 : 
0;
+            int indexPosOffset = !isLocalIndex && !isCDCIndex && 
nIndexSaltBuckets > 0 ? 1 : 0;
             int maxRowKeyOffset = indexRowKeyPtr.getOffset() + 
indexRowKeyPtr.getLength();
             indexRowKeySchema.iterator(indexRowKeyPtr, ptr, indexPosOffset);
             if (isDataTableSalted) {
@@ -905,7 +927,8 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                 indexPosOffset++;
                 dataPosOffset++;
             }
-            indexPosOffset = (!isLocalIndex && nIndexSaltBuckets > 0 ? 1 : 0) 
+ (isMultiTenant ? 1 : 0) + (viewIndexId == null ? 0 : 1);
+            indexPosOffset = (!isLocalIndex && !isCDCIndex && 
nIndexSaltBuckets > 0 ? 1 : 0)
+                    + (isMultiTenant ? 1 : 0) + (viewIndexId == null ? 0 : 1);
             BitSet viewConstantColumnBitSet = 
this.rowKeyMetaData.getViewConstantColumnBitSet();
             BitSet descIndexColumnBitSet = 
rowKeyMetaData.getDescIndexColumnBitSet();
             int trailingVariableWidthColumnNum = 0;
@@ -1003,8 +1026,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         return buildRowKey(valueGetter, new 
ImmutableBytesWritable(dataRow.getRow()),
                 null, null, IndexUtil.getMaxTimestamp(dataRow));
     }
-    public boolean checkIndexRow(final byte[] indexRowKey,
-                                        final Put dataRow) {
+    public boolean checkIndexRow(final byte[] indexRowKey, final Put dataRow) {
         if (!shouldPrepareIndexMutations(dataRow)) {
             return false;
         }
@@ -1016,6 +1038,26 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         return true;
     }
 
+    public  byte[] getIndexRowKey(final Put dataRow, byte[] encodedRegionName) 
{
+        ValueGetter valueGetter = new IndexUtil.SimpleValueGetter(dataRow);
+        return buildRowKey(valueGetter, new 
ImmutableBytesWritable(dataRow.getRow()),
+                null, null, IndexUtil.getMaxTimestamp(dataRow), 
encodedRegionName);
+    }
+
+    public boolean checkIndexRow(final byte[] indexRowKey, final byte[] 
dataRowKey,
+            final Put dataRow, final byte[][] viewConstants) {
+        if (!shouldPrepareIndexMutations(dataRow)) {
+            return false;
+        }
+        byte[] builtDataRowKey = buildDataRowKey(new 
ImmutableBytesWritable(indexRowKey),
+                viewConstants);
+        if (Bytes.compareTo(builtDataRowKey, 0, builtDataRowKey.length,
+                dataRowKey, 0, dataRowKey.length) != 0) {
+            return false;
+        }
+        return true;
+    }
+
     /**
      * Determines if the index row for a given data row should be prepared. 
For full
      * indexes, index rows should always be prepared. For the partial indexes, 
the index row should
@@ -1216,8 +1258,17 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         return indexRowKeySchema;
     }
 
-    public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter 
valueGetter, ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] 
regionStartKey, byte[] regionEndKey, boolean verified) throws IOException {
-        byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, 
regionStartKey, regionEndKey, ts);
+    public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter 
valueGetter,
+            ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] 
regionStartKey,
+            byte[] regionEndKey, boolean verified) throws IOException {
+        return buildUpdateMutation(kvBuilder, valueGetter, dataRowKeyPtr, ts, 
regionStartKey,
+                regionEndKey, verified, null);
+    }
+    public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter 
valueGetter,
+            ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] 
regionStartKey,
+            byte[] regionEndKey, boolean verified, byte[] encodedRegionName) 
throws IOException {
+        byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, 
regionStartKey,
+                regionEndKey, ts, encodedRegionName);
         return buildUpdateMutation(kvBuilder, valueGetter, dataRowKeyPtr, ts, 
regionStartKey, regionEndKey,
                 indexRowKey, this.getEmptyKeyValueFamily(), coveredColumnsMap,
                 indexEmptyKeyValueRef, indexWALDisabled, 
dataImmutableStorageScheme, immutableStorageScheme, encodingScheme, 
dataEncodingScheme, verified);
@@ -1519,12 +1570,24 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
      * Used for immutable indexes that only index PK column values. In that 
case, we can handle a data row deletion,
      * since we can build the corresponding index row key.
      */
-    public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, 
ImmutableBytesWritable dataRowKeyPtr, long ts) throws IOException {
-        return buildDeleteMutation(kvBuilder, null, dataRowKeyPtr, 
Collections.<Cell>emptyList(), ts, null, null);
-    }
-    
-    public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter 
oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<Cell> 
pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws 
IOException {
-        byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, 
regionStartKey, regionEndKey, ts);
+    public Delete buildDeleteMutation(KeyValueBuilder kvBuilder,
+            ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] 
encodedRegionName)
+            throws IOException {
+        return buildDeleteMutation(kvBuilder, null, dataRowKeyPtr, 
Collections.<Cell>emptyList(),
+                ts, null, null, encodedRegionName);
+    }
+
+    public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter 
oldState,
+            ImmutableBytesWritable dataRowKeyPtr, Collection<Cell> 
pendingUpdates, long ts,
+            byte[] regionStartKey, byte[] regionEndKey) throws IOException {
+        return buildDeleteMutation(kvBuilder, oldState, dataRowKeyPtr, 
pendingUpdates, ts,
+                regionStartKey,regionEndKey, null);
+    }
+    public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter 
oldState,
+            ImmutableBytesWritable dataRowKeyPtr, Collection<Cell> 
pendingUpdates, long ts,
+            byte[] regionStartKey, byte[] regionEndKey, byte[] 
encodedRegionName) throws IOException {
+        byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, 
regionStartKey, regionEndKey,
+                ts, encodedRegionName);
         // Delete the entire row if any of the indexed columns changed
         DeleteType deleteType = null;
         if (oldState == null || 
(deleteType=getDeleteTypeOrNull(pendingUpdates)) != null || 
hasIndexedColumnChanged(oldState, pendingUpdates, ts)) { // Deleting the entire 
row
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 6433f9ab31..a562b3ebc8 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -286,11 +286,16 @@ public class QueryOptimizer {
                     targetColumns, parallelIteratorFactory, plans);
             if (hintedPlan != null) {
                 PTable index = hintedPlan.getTableRef().getTable();
-                if (stopAtBestPlan && hintedPlan.isApplicable() && 
(index.getIndexWhere() == null
-                        || isPartialIndexUsable(select, dataPlan, index))) {
-                    return Collections.singletonList(hintedPlan);
+                // Ignore any index hint with a CDC index as it is a truncated 
index, it only
+                // includes index rows that changed within the max lookback 
window
+                if (!CDCUtil.isCDCIndex(index)) {
+                    if (stopAtBestPlan && hintedPlan.isApplicable() && (
+                            index.getIndexWhere() == null || 
isPartialIndexUsable(select, dataPlan,
+                                    index))) {
+                        return Collections.singletonList(hintedPlan);
+                    }
+                    plans.add(0, hintedPlan);
                 }
-                plans.add(0, hintedPlan);
             }
         }
         
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/PartitionIdParseNode.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/PartitionIdParseNode.java
new file mode 100644
index 0000000000..5a5908667f
--- /dev/null
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/PartitionIdParseNode.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.function.FunctionExpression;
+import org.apache.phoenix.expression.function.PartitionIdFunction;
+
+
+import java.sql.SQLException;
+import java.util.List;
+
+public class PartitionIdParseNode extends FunctionParseNode {
+
+    PartitionIdParseNode(String name, List<ParseNode> children,
+                                 BuiltInFunctionInfo info) {
+        super(name, children, info);
+    }
+
+    @Override
+    public FunctionExpression create(List<Expression> children, 
StatementContext context)
+            throws SQLException {
+        // It does not take any parameters.
+        if (children.size() != 0) {
+            throw new IllegalArgumentException(
+                    "PartitionIdFunction does not take any parameters"
+            );
+        }
+        return new PartitionIdFunction(children);
+    }
+}
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 6851b6fe31..129275dec7 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.schema;
 
 import static 
org.apache.phoenix.exception.SQLExceptionCode.CANNOT_TRANSFORM_TRANSACTIONAL_TABLE;
 import static 
org.apache.phoenix.exception.SQLExceptionCode.ERROR_WRITING_TO_SCHEMA_REGISTRY;
+import static 
org.apache.phoenix.exception.SQLExceptionCode.SALTING_NOT_ALLOWED_FOR_CDC;
 import static 
org.apache.phoenix.exception.SQLExceptionCode.TABLE_ALREADY_EXIST;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_NOT_DEFINED;
@@ -169,6 +170,7 @@ import java.util.Objects;
 
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.expression.function.PartitionIdFunction;
 import org.apache.phoenix.expression.function.PhoenixRowTimestampFunction;
 import org.apache.phoenix.parse.CreateCDCStatement;
 import org.apache.phoenix.parse.DropCDCStatement;
@@ -1978,13 +1980,16 @@ public class MetaDataClient {
                 "CREATE UNCOVERED INDEX " + (statement.isIfNotExists() ? "IF 
NOT EXISTS " : "")
                         + 
CDCUtil.getCDCIndexName(statement.getCdcObjName().getName())
                         + " ON " + dataTableFullName + " ("
-                        + PhoenixRowTimestampFunction.NAME + "()) ASYNC";
+                        + PartitionIdFunction.NAME + "(), " + 
PhoenixRowTimestampFunction.NAME
+                        + "()) ASYNC";
         List<String> indexProps = new ArrayList<>();
+        // We do not want to replicate CDC indexes
         indexProps.add("REPLICATION_SCOPE=0");
-        Object saltBucketNum = TableProperty.SALT_BUCKETS.getValue(tableProps);
-        if (saltBucketNum != null) {
-            indexProps.add("SALT_BUCKETS=" + saltBucketNum);
+        if (TableProperty.SALT_BUCKETS.getValue(tableProps) != null) {
+            throw new 
SQLExceptionInfo.Builder(SALTING_NOT_ALLOWED_FOR_CDC).setTableName(
+                    
statement.getCdcObjName().getName()).build().buildException();
         }
+        indexProps.add("SALT_BUCKETS=0");
         Object columnEncodedBytes = 
TableProperty.COLUMN_ENCODED_BYTES.getValue(tableProps);
         if (columnEncodedBytes != null) {
             indexProps.add("COLUMN_ENCODED_BYTES=" + columnEncodedBytes);
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
index 4a20afea1f..50c9948faa 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
@@ -467,7 +467,9 @@ public class TransformMaintainer extends IndexMaintainer {
 
     // Builds new table's rowkey using the old table's rowkey.
     // This method will change when we support rowkey related transforms
-    public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable 
rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey, long ts)  {
+    @Override
+    public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable 
rowKeyPtr,
+            byte[] regionStartKey, byte[] regionEndKey, long ts, byte[] 
encodedRegionName)  {
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         boolean isNewTableSalted = nNewTableSaltBuckets > 0;
 
@@ -521,9 +523,12 @@ public class TransformMaintainer extends IndexMaintainer {
         }
     }
 
-    public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter 
valueGetter, ImmutableBytesWritable oldRowKeyPtr,
-                                   long ts, byte[] regionStartKey, byte[] 
regionEndKey, boolean verified) throws IOException {
-        byte[] newRowKey = this.buildRowKey(valueGetter, oldRowKeyPtr, 
regionStartKey, regionEndKey, ts);
+    @Override
+    public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter 
valueGetter,
+            ImmutableBytesWritable oldRowKeyPtr, long ts, byte[] 
regionStartKey,
+            byte[] regionEndKey, boolean verified, byte[] encodedRegionName) 
throws IOException {
+        byte[] newRowKey = this.buildRowKey(valueGetter, oldRowKeyPtr, 
regionStartKey, regionEndKey,
+                ts, encodedRegionName);
         return buildUpdateMutation(kvBuilder, valueGetter, oldRowKeyPtr, ts, 
regionStartKey, regionEndKey,
                 newRowKey, this.getEmptyKeyValueFamily(), coveredColumnsMap,
                 newTableEmptyKeyValueRef, newTableWALDisabled, 
oldTableImmutableStorageScheme, newTableImmutableStorageScheme,
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
index 6e87121ef9..74c2263d33 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -38,7 +38,7 @@ import org.apache.phoenix.schema.types.PDataType;
 import org.bson.RawBsonDocument;
 
 public class CDCUtil {
-    public static final String CDC_INDEX_PREFIX = "PHOENIX_CDC_INDEX";
+    public static final String CDC_INDEX_PREFIX = "PHOENIX_CDC_INDEX_";
 
     /**
      * Make a set of CDC change scope enums from the given string containing 
comma separated scope
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 7f50bd6d72..7a97c6a7ed 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -1476,8 +1476,6 @@ public class ScanUtil {
 
         if (context.getCDCTableRef() != null) {
             scan.setAttribute(CDC_DATA_TABLE_DEF, 
CDCTableInfo.toProto(context).toByteArray());
-            CDCUtil.setupScanForCDC(scan);
-            adjustScanFilterForGlobalIndexRegionScanner(scan);
         }
     }
 
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index aab5c178a3..113b94d4f3 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -54,6 +54,7 @@ import 
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.filter.PagingFilter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.index.CDCTableInfo;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.iterate.NonAggregateRegionScannerFactory;
 import org.apache.phoenix.iterate.RegionScannerFactory;
@@ -62,6 +63,7 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.ClientUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.QueryUtil;
@@ -70,6 +72,7 @@ import org.apache.phoenix.schema.PTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.CDC_DATA_TABLE_DEF;
 import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForFilter;
 
 abstract public class BaseScannerRegionObserver implements RegionObserver {
@@ -160,6 +163,13 @@ abstract public class BaseScannerRegionObserver implements 
RegionObserver {
             TimeRange timeRange = scan.getTimeRange();
             scan.setTimeRange(timeRange.getMin(), Bytes.toLong(txnScn));
         }
+        if (!scan.isRaw()) {
+            byte[] cdcScan = scan.getAttribute(CDC_DATA_TABLE_DEF);
+            if (cdcScan != null) {
+                CDCUtil.setupScanForCDC(scan);
+                ScanUtil.adjustScanFilterForGlobalIndexRegionScanner(scan);
+            }
+        }
         if (isRegionObserverFor(scan)) {
             // For local indexes, we need to throw if out of region as we'll 
get inconsistent
             // results otherwise while in other cases, it may just mean out 
client-side data
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index f5ef7a87dc..52ca74c1d5 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -1199,13 +1199,13 @@ public abstract class GlobalIndexRegionScanner extends 
BaseRegionScanner {
     }
 
     private static Put prepareIndexPutForRebuild(IndexMaintainer 
indexMaintainer,
-            ImmutableBytesPtr rowKeyPtr, ValueGetter mergedRowVG, long ts) 
throws IOException {
+            ImmutableBytesPtr rowKeyPtr, ValueGetter mergedRowVG, long ts, 
byte[] encodedRegionName) throws IOException {
         Put indexPut = 
indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
-                mergedRowVG, rowKeyPtr, ts, null, null, false);
+                mergedRowVG, rowKeyPtr, ts, null, null, false, 
encodedRegionName);
         if (indexPut == null) {
             // No covered column. Just prepare an index row with the empty 
column
             byte[] indexRowKey = indexMaintainer.buildRowKey(mergedRowVG, 
rowKeyPtr,
-                    null, null, ts);
+                    null, null, ts, encodedRegionName);
             indexPut = new Put(indexRowKey);
         } else {
             IndexUtil.removeEmptyColumn(indexPut, 
indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
@@ -1280,7 +1280,7 @@ public abstract class GlobalIndexRegionScanner extends 
BaseRegionScanner {
      * and mutation type where delete comes after put.
      */
     public static List<Mutation> 
prepareIndexMutationsForRebuild(IndexMaintainer indexMaintainer,
-                                                                 Put dataPut, 
Delete dataDel) throws IOException {
+            Put dataPut, Delete dataDel, byte[] encodedRegionName) throws 
IOException {
         List<Mutation> dataMutations = getMutationsWithSameTS(dataPut, 
dataDel);
         List<Mutation> indexMutations = 
Lists.newArrayListWithExpectedSize(dataMutations.size());
         // The row key ptr of the data table row for which we will build index 
rows here
@@ -1341,7 +1341,7 @@ public abstract class GlobalIndexRegionScanner extends 
BaseRegionScanner {
                     }
                     ValueGetter nextDataRowVG = new 
IndexUtil.SimpleValueGetter(nextDataRow);
                     Put indexPut = prepareIndexPutForRebuild(indexMaintainer, 
rowKeyPtr,
-                            nextDataRowVG, ts);
+                            nextDataRowVG, ts, encodedRegionName);
                     indexMutations.add(indexPut);
                     // Delete the current index row if the new index key is 
different than the current one
                     if (indexRowKeyForCurrentDataRow != null) {
@@ -1380,8 +1380,9 @@ public abstract class GlobalIndexRegionScanner extends 
BaseRegionScanner {
                             // CDC Index needs two delete markers one for 
deleting the index row,
                             // and the other for referencing the data table 
delete mutation with
                             // the right index row key, that is, the index row 
key starting with ts
-                            
indexMutations.add(IndexRegionObserver.getDeleteIndexMutation(
-                                    currentDataRowState, indexMaintainer, ts, 
rowKeyPtr));
+                            indexMutations.add(IndexRegionObserver
+                                    
.getDeleteIndexMutation(currentDataRowState, indexMaintainer,
+                                            ts, rowKeyPtr, encodedRegionName));
                         }
                     }
                     currentDataRowState = null;
@@ -1398,7 +1399,7 @@ public abstract class GlobalIndexRegionScanner extends 
BaseRegionScanner {
                     }
                     ValueGetter nextDataRowVG = new 
IndexUtil.SimpleValueGetter(nextDataRowState);
                     Put indexPut = prepareIndexPutForRebuild(indexMaintainer, 
rowKeyPtr,
-                            nextDataRowVG, ts);
+                            nextDataRowVG, ts, encodedRegionName);
                     indexMutations.add(indexPut);
                     // Delete the current index row if the new index key is 
different than the current one
                     if (indexRowKeyForCurrentDataRow != null) {
@@ -1424,7 +1425,8 @@ public abstract class GlobalIndexRegionScanner extends 
BaseRegionScanner {
                                      Set<byte[]> mostRecentIndexRowKeys) 
throws IOException {
         List<Mutation> indexMutations;
 
-        indexMutations = prepareIndexMutationsForRebuild(indexMaintainer, put, 
del);
+        indexMutations = prepareIndexMutationsForRebuild(indexMaintainer, put, 
del,
+                region.getRegionInfo().getEncodedNameAsBytes());
         Collections.reverse(indexMutations);
 
         boolean mostRecentDone = false;
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
index ef444c1c8d..f82088d217 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
@@ -121,7 +121,7 @@ public class IndexRepairRegionScanner extends 
GlobalIndexRegionScanner {
                 del.add(cell);
             }
         }
-        List<Mutation> indexMutations = 
prepareIndexMutationsForRebuild(indexMaintainer, put, del);
+        List<Mutation> indexMutations = 
prepareIndexMutationsForRebuild(indexMaintainer, put, del, null);
         Collections.reverse(indexMutations);
         for (Mutation mutation : indexMutations) {
             byte[] indexRowKey = mutation.getRow();
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
index d0b56b35b5..105d21f1ba 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
@@ -152,7 +152,8 @@ public class IndexerRegionScanner extends 
GlobalIndexRegionScanner {
         ValueGetter valueGetter = new IndexUtil.SimpleValueGetter(dataRow);
         long ts = IndexUtil.getMaxTimestamp(dataRow);
         Put indexPut = 
indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
-                valueGetter, new ImmutableBytesWritable(dataRow.getRow()), ts, 
null, null, false);
+                valueGetter, new ImmutableBytesWritable(dataRow.getRow()), ts, 
null, null,
+                false, region.getRegionInfo().getEncodedNameAsBytes());
 
         if (indexPut == null) {
             // This means the data row does not have any covered column values
@@ -372,6 +373,7 @@ public class IndexerRegionScanner extends 
GlobalIndexRegionScanner {
     public boolean next(List<Cell> results) throws IOException {
         Cell lastCell = null;
         int rowCount = 0;
+        byte[] encodedRegionName = 
region.getRegionInfo().getEncodedNameAsBytes();
         region.startRegionOperation();
         try {
             synchronized (innerScanner) {
@@ -417,7 +419,8 @@ public class IndexerRegionScanner extends 
GlobalIndexRegionScanner {
                             uuidValue = commitIfReady(uuidValue, mutations);
                         } else {
                             indexKeyToDataPutMap
-                                    .put(indexMaintainer.getIndexRowKey(put), 
put);
+                                    .put(indexMaintainer.getIndexRowKey(put, 
encodedRegionName),
+                                            put);
                         }
                         rowCount++;
 
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
index d8cab301f6..aa0ac08c43 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
@@ -300,7 +300,14 @@ public abstract class UncoveredIndexRegionScanner extends 
BaseRegionScanner {
         for (Cell cell : dataRow.rawCells()) {
             put.add(cell);
         }
-        if (indexMaintainer.checkIndexRow(indexRowKey, put)) {
+        if (indexMaintainer.isCDCIndex()) {
+            // A CDC index row key is PARTITION_ID() + PHOENIX_ROW_TIMESTAMP() 
+ data row key. The
+            // only necessary check is the row timestamp check since the data 
row key is extracted
+            // from the index row key and PARTITION_ID() changes during region 
splits and merges
+            if (IndexUtil.getMaxTimestamp(put) == indexTimestamp) {
+                return true;
+            }
+        } else if (indexMaintainer.checkIndexRow(indexRowKey, put)) {
             if (IndexUtil.getMaxTimestamp(put) != indexTimestamp) {
                 Mutation[] mutations;
                 Put indexPut = new Put(indexRowKey);
@@ -316,8 +323,10 @@ public abstract class UncoveredIndexRegionScanner extends 
BaseRegionScanner {
             }
             return true;
         }
+        // This is not a valid index row
         if (indexMaintainer.isAgedEnough(IndexUtil.getMaxTimestamp(put), 
ageThreshold)) {
-            region.delete(indexMaintainer.createDelete(indexRowKey, 
IndexUtil.getMaxTimestamp(put), false));
+            region.delete(indexMaintainer.createDelete(indexRowKey, 
IndexUtil.getMaxTimestamp(put),
+                    false));
         }
         return false;
     }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index d4c977c5cb..8a144f1245 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -378,6 +378,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
   private List<Set<ImmutableBytesPtr>> batchesWithLastTimestamp = new 
ArrayList<>();
   private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
   private static final int DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS = 
100;
+  private byte[] encodedRegionName;
 
   @Override
   public Optional<RegionObserver> getRegionObserver() {
@@ -388,6 +389,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
   public void start(CoprocessorEnvironment e) throws IOException {
       try {
         final RegionCoprocessorEnvironment env = 
(RegionCoprocessorEnvironment) e;
+        encodedRegionName = 
env.getRegion().getRegionInfo().getEncodedNameAsBytes();
         String serverName = env.getServerName().getServerName();
         if (env.getConfiguration().getBoolean(CHECK_VERSION_CONF_KEY, true)) {
           // make sure the right version <-> combinations are allowed.
@@ -967,9 +969,10 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
         }
     }
     public static Mutation getDeleteIndexMutation(Put dataRowState, 
IndexMaintainer indexMaintainer,
-            long ts, ImmutableBytesPtr rowKeyPtr) {
+            long ts, ImmutableBytesPtr rowKeyPtr, byte[] encodedRegionName) {
         ValueGetter dataRowVG = new IndexUtil.SimpleValueGetter(dataRowState);
-        byte[] indexRowKey = indexMaintainer.buildRowKey(dataRowVG, rowKeyPtr, 
null, null, ts);
+        byte[] indexRowKey = indexMaintainer.buildRowKey(dataRowVG, rowKeyPtr, 
null, null,
+                ts, encodedRegionName);
         return indexMaintainer.buildRowDeleteMutation(indexRowKey,
                 IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
     }
@@ -1004,11 +1007,11 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
                         && 
indexMaintainer.shouldPrepareIndexMutations(nextDataRowState)) {
                     ValueGetter nextDataRowVG = new 
IndexUtil.SimpleValueGetter(nextDataRowState);
                     Put indexPut = 
indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
-                            nextDataRowVG, rowKeyPtr, ts, null, null, false);
+                            nextDataRowVG, rowKeyPtr, ts, null, null, false, 
encodedRegionName);
                     if (indexPut == null) {
                         // No covered column. Just prepare an index row with 
the empty column
                         byte[] indexRowKey = 
indexMaintainer.buildRowKey(nextDataRowVG, rowKeyPtr,
-                                null, null, ts);
+                                null, null, ts, encodedRegionName);
                         indexPut = new Put(indexRowKey);
                     } else {
                         IndexUtil.removeEmptyColumn(indexPut, 
indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
@@ -1023,8 +1026,9 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
                     // Delete the current index row if the new index key is 
different than the current one
                     if (currentDataRowState != null) {
                         ValueGetter currentDataRowVG = new 
IndexUtil.SimpleValueGetter(currentDataRowState);
-                        byte[] indexRowKeyForCurrentDataRow = 
indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr,
-                                null, null, ts);
+                        byte[] indexRowKeyForCurrentDataRow = indexMaintainer
+                                .buildRowKey(currentDataRowVG, rowKeyPtr, 
null, null,
+                                        ts, encodedRegionName);
                         if (Bytes.compareTo(indexPut.getRow(), 
indexRowKeyForCurrentDataRow) != 0) {
                             Mutation del = 
indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
                                     IndexMaintainer.DeleteType.ALL_VERSIONS, 
ts);
@@ -1036,7 +1040,8 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
                         && 
indexMaintainer.shouldPrepareIndexMutations(currentDataRowState)) {
                     context.indexUpdates.put(hTableInterfaceReference,
                             new Pair<Mutation, 
byte[]>(getDeleteIndexMutation(currentDataRowState,
-                                    indexMaintainer, ts, rowKeyPtr), 
rowKeyPtr.get()));
+                                    indexMaintainer, ts, rowKeyPtr, 
encodedRegionName),
+                                    rowKeyPtr.get()));
                     if (indexMaintainer.isCDCIndex()) {
                         // CDC Index needs two delete markers one for deleting 
the index row, and
                         // the other for referencing the data table delete 
mutation with the
@@ -1047,7 +1052,8 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
                                 ByteUtil.EMPTY_BYTE_ARRAY);
                         context.indexUpdates.put(hTableInterfaceReference,
                                 new Pair<Mutation, 
byte[]>(getDeleteIndexMutation(cdcDataRowState,
-                                        indexMaintainer, ts, rowKeyPtr), 
rowKeyPtr.get()));
+                                        indexMaintainer, ts, rowKeyPtr, 
encodedRegionName),
+                                        rowKeyPtr.get()));
                     }
                 }
             }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
index 508f6fced0..08bf35279d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
@@ -173,13 +173,13 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
     }
 
     protected void createCDC(Connection conn, String cdc_sql) throws Exception 
{
-        createCDC(conn, cdc_sql, null, null);
+        createCDC(conn, cdc_sql, null);
     }
 
     protected void createCDC(Connection conn, String cdc_sql,
-            PTable.QualifierEncodingScheme encodingScheme, Integer 
nSaltBuckets) throws Exception {
+            PTable.QualifierEncodingScheme encodingScheme) throws Exception {
         // For CDC, multitenancy gets derived automatically via the parent 
table.
-        createTable(conn, cdc_sql, encodingScheme, false, nSaltBuckets, false, 
null);
+        createTable(conn, cdc_sql, encodingScheme, false, null, false, null);
     }
 
     protected void assertCDCState(Connection conn, String cdcName, String 
expInclude,
@@ -297,9 +297,8 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
         return changeRow;
     }
 
-    protected List<Set<ChangeRow>> generateMutations(long startTS, Map<String, 
String> pkColumns,
-                                                     Map<String, String> 
dataColumns,
-                                                     int nRows, int nBatches)
+    protected List<Set<ChangeRow>> generateMutations(String tenantId, long 
startTS, Map<String,
+            String> pkColumns, Map<String, String> dataColumns, int nRows, int 
nBatches)
     {
         Random rand = new Random();
         // Generate unique rows
@@ -336,11 +335,11 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
                     }
                     ChangeRow changeRow;
                     if (isDelete) {
-                        changeRow = new ChangeRow(null, batchTS, rows.get(j), 
null);
+                        changeRow = new ChangeRow(tenantId, batchTS, 
rows.get(j), null);
                         gotDelete = true;
                     }
                     else {
-                        changeRow = new ChangeRow(null, batchTS, rows.get(j),
+                        changeRow = new ChangeRow(tenantId, batchTS, 
rows.get(j),
                                 generateSampleData(rand, dataColumns, true));
                     }
                     batch.add(changeRow);
@@ -637,16 +636,18 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
     protected void verifyChangesViaSCN(String tenantId, ResultSet rs, String 
dataTableName,
                                        Map<String, String> dataColumns, 
List<ChangeRow> changes,
                                        Set<PTable.CDCChangeScope> 
changeScopes) throws Exception {
+        // CDC guarantees that the set of changes on a given row is delivered 
in the order of
+        // their change timestamps. That is why we need to convert the list of 
changes to
+        // a collection of per row list of changes
+        Map<Map<String, Object>, List<ChangeRow>> changeMap = new HashMap();
         Set<Map<String, Object>> deletedRows = new HashSet<>();
-        for (int i = 0, changenr = 0; i < changes.size(); ++i) {
-            ChangeRow changeRow = changes.get(i);
+        for (ChangeRow changeRow : changes) {
             if (tenantId != null && changeRow.getTenantID() != tenantId) {
                 continue;
             }
             if (changeRow.getChangeType() == CDC_DELETE_EVENT_TYPE) {
                 // Consecutive delete operations don't appear as separate 
events.
                 if (deletedRows.contains(changeRow.pks)) {
-                    ++changenr;
                     continue;
                 }
                 deletedRows.add(changeRow.pks);
@@ -654,14 +655,31 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
             else {
                 deletedRows.remove(changeRow.pks);
             }
-            String changeDesc = "Change " + changenr + ": " + changeRow;
-            assertTrue(changeDesc, rs.next());
+            List<ChangeRow> rowVersionList = changeMap.get(changeRow.pks);
+            if (rowVersionList == null) {
+                rowVersionList = new ArrayList<>();
+                changeMap.put(changeRow.pks, rowVersionList);
+            }
+            rowVersionList.add(changeRow);
+        }
+
+        while (rs.next()) {
+            Map<String, Object> pks = new HashMap<>();
+            for (Map.Entry<String, Object> pkCol: 
changes.get(0).pks.entrySet()) {
+                pks.put(pkCol.getKey(), rs.getObject(pkCol.getKey()));
+            }
+            ChangeRow changeRow = changeMap.get(pks).remove(0);
+            String changeDesc = "Change: " + changeRow;
             for (Map.Entry<String, Object> pkCol: changeRow.pks.entrySet()) {
-                assertEquals(changeDesc, pkCol.getValue(), 
rs.getObject(pkCol.getKey()));
+                if (!pkCol.getValue().equals(rs.getObject(pkCol.getKey()))) {
+                    assertEquals(changeDesc, pkCol.getValue(), 
rs.getObject(pkCol.getKey()));
+                }
             }
             Map<String, Object> cdcObj = 
mapper.reader(HashMap.class).readValue(
                     rs.getString(changeRow.pks.size()+2));
-            assertEquals(changeDesc, changeRow.getChangeType(), 
cdcObj.get(CDC_EVENT_TYPE));
+            if (!changeRow.getChangeType().equals(cdcObj.get(CDC_EVENT_TYPE))) 
{
+                assertEquals(changeDesc, changeRow.getChangeType(), 
cdcObj.get(CDC_EVENT_TYPE));
+            }
             if (cdcObj.containsKey(CDC_PRE_IMAGE)
                     && ! ((Map) cdcObj.get(CDC_PRE_IMAGE)).isEmpty()
                     && changeScopes.contains(PTable.CDCChangeScope.PRE)) {
@@ -683,7 +701,11 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
                 assertEquals(changeDesc, postImage, fillInNulls(
                         (Map<String, Object>) cdcObj.get(CDC_POST_IMAGE), 
dataColumns.keySet()));
             }
-            ++changenr;
+        }
+
+        // Make sure that all the expected changes are returned by CDC
+        for (List<ChangeRow> rowVersionList : changeMap.values()) {
+            assertTrue(rowVersionList.isEmpty());
         }
     }
 
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
index 21dae3bf70..274314f106 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
@@ -87,7 +87,7 @@ public class CDCDefinitionIT extends CDCBaseIT {
         }
 
         cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
-        createCDC(conn, cdc_sql, null, null);
+        createCDC(conn, cdc_sql, null);
         assertCDCState(conn, cdcName, null, 3);
         assertNoResults(conn, cdcName);
 
@@ -113,51 +113,6 @@ public class CDCDefinitionIT extends CDCBaseIT {
         conn.close();
     }
 
-    @Test
-    public void testCreateWithSalt() throws Exception {
-        // Indexes on views don't support salt buckets and is currently 
silently ignored.
-        if (forView) {
-            return;
-        }
-
-        // {data table bucket count, CDC bucket count}
-        Integer[][] saltingConfigs = new Integer[][] {
-                new Integer[]{null, 2},
-                new Integer[]{0, 2},
-                new Integer[]{4, null},
-                new Integer[]{4, 1},
-                new Integer[]{4, 0},
-                new Integer[]{4, 2}
-        };
-
-        for (Integer[] saltingConfig: saltingConfigs) {
-            try (Connection conn = newConnection()) {
-                String tableName = generateUniqueName();
-                createTable(conn, "CREATE TABLE  " + tableName +
-                                " ( k INTEGER PRIMARY KEY, v1 INTEGER, v2 
DATE)",
-                                null, false, saltingConfig[0], false, null);
-                assertSaltBuckets(conn, tableName, saltingConfig[0]);
-
-                String cdcName = generateUniqueName();
-                String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
-                createCDC(conn, cdc_sql, null,
-                        saltingConfig[1]);
-                try {
-                    assertCDCState(conn, cdcName, null, 3);
-                    assertSaltBuckets(conn, cdcName, null);
-                    // Index inherits table salt buckets.
-                    assertSaltBuckets(conn, CDCUtil.getCDCIndexName(cdcName),
-                            saltingConfig[1] != null ? saltingConfig[1] : 
saltingConfig[0]);
-                    assertNoResults(conn, cdcName);
-                } catch (Exception error) {
-                    throw new AssertionError("{tableSaltBuckets=" + 
saltingConfig[0] + ", " +
-                            "cdcSaltBuckets=" + saltingConfig[1] + "} " + 
error.getMessage(),
-                            error);
-                }
-            }
-        }
-    }
-
     @Test
     public void testCreateWithSchemaName() throws Exception {
         Properties props = new Properties();
@@ -206,8 +161,9 @@ public class CDCDefinitionIT extends CDCBaseIT {
         assertEquals(true, indexTable.isMultiTenant());
         List<PColumn> idxPkColumns = indexTable.getPKColumns();
         assertEquals(":TENANTID", idxPkColumns.get(0).getName().getString());
-        assertEquals(": PHOENIX_ROW_TIMESTAMP()", 
idxPkColumns.get(1).getName().getString());
-        assertEquals(":K", idxPkColumns.get(2).getName().getString());
+        assertEquals(": PARTITION_ID()", 
idxPkColumns.get(1).getName().getString());
+        assertEquals(": PHOENIX_ROW_TIMESTAMP()", 
idxPkColumns.get(2).getName().getString());
+        assertEquals(":K", idxPkColumns.get(3).getName().getString());
 
         PTable cdcTable = PhoenixRuntime.getTable(conn, cdcName);
         assertEquals(true, cdcTable.isMultiTenant());
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
index 317c2af895..c7cd8a7dc4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
@@ -18,8 +18,15 @@
 package org.apache.phoenix.end2end;
 
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.filter.DistinctPrefixFilter;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.RowKeyOrderedAggregateResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
@@ -41,11 +48,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Calendar;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -86,31 +93,29 @@ public class CDCQueryIT extends CDCBaseIT {
     private final boolean forView;
     private final PTable.QualifierEncodingScheme encodingScheme;
     private final boolean multitenant;
-    private final Integer indexSaltBuckets;
     private final Integer tableSaltBuckets;
     private final boolean withSchemaName;
 
     public CDCQueryIT(Boolean forView,
                       PTable.QualifierEncodingScheme encodingScheme, boolean 
multitenant,
-                      Integer indexSaltBuckets, Integer tableSaltBuckets, 
boolean withSchemaName) {
+                      Integer tableSaltBuckets, boolean withSchemaName) {
         this.forView = forView;
         this.encodingScheme = encodingScheme;
         this.multitenant = multitenant;
-        this.indexSaltBuckets = indexSaltBuckets;
         this.tableSaltBuckets = tableSaltBuckets;
         this.withSchemaName = withSchemaName;
     }
 
     @Parameterized.Parameters(name = "forView={0}, encodingScheme={1}, " +
-            "multitenant={2}, indexSaltBuckets={3}, tableSaltBuckets={4} 
withSchemaName={5}")
+            "multitenant={2}, tableSaltBuckets={3}, withSchemaName={4}")
     public static synchronized Collection<Object[]> data() {
         return Arrays.asList(new Object[][] {
-                { Boolean.FALSE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, 
null, Boolean.FALSE },
-                { Boolean.FALSE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, 
null, Boolean.TRUE },
-                { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, null, 
4, Boolean.FALSE },
-                { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.TRUE, 1, 2, 
Boolean.TRUE },
-                { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, 4, 
null, Boolean.FALSE },
-                { Boolean.TRUE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, 
null, Boolean.FALSE },
+                { Boolean.FALSE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, 
Boolean.FALSE },
+                { Boolean.FALSE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, 
Boolean.TRUE },
+                { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, 4, 
Boolean.FALSE },
+                { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.TRUE, 2, 
Boolean.TRUE },
+                { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, null, 
Boolean.FALSE },
+                { Boolean.TRUE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, 
Boolean.FALSE },
         });
     }
 
@@ -135,6 +140,117 @@ public class CDCQueryIT extends CDCBaseIT {
         String explainPlan = QueryUtil.getExplainPlan(rs);
         assertFalse(explainPlan.contains(cdcName));
     }
+
+    private boolean isDistinctPrefixFilterIncludedInFilterList(FilterList 
filterList) {
+        for (Filter filter : filterList.getFilters()) {
+            if (filter instanceof DistinctPrefixFilter) {
+                return true;
+            } else if (filter instanceof FilterList) {
+                return isDistinctPrefixFilterIncludedInFilterList((FilterList) 
filter);
+            }
+        }
+        return false;
+    }
+    private boolean isDistinctPrefixFilterIncluded(Scan scan) {
+        Filter filter = scan.getFilter();
+        if (filter != null && filter instanceof DistinctPrefixFilter) {
+            return true;
+        } else if (filter instanceof FilterList) {
+                return isDistinctPrefixFilterIncludedInFilterList((FilterList) 
filter);
+        }
+        return false;
+    }
+
+    private void checkIndexPartitionIdCount(Connection conn, String cdcName) 
throws Exception {
+        // Verify that we can use retrieve partition ids
+        ResultSet rs = conn.createStatement().executeQuery("SELECT 
PARTITION_ID() FROM "
+                + cdcName + " ORDER BY PARTITION_ID()");
+        int saltBuckets = tableSaltBuckets == null ? 1 : tableSaltBuckets;
+        String[] partitionId = new String[saltBuckets];
+        int[] countPerPartition = new int[saltBuckets];
+        int partitionIndex = 0;
+        assertTrue(rs.next());
+        partitionId[partitionIndex] = rs.getString(1);
+        countPerPartition[partitionIndex]++;
+        LOGGER.info("PARTITION_ID["+ partitionIndex + "] = " + 
partitionId[partitionIndex]);
+        while (rs.next()) {
+            if (!partitionId[partitionIndex].equals(rs.getString(1))) {
+                partitionIndex++;
+                partitionId[partitionIndex] = rs.getString(1);
+                LOGGER.info("PARTITION_ID["+ partitionIndex + "] = " + 
partitionId[partitionIndex]);
+            }
+            countPerPartition[partitionIndex]++;
+        }
+        // Verify that the number of partitions equals to the number of table 
regions. In this case,
+        // it equals to the number of salt buckets
+        assertEquals(saltBuckets, partitionIndex + 1);
+
+        rs = conn.createStatement().executeQuery("SELECT DISTINCT 
PARTITION_ID() FROM "
+                + cdcName);
+        assertTrue(rs.next());
+        partitionIndex = 0;
+        partitionId[partitionIndex] = rs.getString(1);
+        int rowCount = 1;
+        while (rs.next()) {
+            if (!partitionId[partitionIndex].equals(rs.getString(1))) {
+                partitionIndex++;
+                partitionId[partitionIndex] = rs.getString(1);
+                LOGGER.info("PARTITION_ID["+ partitionIndex + "] = " + 
partitionId[partitionIndex]);
+            }
+            rowCount++;
+        }
+        // Verify that the number of partitions equals to the number of table 
regions. In this case,
+        // it equals to the number of salt buckets
+        assertEquals(saltBuckets, partitionIndex + 1);
+        // Verified that we only got distinct partition ids
+        assertEquals(saltBuckets, rowCount);
+        //Verify that DistinctPrefixFilter is used to efficiently retrieve 
partition ids
+        assertTrue(isDistinctPrefixFilterIncluded(((PhoenixResultSet) 
rs).getContext().getScan()));
+
+        // Verify that we can access data table mutations by partition id
+        PreparedStatement statement = conn.prepareStatement(
+                getCDCQuery(cdcName, saltBuckets, partitionId));
+        statement.setTimestamp(1, new Timestamp(1000));
+        statement.setTimestamp(2,  new Timestamp(System.currentTimeMillis()));
+        rs = statement.executeQuery();
+        rowCount = 0;
+        while(rs.next()) {
+            rowCount++;
+            String id = rs.getString(1);
+            int count = rs.getInt(2);
+            boolean found = false;
+            for (int i = 0; i < saltBuckets; i++) {
+                if (partitionId[i].equals(id) && count == 
countPerPartition[i]) {
+                    found = true;
+                    break;
+                }
+            }
+            assertTrue(found);
+        }
+        // Verify that partition id based queries are row key prefix queries
+        ResultIterator resultIterator = ((PhoenixResultSet) 
rs).getUnderlyingIterator();
+        assertTrue(resultIterator instanceof 
RowKeyOrderedAggregateResultIterator);
+        assertEquals(saltBuckets, rowCount);
+    }
+
+    private static String getCDCQuery(String cdcName, int saltBuckets,
+            String[] partitionId) {
+        StringBuilder query = new StringBuilder("SELECT PARTITION_ID(), 
Count(*) from ");
+        query.append(cdcName);
+        query.append(" WHERE PARTITION_ID() IN (");
+        for (int i = 0; i < saltBuckets - 1; i++) {
+            query.append("'");
+            query.append(partitionId[i]);
+            query.append("',");
+        }
+        query.append("'");
+        query.append(partitionId[saltBuckets - 1]);
+        query.append("')");
+        query.append(" AND PHOENIX_ROW_TIMESTAMP() >= ? AND 
PHOENIX_ROW_TIMESTAMP() < ?");
+        query.append(" Group By PARTITION_ID()");
+        return query.toString();
+    }
+
     @Test
     public void testSelectCDC() throws Exception {
         String cdcName, cdc_sql;
@@ -155,8 +271,7 @@ public class CDCQueryIT extends CDCBaseIT {
             }
             cdcName = generateUniqueName();
             cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
-            createCDC(conn, cdc_sql, encodingScheme,
-                        indexSaltBuckets);
+            createCDC(conn, cdc_sql, encodingScheme);
         }
 
         String tenantId = multitenant ? "1000" : null;
@@ -177,16 +292,28 @@ public class CDCQueryIT extends CDCBaseIT {
                     "SELECT /*+ CDC_INCLUDE(PRE, POST) */ 
PHOENIX_ROW_TIMESTAMP(), K," +
                             "\"CDC JSON\" FROM " + cdcFullName);
 
-            // Existence of CDC shouldn't cause the regular query path to fail.
+            // Existence of an CDC index hint shouldn't cause the regular 
query path to fail.
+            // Run the same query with a CDC index hit and without it and make 
sure we get the same
+            // result from both
             String uncovered_sql = "SELECT " + " /*+ INDEX(" + tableName + " " 
+
                     CDCUtil.getCDCIndexName(cdcName) + ") */ k, v1 FROM " + 
tableName;
             try (ResultSet rs = 
conn.createStatement().executeQuery(uncovered_sql)) {
+                assertTrue(rs.next());
+                assertEquals(2, rs.getInt(1));
+                assertEquals(201, rs.getInt(2));
                 assertTrue(rs.next());
                 assertEquals(3, rs.getInt(1));
                 assertEquals(300, rs.getInt(2));
+                assertFalse(rs.next());
+            }
+            uncovered_sql = "SELECT " + "  k, v1 FROM " + tableName;
+            try (ResultSet rs = 
conn.createStatement().executeQuery(uncovered_sql)) {
                 assertTrue(rs.next());
                 assertEquals(2, rs.getInt(1));
                 assertEquals(201, rs.getInt(2));
+                assertTrue(rs.next());
+                assertEquals(3, rs.getInt(1));
+                assertEquals(300, rs.getInt(2));
                 assertFalse(rs.next());
             }
 
@@ -210,19 +337,18 @@ public class CDCQueryIT extends CDCBaseIT {
                     datatableName, dataColumns, changes, new HashSet<>());
 
             HashMap<String, int[]> testQueries = new HashMap<String, int[]>() 
{{
-                put("SELECT 'dummy', k, \"CDC JSON\" FROM " + cdcFullName,
+                put("SELECT 'dummy', k, \"CDC JSON\" FROM " + cdcFullName
+                        + " ORDER BY PHOENIX_ROW_TIMESTAMP() ASC, K ASC",
                         new int[]{1, 2, 3, 1, 1, 1, 1, 2, 1, 1, 1, 1});
-                put("SELECT PHOENIX_ROW_TIMESTAMP(), k, \"CDC JSON\" FROM " + 
cdcFullName +
-                        " ORDER BY k ASC", new int[]{1, 1, 1, 1, 1, 1, 1, 1, 
1, 2, 2, 3});
-                put("SELECT PHOENIX_ROW_TIMESTAMP(), k, \"CDC JSON\" FROM " + 
cdcFullName +
-                        " ORDER BY k DESC", new int[]{3, 2, 2, 1, 1, 1, 1, 1, 
1, 1, 1, 1});
-                put("SELECT PHOENIX_ROW_TIMESTAMP(), k, \"CDC JSON\" FROM " + 
cdcFullName +
-                        " ORDER BY PHOENIX_ROW_TIMESTAMP() DESC",
-                        new int[]{1, 1, 1, 1, 2, 1, 1, 1, 1, 3, 2, 1});
+                put("SELECT PHOENIX_ROW_TIMESTAMP(), k, \"CDC JSON\" FROM " + 
cdcFullName
+                        + " ORDER BY k ASC", new int[]{1, 1, 1, 1, 1, 1, 1, 1, 
1, 2, 2, 3});
+                put("SELECT PHOENIX_ROW_TIMESTAMP(), k, \"CDC JSON\" FROM " + 
cdcFullName
+                        + " ORDER BY k DESC", new int[]{3, 2, 2, 1, 1, 1, 1, 
1, 1, 1, 1, 1});
             }};
             Map<String, String> dummyChange = new HashMap() {{
                 put(CDC_EVENT_TYPE, "dummy");
             }};
+
             for (Map.Entry<String, int[]> testQuery : testQueries.entrySet()) {
                 try (ResultSet rs = 
conn.createStatement().executeQuery(testQuery.getKey())) {
                     for (int i = 0; i < testQuery.getValue().length; ++i) {
@@ -278,7 +404,7 @@ public class CDCQueryIT extends CDCBaseIT {
             }
             cdcName = generateUniqueName();
             cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INCLUDE 
(change)";
-            createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
+            createCDC(conn, cdc_sql, encodingScheme);
         }
 
         String tenantId = multitenant ? "1000" : null;
@@ -290,7 +416,7 @@ public class CDCQueryIT extends CDCBaseIT {
         long startTS = System.currentTimeMillis();
         Map<String, List<Set<ChangeRow>>> allBatches = new 
HashMap<>(tenantids.length);
         for (String tid: tenantids) {
-            allBatches.put(tid, generateMutations(startTS, pkColumns, 
dataColumns, 20, 5));
+            allBatches.put(tid, generateMutations(tenantId, startTS, 
pkColumns, dataColumns, 20, 5));
             applyMutations(COMMIT_SUCCESS, schemaName, tableName, 
datatableName, tid,
                     allBatches.get(tid), cdcName);
         }
@@ -318,6 +444,7 @@ public class CDCQueryIT extends CDCBaseIT {
                             "SELECT /*+ CDC_INCLUDE(CHANGE, PRE, POST) */ * 
FROM " + cdcFullName),
                     datatableName, dataColumns, changes, ALL_IMG);
             cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName, 
cdcName);
+            checkIndexPartitionIdCount(conn, cdcFullName);
         }
     }
 
@@ -342,7 +469,7 @@ public class CDCQueryIT extends CDCBaseIT {
             cdcName = generateUniqueName();
             cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
 
-            createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
+            createCDC(conn, cdc_sql, encodingScheme);
         }
 
         String tenantId = multitenant ? "1000" : null;
@@ -368,13 +495,16 @@ public class CDCQueryIT extends CDCBaseIT {
                     "SELECT /*+ CDC_INCLUDE(PRE, POST) */ 
PHOENIX_ROW_TIMESTAMP(), K," +
                             "\"CDC JSON\" FROM " + cdcFullName);
             verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
-                            "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + 
cdcFullName),
+                    "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + 
cdcFullName
+                            + " ORDER BY PHOENIX_ROW_TIMESTAMP() ASC"),
                     datatableName, dataColumns, changes, PRE_POST_IMG);
             verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
-                            "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + 
cdcFullName),
+                    "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + cdcFullName
+                            + " ORDER BY PHOENIX_ROW_TIMESTAMP() ASC"),
                     datatableName, dataColumns, changes, CHANGE_IMG);
             verifyChangesViaSCN(tenantId, 
conn.createStatement().executeQuery("SELECT /*+ CDC_INCLUDE(CHANGE) */ " +
-                            "PHOENIX_ROW_TIMESTAMP(), K, \"CDC JSON\" FROM " + 
cdcFullName),
+                            "PHOENIX_ROW_TIMESTAMP(), K, \"CDC JSON\" FROM " + 
cdcFullName
+                            + " ORDER BY PHOENIX_ROW_TIMESTAMP() ASC"),
                     datatableName, dataColumns, changes, CHANGE_IMG);
             cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName, 
cdcName);
         }
@@ -413,7 +543,7 @@ public class CDCQueryIT extends CDCBaseIT {
             }
             cdcName = generateUniqueName();
             cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INCLUDE 
(change)";
-            createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
+            createCDC(conn, cdc_sql, encodingScheme);
             cdcIndexShouldNotBeUsedForDataTableQueries(conn, 
tableName,cdcName);
         }
 
@@ -426,7 +556,7 @@ public class CDCQueryIT extends CDCBaseIT {
         long startTS = System.currentTimeMillis();
         Map<String, List<Set<ChangeRow>>> allBatches = new 
HashMap<>(tenantids.length);
         for (String tid: tenantids) {
-            allBatches.put(tid, generateMutations(startTS, pkColumns, 
dataColumns, 20, 5));
+            allBatches.put(tid, generateMutations(tenantId, startTS, 
pkColumns, dataColumns, 20, 5));
             applyMutations(COMMIT_SUCCESS, schemaName, tableName, 
datatableName, tid,
                     allBatches.get(tid), cdcName);
         }
@@ -489,7 +619,7 @@ public class CDCQueryIT extends CDCBaseIT {
 
             cdcName = generateUniqueName();
             cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
-            createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
+            createCDC(conn, cdc_sql, encodingScheme);
             conn.createStatement().execute("ALTER TABLE " + datatableName + " 
DROP COLUMN v0");
         }
 
@@ -514,7 +644,7 @@ public class CDCQueryIT extends CDCBaseIT {
         try (Connection conn = newConnection(tenantId)) {
             verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
                             "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + 
SchemaUtil.getTableName(
-                                    schemaName, cdcName)),
+                                    schemaName, cdcName) + " ORDER BY 
PHOENIX_ROW_TIMESTAMP() ASC"),
                     datatableName, dataColumns, changes, CHANGE_IMG);
             cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName, 
cdcName);
         }
@@ -540,7 +670,7 @@ public class CDCQueryIT extends CDCBaseIT {
             }
             cdcName = generateUniqueName();
             cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
-            createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
+            createCDC(conn, cdc_sql, encodingScheme);
             cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName, 
cdcName);
         }
 
@@ -603,7 +733,7 @@ public class CDCQueryIT extends CDCBaseIT {
             // Create a CDC table
             cdcName = generateUniqueName();
             cdc_sql = "CREATE CDC " + cdcName + " ON " + tableFullName;
-            createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
+            createCDC(conn, cdc_sql, encodingScheme);
             // Check CDC index is active but empty
             String indexTableFullName = SchemaUtil.getTableName(schemaName,
                     CDCUtil.getCDCIndexName(cdcName));
@@ -677,7 +807,7 @@ public class CDCQueryIT extends CDCBaseIT {
             // Create a CDC table
             cdcName = generateUniqueName();
             cdc_sql = "CREATE CDC " + cdcName + " ON " + tableFullName;
-            createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
+            createCDC(conn, cdc_sql, encodingScheme);
             // Add rows
             long startTS = System.currentTimeMillis();
             List<ChangeRow> changes = generateChanges(startTS, tenantids, 
tableFullName,
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/index/PrepareIndexMutationsForRebuildTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/index/PrepareIndexMutationsForRebuildTest.java
index dce37d3356..35e6213293 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/index/PrepareIndexMutationsForRebuildTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/index/PrepareIndexMutationsForRebuildTest.java
@@ -134,9 +134,8 @@ public class PrepareIndexMutationsForRebuildTest extends 
BaseConnectionlessQuery
                 Bytes.toBytes("v2"));
         addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 1);
 
-        List<Mutation> actualIndexMutations = 
IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
-                dataPut,
-                null);
+        List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
+                .prepareIndexMutationsForRebuild(info.indexMaintainer, 
dataPut, null, null);
 
         // Expect one row of index with row key "v1_k1"
         Put idxPut1 = new Put(generateIndexRowKey("v1"));
@@ -166,9 +165,8 @@ public class PrepareIndexMutationsForRebuildTest extends 
BaseConnectionlessQuery
                 Bytes.toBytes("v2"));
         addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 1);
 
-        List<Mutation> actualIndexMutations = 
IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
-                dataPut,
-                null);
+        List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
+                .prepareIndexMutationsForRebuild(info.indexMaintainer, 
dataPut, null, null);
 
         // Expect one row of index with row key "_k1", as indexed column C1 is 
nullable.
         Put idxPut1 = new Put(generateIndexRowKey(null));
@@ -212,9 +210,8 @@ public class PrepareIndexMutationsForRebuildTest extends 
BaseConnectionlessQuery
                 2,
                 Cell.Type.DeleteColumn);
 
-        List<Mutation> actualIndexMutations = 
IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
-                dataPut,
-                dataDel);
+        List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
+                .prepareIndexMutationsForRebuild(info.indexMaintainer, 
dataPut, dataDel, null);
 
         List<Mutation> expectedIndexMutation = new ArrayList<>();
 
@@ -278,9 +275,8 @@ public class PrepareIndexMutationsForRebuildTest extends 
BaseConnectionlessQuery
                 2,
                 Cell.Type.DeleteColumn);
 
-        List<Mutation> actualIndexMutations = 
IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
-                dataPut,
-                dataDel);
+        List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
+                .prepareIndexMutationsForRebuild(info.indexMaintainer, 
dataPut, dataDel, null);
 
         List<Mutation> expectedIndexMutations = new ArrayList<>();
 
@@ -338,9 +334,8 @@ public class PrepareIndexMutationsForRebuildTest extends 
BaseConnectionlessQuery
                 3,
                 Cell.Type.DeleteFamily);
 
-        List<Mutation> actualIndexMutations = 
IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
-                dataPut,
-                dataDel);
+        List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
+                .prepareIndexMutationsForRebuild(info.indexMaintainer, 
dataPut, dataDel, null);
 
         List<Mutation> expectedIndexMutations = new ArrayList<>();
 
@@ -406,9 +401,8 @@ public class PrepareIndexMutationsForRebuildTest extends 
BaseConnectionlessQuery
                 1,
                 Cell.Type.DeleteColumn);
 
-        List<Mutation> actualIndexMutations = 
IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
-                dataPut,
-                dataDel);
+        List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
+                .prepareIndexMutationsForRebuild(info.indexMaintainer, 
dataPut, dataDel, null);
 
         List<Mutation> expectedIndexMutations = new ArrayList<>();
 
@@ -458,10 +452,8 @@ public class PrepareIndexMutationsForRebuildTest extends 
BaseConnectionlessQuery
                 2,
                 Cell.Type.DeleteFamily);
 
-        List<Mutation> actualIndexMutations = 
IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(
-                info.indexMaintainer,
-                dataPut,
-                dataDel);
+        List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
+                .prepareIndexMutationsForRebuild(info.indexMaintainer, 
dataPut, dataDel, null);
 
         List<Mutation> expectedIndexMutations = new ArrayList<>();
         byte[] idxKeyBytes = generateIndexRowKey("v2");
@@ -522,10 +514,8 @@ public class PrepareIndexMutationsForRebuildTest extends 
BaseConnectionlessQuery
                 2,
                 Cell.Type.DeleteFamily);
 
-        List<Mutation> actualIndexMutations = 
IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(
-                info.indexMaintainer,
-                dataPut,
-                dataDel);
+        List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
+                .prepareIndexMutationsForRebuild(info.indexMaintainer, 
dataPut, dataDel, null);
 
         List<Mutation> expectedIndexMutations = new ArrayList<>();
         byte[] idxKeyBytes = generateIndexRowKey("v2");
@@ -579,9 +569,8 @@ public class PrepareIndexMutationsForRebuildTest extends 
BaseConnectionlessQuery
                 2,
                 Cell.Type.DeleteColumn);
 
-        List<Mutation> actualIndexMutations = 
IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
-                dataPut,
-                dataDel);
+        List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
+                .prepareIndexMutationsForRebuild(info.indexMaintainer, 
dataPut, dataDel, null);
 
         List<Mutation> expectedIndexMutations = new ArrayList<>();
         byte[] idxKeyBytes = generateIndexRowKey("v1");
@@ -655,9 +644,8 @@ public class PrepareIndexMutationsForRebuildTest extends 
BaseConnectionlessQuery
                 2,
                 Cell.Type.DeleteColumn);
 
-        List<Mutation> actualIndexMutations = 
IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
-                dataPut,
-                dataDel);
+        List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
+                .prepareIndexMutationsForRebuild(info.indexMaintainer, 
dataPut, dataDel, null);
 
         List<Mutation> expectedIndexMutation = new ArrayList<>();
 
@@ -720,9 +708,8 @@ public class PrepareIndexMutationsForRebuildTest extends 
BaseConnectionlessQuery
                 Bytes.toBytes("v3"));
         addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 2);
 
-        List<Mutation> actualIndexMutations = 
IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
-                dataPut,
-                null);
+        List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
+                .prepareIndexMutationsForRebuild(info.indexMaintainer, 
dataPut, null, null);
 
         byte[] idxKeyBytes = generateIndexRowKey(null);
 
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
index 2cdfc1a8f8..5873766221 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
@@ -284,7 +284,7 @@ public class VerifySingleIndexRowTest extends 
BaseConnectionlessQueryTest {
 
     private void initializeGlobalMockitoSetup() throws IOException {
         //setup
-        when(indexMaintainer.getIndexRowKey(put)).thenCallRealMethod();
+        when(indexMaintainer.getIndexRowKey(put, null)).thenCallRealMethod();
         when(rebuildScanner.prepareIndexMutations(put, delete, 
indexKeyToMutationMap, mostRecentIndexRowKeys)).thenCallRealMethod();
         
when(rebuildScanner.verifySingleIndexRow(ArgumentMatchers.<byte[]>any(), 
ArgumentMatchers.<List>any(),ArgumentMatchers.<List>any(), 
ArgumentMatchers.<Set>any(), ArgumentMatchers.<List>any(),
                 
ArgumentMatchers.<IndexToolVerificationResult.PhaseResult>any(), 
ArgumentMatchers.anyBoolean())).thenCallRealMethod();
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/parse/PartitionIdFunctionTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/parse/PartitionIdFunctionTest.java
new file mode 100644
index 0000000000..9ab88211c9
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/parse/PartitionIdFunctionTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class PartitionIdFunctionTest {
+
+    @Test
+    public void testExpressionWithPartitionId() throws Exception {
+        ParseNode parseNode = SQLParser.parseCondition("(PARTITION_ID() = 
PK2)");
+        boolean hasPartitionIdParseNode = false;
+        for (ParseNode childNode : parseNode.getChildren()) {
+            if 
(childNode.getClass().isAssignableFrom(PartitionIdParseNode.class)) {
+                assertEquals(0, childNode.getChildren().size());
+                hasPartitionIdParseNode = true;
+            }
+        }
+        assertTrue(hasPartitionIdParseNode);
+    }
+
+    @Test
+    public void testExpressionWithPhoenixRowTimestampWithParams() throws 
Exception {
+        ParseNode parseNode = SQLParser.parseCondition("(PARTITION_ID(COL1) = 
PK2)");
+        for (ParseNode childNode : parseNode.getChildren()) {
+            assertFalse("PartitionIdFunction does not take any parameters",
+                    
childNode.getClass().isAssignableFrom(PartitionIdParseNode.class));
+        }
+    }
+
+    @Test
+    public void testSelectWithPhoenixRowTimestamp() throws Exception {
+        SQLParser parser = new SQLParser("SELECT PARTITION_ID() FROM xyz");
+        List<AliasedNode> nodes = parser.parseQuery().getSelect();
+        assertEquals(1, nodes.size());
+        assertTrue("PARTITION_ID() should parse to PartitionIdParseNode",
+                nodes.get(0).getNode().getClass()
+                        .isAssignableFrom(PartitionIdParseNode.class));
+        assertEquals(0, nodes.get(0).getNode().getChildren().size());
+    }
+
+}

Reply via email to