This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 4283b7c819 PHOENIX-7469 Partitioned CDC Index for partitioned tables (#2029)
4283b7c819 is described below

commit 4283b7c819fdca8af31b89b1aa0913bef543fe5e
Author: Kadir Ozdemir <[email protected]>
AuthorDate: Mon Nov 25 10:51:28 2024 -0800

    PHOENIX-7469 Partitioned CDC Index for partitioned tables (#2029)
---
 .../expression/function/PartitionIdFunction.java   | 25 ++----
 .../org/apache/phoenix/index/IndexMaintainer.java  | 11 ++-
 .../apache/phoenix/parse/PartitionIdParseNode.java | 21 ++++-
 .../coprocessor/CDCGlobalIndexRegionScanner.java   | 10 ++-
 .../org/apache/phoenix/end2end/CDCQueryIT.java     | 97 ++++++++++++++++------
 5 files changed, 119 insertions(+), 45 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/PartitionIdFunction.java b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/PartitionIdFunction.java
index 25a2c4d0ce..382cea042e 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/PartitionIdFunction.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/PartitionIdFunction.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.expression.function;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.expression.Determinism;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
 import org.apache.phoenix.parse.PartitionIdParseNode;
 import org.apache.phoenix.schema.tuple.Tuple;
@@ -29,8 +30,8 @@ import org.apache.phoenix.schema.types.PDataType;
 import java.util.List;
 
 /**
- * Function to return the partition id which is the encoded data table region name as the prefix
- * of the CDC index row key. This function is used only with CDC Indexes
+ * Function to return the partition id which is the encoded data table region name.
+ * This function is used only with CDC Indexes
  */
 @BuiltInFunction(name = PartitionIdFunction.NAME,
         nodeClass= PartitionIdParseNode.class,
@@ -49,11 +50,6 @@ public class PartitionIdFunction extends ScalarFunction {
      */
     public PartitionIdFunction(List<Expression> children) {
         super(children);
-        if (!children.isEmpty()) {
-            throw new IllegalArgumentException(
-                    "PartitionIdFunction should not have any child expression"
-            );
-        }
     }
 
     @Override
@@ -62,24 +58,20 @@ public class PartitionIdFunction extends ScalarFunction {
     }
 
     /**
-     * The evaluate method is called under the following conditions -
-     * 1. When PARTITION_ID() is evaluated in the projection list.
-     *
-     * 2. When PARTITION_ID() is evaluated in the backend as part of the where clause.
-     *
+     * Since partition id is part of the row key, this method is not called when PARTITION_ID()
+     * is used with an IN clause.
      */
     @Override
     public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
         if (tuple == null) {
             return false;
         }
-        tuple.getKey(ptr);
         if (ptr.getLength() < PARTITION_ID_LENGTH) {
             return false;
         }
-        // The partition id of a row is always the prefix of the row key
-        ptr.set(ptr.get(), 0, PARTITION_ID_LENGTH);
-        return true;
+        RowKeyColumnExpression partitionIdColumnExpression =
+                (RowKeyColumnExpression) children.get(0);
+        return partitionIdColumnExpression.evaluate(tuple, ptr);
     }
 
     @Override
@@ -101,5 +93,4 @@ public class PartitionIdFunction extends ScalarFunction {
     public Determinism getDeterminism() {
         return Determinism.PER_ROW;
     }
-
 }
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index c9fde90ca6..24645b29a9 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -1246,7 +1246,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         return num;
     }
 
-    private RowKeySchema getIndexRowKeySchema() {
+    public RowKeySchema getIndexRowKeySchema() {
         if (indexRowKeySchema != null) {
             return indexRowKeySchema;
         }
@@ -2382,9 +2382,14 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     public boolean isUncovered() {
         return isUncovered;
     }
+
     public boolean isCDCIndex() {
         return isCDCIndex;
     }
+
+    public boolean isMultiTenant() {
+        return isMultiTenant;
+    }
     
     public boolean isImmutableRows() {
         return immutableRows;
@@ -2443,4 +2448,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     public QualifierEncodingScheme getDataEncodingScheme() {
         return dataEncodingScheme;
     }
+
+    public byte[] getViewIndexId() {
+        return viewIndexId;
+    }
 }
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/PartitionIdParseNode.java b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/PartitionIdParseNode.java
index 5a5908667f..d0a74fb546 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/PartitionIdParseNode.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/PartitionIdParseNode.java
@@ -20,11 +20,14 @@ package org.apache.phoenix.parse;
 
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.expression.function.FunctionExpression;
 import org.apache.phoenix.expression.function.PartitionIdFunction;
-
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.RowKeyValueAccessor;
 
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.List;
 
 public class PartitionIdParseNode extends FunctionParseNode {
@@ -43,6 +46,20 @@ public class PartitionIdParseNode extends FunctionParseNode {
                     "PartitionIdFunction does not take any parameters"
             );
         }
-        return new PartitionIdFunction(children);
+        PTable table = context.getCurrentTable().getTable();
+        if (table.getViewIndexId()!= null && table.isMultiTenant()) {
+            return new PartitionIdFunction(getExpressions(table, 2));
+        } else if (table.getViewIndexId()!= null || table.isMultiTenant()) {
+            return new PartitionIdFunction(getExpressions(table, 1));
+        } else {
+            return new PartitionIdFunction(getExpressions(table, 0));
+        }
+    }
+
+    private static List<Expression> getExpressions(PTable table, int position) 
{
+        List<Expression> expressionList = new ArrayList<>(1);
+        expressionList.add(new 
RowKeyColumnExpression(table.getPKColumns().get(position),
+                new RowKeyValueAccessor(table.getPKColumns(), position)));
+        return expressionList;
     }
 }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
index f0c20f1ccf..4bde0727e9 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
@@ -35,8 +35,10 @@ import org.apache.phoenix.expression.SingleCellColumnExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.CDCTableInfo;
 import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.CDCChangeBuilder;
 import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
@@ -104,7 +106,13 @@ public class CDCGlobalIndexRegionScanner extends UncoveredGlobalIndexRegionScann
             ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
                     indexToDataRowKeyMap.get(indexRowKey));
             Result dataRow = dataRows.get(dataRowKey);
-            Long changeTS = firstIndexCell.getTimestamp();
+            int phoenixRowTimestampFunctionOffset = 2 + 
(indexMaintainer.isMultiTenant() ? 1 : 0)
+                    + (indexMaintainer.getViewIndexId() != null ? 1 : 0);
+            ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+            
indexMaintainer.getIndexRowKeySchema().iterator(firstIndexCell.getRowArray(),
+                    firstIndexCell.getRowOffset(), 
firstIndexCell.getRowLength(), ptr,
+                    phoenixRowTimestampFunctionOffset);
+            long changeTS = PLong.INSTANCE.getCodec().decodeLong(ptr, 
SortOrder.ASC);
             TupleProjector dataTableProjector = 
cdcDataTableInfo.getDataTableProjector();
             Expression[] expressions = dataTableProjector != null ?
                     dataTableProjector.getExpressions() : null;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
index c7cd8a7dc4..094815638c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
@@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -209,7 +210,7 @@ public class CDCQueryIT extends CDCBaseIT {
 
         // Verify that we can access data table mutations by partition id
         PreparedStatement statement = conn.prepareStatement(
-                getCDCQuery(cdcName, saltBuckets, partitionId));
+                getCDCQuery(cdcName, partitionId));
         statement.setTimestamp(1, new Timestamp(1000));
         statement.setTimestamp(2,  new Timestamp(System.currentTimeMillis()));
         rs = statement.executeQuery();
@@ -233,24 +234,59 @@ public class CDCQueryIT extends CDCBaseIT {
         assertEquals(saltBuckets, rowCount);
     }
 
-    private static String getCDCQuery(String cdcName, int saltBuckets,
-            String[] partitionId) {
+    private static String getCDCQuery(String cdcName, String[] partitionId) {
         StringBuilder query = new StringBuilder("SELECT PARTITION_ID(), 
Count(*) from ");
         query.append(cdcName);
         query.append(" WHERE PARTITION_ID() IN (");
-        for (int i = 0; i < saltBuckets - 1; i++) {
+        for (int i = 0; i < partitionId.length - 1; i++) {
             query.append("'");
             query.append(partitionId[i]);
             query.append("',");
         }
         query.append("'");
-        query.append(partitionId[saltBuckets - 1]);
+        query.append(partitionId[partitionId.length - 1]);
         query.append("')");
         query.append(" AND PHOENIX_ROW_TIMESTAMP() >= ? AND 
PHOENIX_ROW_TIMESTAMP() < ?");
         query.append(" Group By PARTITION_ID()");
         return query.toString();
     }
 
+    private static String addPartitionInList(Connection conn, String cdcName, 
String query)
+            throws SQLException{
+        ResultSet rs = conn.createStatement().executeQuery("SELECT DISTINCT 
PARTITION_ID() FROM "
+                + cdcName);
+        List<String> partitionIds = new ArrayList<>();
+        while (rs.next()) {
+            partitionIds.add(rs.getString(1));
+        }
+        StringBuilder builder = new StringBuilder(query);
+        builder.append(" WHERE PARTITION_ID() IN (");
+        boolean initialized = false;
+        for (String partitionId : partitionIds) {
+            if (!initialized) {
+                builder.append("'");
+                initialized = true;
+            } else {
+                builder.append(",'");
+            }
+            builder.append(partitionId);
+            builder.append("'");
+        }
+        builder.append(")");
+        return builder.toString();
+    }
+
+    private static PreparedStatement getCDCQueryPreparedStatement(Connection 
conn, String cdcName,
+            String query, long minTimestamp, long maxTimestamp)
+            throws SQLException {
+        StringBuilder builder = new StringBuilder(addPartitionInList(conn, 
cdcName, query));
+        builder.append(" AND PHOENIX_ROW_TIMESTAMP() >= ? AND 
PHOENIX_ROW_TIMESTAMP() < ?");
+        PreparedStatement statement = 
conn.prepareStatement(builder.toString());
+        statement.setTimestamp(1, new Timestamp(minTimestamp));
+        statement.setTimestamp(2,  new Timestamp(maxTimestamp));
+        return statement;
+    }
+
     @Test
     public void testSelectCDC() throws Exception {
         String cdcName, cdc_sql;
@@ -283,14 +319,20 @@ public class CDCQueryIT extends CDCBaseIT {
         long startTS = System.currentTimeMillis();
         List<ChangeRow> changes = generateChanges(startTS, tenantids, 
tableName, null,
                 COMMIT_SUCCESS);
+        long currentTime = System.currentTimeMillis();
+        long endTS = changes.get(changes.size() - 1).getTimestamp() + 1;
+        if (endTS > currentTime) {
+            Thread.sleep(endTS - currentTime);
+        }
 
         String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
         try (Connection conn = newConnection(tenantId)) {
             // For debug: uncomment to see the exact results logged to console.
             dumpCDCResults(conn, cdcName,
                     new TreeMap<String, String>() {{ put("K", "INTEGER"); }},
-                    "SELECT /*+ CDC_INCLUDE(PRE, POST) */ 
PHOENIX_ROW_TIMESTAMP(), K," +
-                            "\"CDC JSON\" FROM " + cdcFullName);
+                    addPartitionInList(conn, cdcFullName,
+                            "SELECT /*+ CDC_INCLUDE(PRE, POST) */ 
PHOENIX_ROW_TIMESTAMP(), K,"
+                                    + "\"CDC JSON\" FROM " + cdcFullName));
 
             // Existence of an CDC index hint shouldn't cause the regular 
query path to fail.
             // Run the same query with a CDC index hit and without it and make 
sure we get the same
@@ -322,18 +364,22 @@ public class CDCQueryIT extends CDCBaseIT {
                 put("V2", "INTEGER");
                 put("B.VB", "INTEGER");
             }};
-            verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
-                            "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + 
cdcFullName),
+            verifyChangesViaSCN(tenantId, getCDCQueryPreparedStatement(conn, 
cdcFullName,
+                            "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + 
cdcFullName, startTS,
+                            endTS).executeQuery(),
                     datatableName, dataColumns, changes, CHANGE_IMG);
-            verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
-                            "SELECT /*+ CDC_INCLUDE(CHANGE) */ 
PHOENIX_ROW_TIMESTAMP(), K," +
-                                    "\"CDC JSON\" FROM " + cdcFullName), 
datatableName, dataColumns,
-                    changes, CHANGE_IMG);
-            verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
-                            "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + 
cdcFullName),
-                    datatableName, dataColumns, changes, PRE_POST_IMG);
-            verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
-                            "SELECT * FROM " + cdcFullName),
+            verifyChangesViaSCN(tenantId, getCDCQueryPreparedStatement(conn, 
cdcFullName,
+                            "SELECT /*+ CDC_INCLUDE(CHANGE) */ 
PHOENIX_ROW_TIMESTAMP(), K,"
+                                    + "\"CDC JSON\" FROM " + cdcFullName, 
startTS, endTS)
+                            .executeQuery(),
+                    datatableName, dataColumns, changes, CHANGE_IMG);
+            verifyChangesViaSCN(tenantId, getCDCQueryPreparedStatement(conn, 
cdcFullName,
+                            "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + 
cdcFullName,
+                            startTS, endTS).executeQuery(),
+                    datatableName, dataColumns, changes,
+                    PRE_POST_IMG);
+            verifyChangesViaSCN(tenantId, getCDCQueryPreparedStatement(conn, 
cdcFullName,
+                    "SELECT * FROM " + cdcFullName,startTS, 
endTS).executeQuery(),
                     datatableName, dataColumns, changes, new HashSet<>());
 
             HashMap<String, int[]> testQueries = new HashMap<String, int[]>() 
{{
@@ -424,24 +470,27 @@ public class CDCQueryIT extends CDCBaseIT {
         String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
         try (Connection conn = newConnection(tenantId)) {
             // For debug: uncomment to see the exact results logged to console.
-            dumpCDCResults(conn, cdcName, pkColumns,
-                    "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + 
cdcFullName);
+            dumpCDCResults(conn, cdcName, pkColumns, addPartitionInList(conn, 
cdcFullName,
+                    "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + 
cdcFullName));
 
             List<ChangeRow> changes = new ArrayList<>();
             for (Set<ChangeRow> batch: allBatches.get(tenantId)) {
                 changes.addAll(batch);
             }
             verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
-                            "SELECT * FROM " + cdcFullName),
+                    addPartitionInList(conn, cdcFullName,"SELECT * FROM " + 
cdcFullName)),
                     datatableName, dataColumns, changes, CHANGE_IMG);
             verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
-                            "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + 
cdcFullName),
+                    addPartitionInList(conn, cdcFullName,
+                            "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + 
cdcFullName)),
                     datatableName, dataColumns, changes, CHANGE_IMG);
             verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
-                            "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + 
cdcFullName),
+                    addPartitionInList(conn, cdcFullName,
+                            "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + 
cdcFullName)),
                     datatableName, dataColumns, changes, PRE_POST_IMG);
             verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
-                            "SELECT /*+ CDC_INCLUDE(CHANGE, PRE, POST) */ * 
FROM " + cdcFullName),
+                    addPartitionInList(conn, cdcFullName,
+                            "SELECT /*+ CDC_INCLUDE(CHANGE, PRE, POST) */ * 
FROM " + cdcFullName)),
                     datatableName, dataColumns, changes, ALL_IMG);
             cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName, 
cdcName);
             checkIndexPartitionIdCount(conn, cdcFullName);

Reply via email to