This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 4283b7c819 PHOENIX-7469 Partitioned CDC Index for partitioned tables (#2029)
4283b7c819 is described below
commit 4283b7c819fdca8af31b89b1aa0913bef543fe5e
Author: Kadir Ozdemir <[email protected]>
AuthorDate: Mon Nov 25 10:51:28 2024 -0800
PHOENIX-7469 Partitioned CDC Index for partitioned tables (#2029)
---
.../expression/function/PartitionIdFunction.java | 25 ++----
.../org/apache/phoenix/index/IndexMaintainer.java | 11 ++-
.../apache/phoenix/parse/PartitionIdParseNode.java | 21 ++++-
.../coprocessor/CDCGlobalIndexRegionScanner.java | 10 ++-
.../org/apache/phoenix/end2end/CDCQueryIT.java | 97 ++++++++++++++++------
5 files changed, 119 insertions(+), 45 deletions(-)
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/PartitionIdFunction.java b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/PartitionIdFunction.java
index 25a2c4d0ce..382cea042e 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/PartitionIdFunction.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/PartitionIdFunction.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.expression.function;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.expression.Determinism;
import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.RowKeyColumnExpression;
import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
import org.apache.phoenix.parse.PartitionIdParseNode;
import org.apache.phoenix.schema.tuple.Tuple;
@@ -29,8 +30,8 @@ import org.apache.phoenix.schema.types.PDataType;
import java.util.List;
/**
- * Function to return the partition id which is the encoded data table region name as the prefix
- * of the CDC index row key. This function is used only with CDC Indexes
+ * Function to return the partition id which is the encoded data table region name.
+ * This function is used only with CDC Indexes
*/
@BuiltInFunction(name = PartitionIdFunction.NAME,
nodeClass= PartitionIdParseNode.class,
@@ -49,11 +50,6 @@ public class PartitionIdFunction extends ScalarFunction {
*/
public PartitionIdFunction(List<Expression> children) {
super(children);
- if (!children.isEmpty()) {
- throw new IllegalArgumentException(
- "PartitionIdFunction should not have any child expression"
- );
- }
}
@Override
@@ -62,24 +58,20 @@ public class PartitionIdFunction extends ScalarFunction {
}
/**
- * The evaluate method is called under the following conditions -
- * 1. When PARTITION_ID() is evaluated in the projection list.
- *
- * 2. When PARTITION_ID() is evaluated in the backend as part of the where clause.
- *
+ * Since partition id is part of the row key, this method is not called when PARTITION_ID()
+ * is used with an IN clause.
*/
@Override
public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
if (tuple == null) {
return false;
}
- tuple.getKey(ptr);
if (ptr.getLength() < PARTITION_ID_LENGTH) {
return false;
}
- // The partition id of a row is always the prefix of the row key
- ptr.set(ptr.get(), 0, PARTITION_ID_LENGTH);
- return true;
+ RowKeyColumnExpression partitionIdColumnExpression =
+ (RowKeyColumnExpression) children.get(0);
+ return partitionIdColumnExpression.evaluate(tuple, ptr);
}
@Override
@@ -101,5 +93,4 @@ public class PartitionIdFunction extends ScalarFunction {
public Determinism getDeterminism() {
return Determinism.PER_ROW;
}
-
}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index c9fde90ca6..24645b29a9 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -1246,7 +1246,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
return num;
}
- private RowKeySchema getIndexRowKeySchema() {
+ public RowKeySchema getIndexRowKeySchema() {
if (indexRowKeySchema != null) {
return indexRowKeySchema;
}
@@ -2382,9 +2382,14 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
public boolean isUncovered() {
return isUncovered;
}
+
public boolean isCDCIndex() {
return isCDCIndex;
}
+
+ public boolean isMultiTenant() {
+ return isMultiTenant;
+ }
public boolean isImmutableRows() {
return immutableRows;
@@ -2443,4 +2448,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
public QualifierEncodingScheme getDataEncodingScheme() {
return dataEncodingScheme;
}
+
+ public byte[] getViewIndexId() {
+ return viewIndexId;
+ }
}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/PartitionIdParseNode.java b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/PartitionIdParseNode.java
index 5a5908667f..d0a74fb546 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/PartitionIdParseNode.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/PartitionIdParseNode.java
@@ -20,11 +20,14 @@ package org.apache.phoenix.parse;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.RowKeyColumnExpression;
import org.apache.phoenix.expression.function.FunctionExpression;
import org.apache.phoenix.expression.function.PartitionIdFunction;
-
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.RowKeyValueAccessor;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.List;
public class PartitionIdParseNode extends FunctionParseNode {
@@ -43,6 +46,20 @@ public class PartitionIdParseNode extends FunctionParseNode {
"PartitionIdFunction does not take any parameters"
);
}
- return new PartitionIdFunction(children);
+ PTable table = context.getCurrentTable().getTable();
+ if (table.getViewIndexId()!= null && table.isMultiTenant()) {
+ return new PartitionIdFunction(getExpressions(table, 2));
+ } else if (table.getViewIndexId()!= null || table.isMultiTenant()) {
+ return new PartitionIdFunction(getExpressions(table, 1));
+ } else {
+ return new PartitionIdFunction(getExpressions(table, 0));
+ }
+ }
+
+ private static List<Expression> getExpressions(PTable table, int position) {
+ List<Expression> expressionList = new ArrayList<>(1);
+ expressionList.add(new RowKeyColumnExpression(table.getPKColumns().get(position),
+ new RowKeyValueAccessor(table.getPKColumns(), position)));
+ return expressionList;
}
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
index f0c20f1ccf..4bde0727e9 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
@@ -35,8 +35,10 @@ import org.apache.phoenix.expression.SingleCellColumnExpression;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.CDCTableInfo;
import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.CDCChangeBuilder;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
@@ -104,7 +106,13 @@ public class CDCGlobalIndexRegionScanner extends UncoveredGlobalIndexRegionScann
ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
indexToDataRowKeyMap.get(indexRowKey));
Result dataRow = dataRows.get(dataRowKey);
- Long changeTS = firstIndexCell.getTimestamp();
+ int phoenixRowTimestampFunctionOffset = 2 + (indexMaintainer.isMultiTenant() ? 1 : 0)
+ + (indexMaintainer.getViewIndexId() != null ? 1 : 0);
+ ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+ indexMaintainer.getIndexRowKeySchema().iterator(firstIndexCell.getRowArray(),
+ firstIndexCell.getRowOffset(), firstIndexCell.getRowLength(), ptr,
+ phoenixRowTimestampFunctionOffset);
+ long changeTS = PLong.INSTANCE.getCodec().decodeLong(ptr, SortOrder.ASC);
TupleProjector dataTableProjector =
cdcDataTableInfo.getDataTableProjector();
Expression[] expressions = dataTableProjector != null ?
dataTableProjector.getExpressions() : null;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
index c7cd8a7dc4..094815638c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
@@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
@@ -209,7 +210,7 @@ public class CDCQueryIT extends CDCBaseIT {
// Verify that we can access data table mutations by partition id
PreparedStatement statement = conn.prepareStatement(
- getCDCQuery(cdcName, saltBuckets, partitionId));
+ getCDCQuery(cdcName, partitionId));
statement.setTimestamp(1, new Timestamp(1000));
statement.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
rs = statement.executeQuery();
@@ -233,24 +234,59 @@ public class CDCQueryIT extends CDCBaseIT {
assertEquals(saltBuckets, rowCount);
}
- private static String getCDCQuery(String cdcName, int saltBuckets,
- String[] partitionId) {
+ private static String getCDCQuery(String cdcName, String[] partitionId) {
StringBuilder query = new StringBuilder("SELECT PARTITION_ID(),
Count(*) from ");
query.append(cdcName);
query.append(" WHERE PARTITION_ID() IN (");
- for (int i = 0; i < saltBuckets - 1; i++) {
+ for (int i = 0; i < partitionId.length - 1; i++) {
query.append("'");
query.append(partitionId[i]);
query.append("',");
}
query.append("'");
- query.append(partitionId[saltBuckets - 1]);
+ query.append(partitionId[partitionId.length - 1]);
query.append("')");
query.append(" AND PHOENIX_ROW_TIMESTAMP() >= ? AND
PHOENIX_ROW_TIMESTAMP() < ?");
query.append(" Group By PARTITION_ID()");
return query.toString();
}
+ private static String addPartitionInList(Connection conn, String cdcName,
String query)
+ throws SQLException{
+ ResultSet rs = conn.createStatement().executeQuery("SELECT DISTINCT
PARTITION_ID() FROM "
+ + cdcName);
+ List<String> partitionIds = new ArrayList<>();
+ while (rs.next()) {
+ partitionIds.add(rs.getString(1));
+ }
+ StringBuilder builder = new StringBuilder(query);
+ builder.append(" WHERE PARTITION_ID() IN (");
+ boolean initialized = false;
+ for (String partitionId : partitionIds) {
+ if (!initialized) {
+ builder.append("'");
+ initialized = true;
+ } else {
+ builder.append(",'");
+ }
+ builder.append(partitionId);
+ builder.append("'");
+ }
+ builder.append(")");
+ return builder.toString();
+ }
+
+ private static PreparedStatement getCDCQueryPreparedStatement(Connection
conn, String cdcName,
+ String query, long minTimestamp, long maxTimestamp)
+ throws SQLException {
+ StringBuilder builder = new StringBuilder(addPartitionInList(conn,
cdcName, query));
+ builder.append(" AND PHOENIX_ROW_TIMESTAMP() >= ? AND
PHOENIX_ROW_TIMESTAMP() < ?");
+ PreparedStatement statement =
conn.prepareStatement(builder.toString());
+ statement.setTimestamp(1, new Timestamp(minTimestamp));
+ statement.setTimestamp(2, new Timestamp(maxTimestamp));
+ return statement;
+ }
+
@Test
public void testSelectCDC() throws Exception {
String cdcName, cdc_sql;
@@ -283,14 +319,20 @@ public class CDCQueryIT extends CDCBaseIT {
long startTS = System.currentTimeMillis();
List<ChangeRow> changes = generateChanges(startTS, tenantids,
tableName, null,
COMMIT_SUCCESS);
+ long currentTime = System.currentTimeMillis();
+ long endTS = changes.get(changes.size() - 1).getTimestamp() + 1;
+ if (endTS > currentTime) {
+ Thread.sleep(endTS - currentTime);
+ }
String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
try (Connection conn = newConnection(tenantId)) {
// For debug: uncomment to see the exact results logged to console.
dumpCDCResults(conn, cdcName,
new TreeMap<String, String>() {{ put("K", "INTEGER"); }},
- "SELECT /*+ CDC_INCLUDE(PRE, POST) */
PHOENIX_ROW_TIMESTAMP(), K," +
- "\"CDC JSON\" FROM " + cdcFullName);
+ addPartitionInList(conn, cdcFullName,
+ "SELECT /*+ CDC_INCLUDE(PRE, POST) */
PHOENIX_ROW_TIMESTAMP(), K,"
+ + "\"CDC JSON\" FROM " + cdcFullName));
// Existence of an CDC index hint shouldn't cause the regular
query path to fail.
// Run the same query with a CDC index hit and without it and make
sure we get the same
@@ -322,18 +364,22 @@ public class CDCQueryIT extends CDCBaseIT {
put("V2", "INTEGER");
put("B.VB", "INTEGER");
}};
- verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
- "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " +
cdcFullName),
+ verifyChangesViaSCN(tenantId, getCDCQueryPreparedStatement(conn,
cdcFullName,
+ "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " +
cdcFullName, startTS,
+ endTS).executeQuery(),
datatableName, dataColumns, changes, CHANGE_IMG);
- verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
- "SELECT /*+ CDC_INCLUDE(CHANGE) */
PHOENIX_ROW_TIMESTAMP(), K," +
- "\"CDC JSON\" FROM " + cdcFullName),
datatableName, dataColumns,
- changes, CHANGE_IMG);
- verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
- "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " +
cdcFullName),
- datatableName, dataColumns, changes, PRE_POST_IMG);
- verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
- "SELECT * FROM " + cdcFullName),
+ verifyChangesViaSCN(tenantId, getCDCQueryPreparedStatement(conn,
cdcFullName,
+ "SELECT /*+ CDC_INCLUDE(CHANGE) */
PHOENIX_ROW_TIMESTAMP(), K,"
+ + "\"CDC JSON\" FROM " + cdcFullName,
startTS, endTS)
+ .executeQuery(),
+ datatableName, dataColumns, changes, CHANGE_IMG);
+ verifyChangesViaSCN(tenantId, getCDCQueryPreparedStatement(conn,
cdcFullName,
+ "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " +
cdcFullName,
+ startTS, endTS).executeQuery(),
+ datatableName, dataColumns, changes,
+ PRE_POST_IMG);
+ verifyChangesViaSCN(tenantId, getCDCQueryPreparedStatement(conn,
cdcFullName,
+ "SELECT * FROM " + cdcFullName,startTS,
endTS).executeQuery(),
datatableName, dataColumns, changes, new HashSet<>());
HashMap<String, int[]> testQueries = new HashMap<String, int[]>()
{{
@@ -424,24 +470,27 @@ public class CDCQueryIT extends CDCBaseIT {
String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
try (Connection conn = newConnection(tenantId)) {
// For debug: uncomment to see the exact results logged to console.
- dumpCDCResults(conn, cdcName, pkColumns,
- "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " +
cdcFullName);
+ dumpCDCResults(conn, cdcName, pkColumns, addPartitionInList(conn,
cdcFullName,
+ "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " +
cdcFullName));
List<ChangeRow> changes = new ArrayList<>();
for (Set<ChangeRow> batch: allBatches.get(tenantId)) {
changes.addAll(batch);
}
verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
- "SELECT * FROM " + cdcFullName),
+ addPartitionInList(conn, cdcFullName,"SELECT * FROM " +
cdcFullName)),
datatableName, dataColumns, changes, CHANGE_IMG);
verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
- "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " +
cdcFullName),
+ addPartitionInList(conn, cdcFullName,
+ "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " +
cdcFullName)),
datatableName, dataColumns, changes, CHANGE_IMG);
verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
- "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " +
cdcFullName),
+ addPartitionInList(conn, cdcFullName,
+ "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " +
cdcFullName)),
datatableName, dataColumns, changes, PRE_POST_IMG);
verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
- "SELECT /*+ CDC_INCLUDE(CHANGE, PRE, POST) */ *
FROM " + cdcFullName),
+ addPartitionInList(conn, cdcFullName,
+ "SELECT /*+ CDC_INCLUDE(CHANGE, PRE, POST) */ *
FROM " + cdcFullName)),
datatableName, dataColumns, changes, ALL_IMG);
cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName,
cdcName);
checkIndexPartitionIdCount(conn, cdcFullName);