This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
     new cd043adc16 PHOENIX-7425 Partitioned CDC Index for eliminating salting (#2015)
cd043adc16 is described below
commit cd043adc163891d118691c251d40faf0aa998262
Author: Kadir Ozdemir <[email protected]>
AuthorDate: Sat Nov 9 00:35:07 2024 -0800
PHOENIX-7425 Partitioned CDC Index for eliminating salting (#2015)
---
.../org/apache/phoenix/compile/DeleteCompiler.java | 4 +
.../org/apache/phoenix/compile/QueryCompiler.java | 8 -
.../apache/phoenix/exception/SQLExceptionCode.java | 3 +
.../org/apache/phoenix/execute/MutationState.java | 72 --------
.../apache/phoenix/expression/ExpressionType.java | 3 +-
.../expression/function/PartitionIdFunction.java | 105 +++++++++++
.../org/apache/phoenix/index/IndexMaintainer.java | 97 ++++++++--
.../apache/phoenix/optimize/QueryOptimizer.java | 13 +-
.../apache/phoenix/parse/PartitionIdParseNode.java | 48 +++++
.../org/apache/phoenix/schema/MetaDataClient.java | 13 +-
.../schema/transform/TransformMaintainer.java | 13 +-
.../main/java/org/apache/phoenix/util/CDCUtil.java | 2 +-
.../java/org/apache/phoenix/util/ScanUtil.java | 2 -
.../coprocessor/BaseScannerRegionObserver.java | 10 ++
.../coprocessor/GlobalIndexRegionScanner.java | 20 ++-
.../coprocessor/IndexRepairRegionScanner.java | 2 +-
.../phoenix/coprocessor/IndexerRegionScanner.java | 7 +-
.../coprocessor/UncoveredIndexRegionScanner.java | 13 +-
.../phoenix/hbase/index/IndexRegionObserver.java | 22 ++-
.../java/org/apache/phoenix/end2end/CDCBaseIT.java | 54 ++++--
.../apache/phoenix/end2end/CDCDefinitionIT.java | 52 +-----
.../org/apache/phoenix/end2end/CDCQueryIT.java | 200 +++++++++++++++++----
.../index/PrepareIndexMutationsForRebuildTest.java | 57 +++---
.../phoenix/index/VerifySingleIndexRowTest.java | 2 +-
.../phoenix/parse/PartitionIdFunctionTest.java | 63 +++++++
25 files changed, 615 insertions(+), 270 deletions(-)
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index a4d087288d..476d40dc26 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -90,6 +90,7 @@ import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
@@ -1020,6 +1021,9 @@ public class DeleteCompiler {
}
private static boolean isMaintainedOnClient(PTable table) {
+ if (CDCUtil.isCDCIndex(table)) {
+ return false;
+ }
        // Test for not being local (rather than being GLOBAL) so that this doesn't fail
        // when tested with our projected table.
return (table.getIndexType() != IndexType.LOCAL &&
(table.isTransactional() || table.isImmutableRows())) ||
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 2d15206789..ccc789146b 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -804,14 +804,6 @@ public class QueryCompiler {
selectNodes = tmpSelectNodes;
}
List<OrderByNode> orderByNodes = select.getOrderBy();
-        // For CDC queries, if no ORDER BY is specified, add default ordering.
-        if (orderByNodes.size() == 0) {
-            orderByNodes = Lists.newArrayListWithExpectedSize(1);
-            orderByNodes.add(NODE_FACTORY.orderBy(
-                    NODE_FACTORY.function(PhoenixRowTimestampFunction.NAME,
-                            Collections.emptyList()),
-                    false, SortOrder.getDefault() == SortOrder.ASC));
-        }
        select = NODE_FACTORY.select(select.getFrom(), select.getHint(), select.isDistinct(),
                selectNodes, select.getWhere(), select.getGroupBy(), select.getHaving(),
                orderByNodes, select.getLimit(),
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 48eff59658..d9aa522765 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -56,6 +56,7 @@ import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CHANGE_DETECTION_ENABLED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL;
@@ -379,6 +380,8 @@ public enum SQLExceptionCode {
+ " property is already defined in hierarchy for this entity"),
    VIEW_TTL_NOT_ENABLED(10961,"44A43", TTL +
            " property can not be set on views as phoenix.view.ttl.enabled is false"),
+ SALTING_NOT_ALLOWED_FOR_CDC(10962,"44A44", SALT_BUCKETS +
+ " property can not be set for CDC"),
/** Sequence related */
SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new
Factory() {
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
index 6d3c9df480..c1bb46fc85 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -75,9 +75,6 @@ import org.apache.phoenix.coprocessorclient.MetaDataProtocol.MetaDataMutationRes
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.exception.StaleMetadataCacheException;
-import org.apache.phoenix.hbase.index.AbstractValueGetter;
-import org.apache.phoenix.hbase.index.ValueGetter;
-import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.exception.IndexWriteException;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMaintainer;
@@ -124,7 +121,6 @@ import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilit
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.transaction.TransactionFactory.Provider;
import org.apache.phoenix.util.ClientUtil;
-import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
@@ -639,61 +635,6 @@ public class MutationState implements SQLCloseable {
return ptr;
}
-    private List<Mutation> getCDCDeleteMutations(PTable table, PTable index,
-                                                 Long mutationTimestamp,
-                                                 List<Mutation> mutationList) throws
-            SQLException {
-        final ImmutableBytesPtr ptr = new ImmutableBytesPtr();
-        IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
-        List<Mutation> indexMutations = Lists.newArrayListWithExpectedSize(mutationList.size());
-        for (final Mutation mutation : mutationList) {
-            // Only generate extra row mutations for DELETE
-            if (mutation instanceof Delete) {
-                ptr.set(mutation.getRow());
-                ValueGetter getter = new AbstractValueGetter() {
-                    @Override
-                    public byte[] getRowKey() {
-                        return mutation.getRow();
-                    }
-                    @Override
-                    public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) {
-                        // Always return null for our empty key value, as this will cause the index
-                        // maintainer to always treat this Put as a new row.
-                        if (IndexUtil.isEmptyKeyValue(table, ref)) {
-                            return null;
-                        }
-                        byte[] family = ref.getFamily();
-                        byte[] qualifier = ref.getQualifier();
-                        Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
-                        List<Cell> kvs = familyMap.get(family);
-                        if (kvs == null) {
-                            return null;
-                        }
-                        for (Cell kv : kvs) {
-                            if (Bytes.compareTo(kv.getFamilyArray(), kv.getFamilyOffset(),
-                                    kv.getFamilyLength(), family, 0, family.length) == 0
-                                    && Bytes.compareTo(kv.getQualifierArray(),
-                                    kv.getQualifierOffset(), kv.getQualifierLength(),
-                                    qualifier, 0, qualifier.length) == 0) {
-                                ImmutableBytesPtr ptr = new ImmutableBytesPtr();
-                                connection.getKeyValueBuilder().getValueAsPtr(kv, ptr);
-                                return ptr;
-                            }
-                        }
-                        return null;
-                    }
-                };
-                ImmutableBytesPtr key = new ImmutableBytesPtr(maintainer.buildRowKey(
-                        getter, ptr, null, null, mutationTimestamp));
-                PRow row = index.newRow(
-                        connection.getKeyValueBuilder(), mutationTimestamp, key, false);
-                row.delete();
-                indexMutations.addAll(row.toRowMutations());
-            }
-        }
-        return indexMutations;
-    }
-
    private Iterator<Pair<PTable, List<Mutation>>> addRowMutations(final TableRef tableRef,
            final MultiRowMutationState values, final long mutationTimestamp,
            final long serverTimestamp, boolean includeAllIndexes, final boolean sendAll) {
@@ -771,19 +712,6 @@ public class MutationState implements SQLCloseable {
}
}
}
-
- if (CDCUtil.isCDCIndex(index)) {
- List<Mutation> cdcMutations = getCDCDeleteMutations(
- table, index, mutationTimestamp, mutationList);
- if (cdcMutations.size() > 0) {
- if (indexMutations == null) {
- indexMutations = cdcMutations;
- } else {
- indexMutations.addAll(cdcMutations);
- }
- }
- }
-
} catch (SQLException | IOException e) {
throw new IllegalDataException(e);
}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java
index d71c07185a..bc059d9642 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java
@@ -199,7 +199,8 @@ public enum ExpressionType {
JsonModifyFunction(JsonModifyFunction.class),
BsonConditionExpressionFunction(BsonConditionExpressionFunction.class),
BsonUpdateExpressionFunction(BsonUpdateExpressionFunction.class),
- BsonValueFunction(BsonValueFunction.class);
+ BsonValueFunction(BsonValueFunction.class),
+ PartitionIdFunction(PartitionIdFunction.class);
ExpressionType(Class<? extends Expression> clazz) {
this.clazz = clazz;
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/PartitionIdFunction.java b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/PartitionIdFunction.java
new file mode 100644
index 0000000000..25a2c4d0ce
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/PartitionIdFunction.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Determinism;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.parse.PartitionIdParseNode;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PChar;
+import org.apache.phoenix.schema.types.PDataType;
+
+import java.util.List;
+
+/**
+ * Function to return the partition id, which is the encoded data table region name used as the
+ * prefix of the CDC index row key. This function is used only with CDC indexes.
+ */
+@BuiltInFunction(name = PartitionIdFunction.NAME,
+ nodeClass= PartitionIdParseNode.class,
+ args = {})
+public class PartitionIdFunction extends ScalarFunction {
+ public static final String NAME = "PARTITION_ID";
+ public static final int PARTITION_ID_LENGTH = 32;
+
+ public PartitionIdFunction() {
+ }
+
+ /**
+ * @param children none
+ * {@link org.apache.phoenix.parse.PartitionIdParseNode#create create}
+ * will return the partition id of a given CDC index row.
+ */
+ public PartitionIdFunction(List<Expression> children) {
+ super(children);
+ if (!children.isEmpty()) {
+ throw new IllegalArgumentException(
+ "PartitionIdFunction should not have any child expression"
+ );
+ }
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+
+ /**
+ * The evaluate method is called under the following conditions -
+ * 1. When PARTITION_ID() is evaluated in the projection list.
+ *
+     * 2. When PARTITION_ID() is evaluated in the backend as part of the where clause.
+ *
+ */
+ @Override
+ public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+ if (tuple == null) {
+ return false;
+ }
+ tuple.getKey(ptr);
+ if (ptr.getLength() < PARTITION_ID_LENGTH) {
+ return false;
+ }
+ // The partition id of a row is always the prefix of the row key
+ ptr.set(ptr.get(), 0, PARTITION_ID_LENGTH);
+ return true;
+ }
+
+ @Override
+ public PDataType getDataType() {
+ return PChar.INSTANCE;
+ }
+
+ @Override
+ public Integer getMaxLength() {
+ return PARTITION_ID_LENGTH;
+ }
+
+ @Override
+ public boolean isStateless() {
+ return false;
+ }
+
+ @Override
+ public Determinism getDeterminism() {
+ return Determinism.PER_ROW;
+ }
+
+}
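Below is a minimal standalone sketch, not part of this commit, of the evaluate() contract above: for a CDC index row, PARTITION_ID() is just the fixed-length 32-byte prefix of the row key. The class name and the sample key bytes are hypothetical.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionIdSketch {
    // Mirrors PartitionIdFunction.PARTITION_ID_LENGTH from the commit above.
    static final int PARTITION_ID_LENGTH = 32;

    // Returns the partition id prefix of a CDC index row key, or null when the
    // key is too short, mirroring evaluate() returning false in that case.
    static byte[] partitionId(byte[] cdcIndexRowKey) {
        if (cdcIndexRowKey.length < PARTITION_ID_LENGTH) {
            return null;
        }
        return Arrays.copyOf(cdcIndexRowKey, PARTITION_ID_LENGTH);
    }

    public static void main(String[] args) {
        // Hypothetical row key: a 32-character encoded region name followed by
        // the rest of the CDC index row key.
        byte[] rowKey = "0123456789abcdef0123456789abcdefREST_OF_KEY"
                .getBytes(StandardCharsets.UTF_8);
        System.out.println(new String(partitionId(rowKey), StandardCharsets.UTF_8));
    }
}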
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 97a0ca785c..c9fde90ca6 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -43,6 +43,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
@@ -67,6 +68,7 @@ import org.apache.phoenix.expression.KeyValueColumnExpression;
import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.expression.SingleCellColumnExpression;
import org.apache.phoenix.expression.SingleCellConstructorExpression;
+import org.apache.phoenix.expression.function.PartitionIdFunction;
import org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor;
import org.apache.phoenix.hbase.index.AbstractValueGetter;
import org.apache.phoenix.hbase.index.ValueGetter;
@@ -183,7 +185,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
            @Override
            public boolean apply(PTable index) {
                return sendIndexMaintainer(index) && IndexUtil.isGlobalIndex(index)
-                        && dataTable.getImmutableStorageScheme() == index.getImmutableStorageScheme();
+                        && (dataTable.getImmutableStorageScheme() == index.getImmutableStorageScheme()
+                        && !CDCUtil.isCDCIndex(index));
}
});
}
@@ -194,7 +197,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
            public boolean apply(PTable index) {
                return sendIndexMaintainer(index) && ((index.getIndexType() == IndexType.GLOBAL
                        && dataTable.getImmutableStorageScheme() != index.getImmutableStorageScheme())
-                        || index.getIndexType() == IndexType.LOCAL);
+                        || index.getIndexType() == IndexType.LOCAL || CDCUtil.isCDCIndex(index));
}
});
}
@@ -706,10 +709,18 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
        this.dataEncodingScheme = sc;
    }
-    public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey, long ts) {
+    public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr,
+            byte[] regionStartKey, byte[] regionEndKey, long ts) {
+        return buildRowKey(valueGetter, rowKeyPtr, regionStartKey, regionEndKey, ts, null);
+    }
+    public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr,
+            byte[] regionStartKey, byte[] regionEndKey, long ts, byte[] encodedRegionName) {
+        if (isCDCIndex && encodedRegionName == null) {
+            throw new IllegalArgumentException("Encoded region name is required for a CDC index");
+        }
        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
        boolean prependRegionStartKey = isLocalIndex && regionStartKey != null;
-        boolean isIndexSalted = !isLocalIndex && nIndexSaltBuckets > 0;
+        boolean isIndexSalted = !isLocalIndex && !isCDCIndex && nIndexSaltBuckets > 0;
        int prefixKeyLength =
                prependRegionStartKey ? (regionStartKey.length != 0 ? regionStartKey.length
                        : regionEndKey.length) : 0;
@@ -785,7 +796,18 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                    dataColumnType = expression.getDataType();
                    dataSortOrder = expression.getSortOrder();
                    isNullable = expression.isNullable();
-                    expression.evaluate(new ValueGetterTuple(valueGetter, ts), ptr);
+                    if (expression instanceof PartitionIdFunction) {
+                        if (i != 0) {
+                            throw new DoNotRetryIOException("PARTITION_ID() has to be the prefix "
+                                    + "of the index row key!");
+                        } else if (!isCDCIndex) {
+                            throw new DoNotRetryIOException("PARTITION_ID() should be used only for"
+                                    + " CDC Indexes!");
+                        }
+                        ptr.set(encodedRegionName);
+                    } else {
+                        expression.evaluate(new ValueGetterTuple(valueGetter, ts), ptr);
+                    }
                }
                else {
                    Field field = dataRowKeySchema.getField(dataPkPosition[i]);
@@ -882,7 +904,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
        int dataPosOffset = 0;
        int viewConstantsIndex = 0;
        try {
-            int indexPosOffset = !isLocalIndex && nIndexSaltBuckets > 0 ? 1 : 0;
+            int indexPosOffset = !isLocalIndex && !isCDCIndex && nIndexSaltBuckets > 0 ? 1 : 0;
            int maxRowKeyOffset = indexRowKeyPtr.getOffset() + indexRowKeyPtr.getLength();
            indexRowKeySchema.iterator(indexRowKeyPtr, ptr, indexPosOffset);
            if (isDataTableSalted) {
@@ -905,7 +927,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
            indexPosOffset++;
            dataPosOffset++;
        }
-        indexPosOffset = (!isLocalIndex && nIndexSaltBuckets > 0 ? 1 : 0) + (isMultiTenant ? 1 : 0) + (viewIndexId == null ? 0 : 1);
+        indexPosOffset = (!isLocalIndex && !isCDCIndex && nIndexSaltBuckets > 0 ? 1 : 0)
+                + (isMultiTenant ? 1 : 0) + (viewIndexId == null ? 0 : 1);
        BitSet viewConstantColumnBitSet = this.rowKeyMetaData.getViewConstantColumnBitSet();
        BitSet descIndexColumnBitSet = rowKeyMetaData.getDescIndexColumnBitSet();
        int trailingVariableWidthColumnNum = 0;
@@ -1003,8 +1026,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
        return buildRowKey(valueGetter, new ImmutableBytesWritable(dataRow.getRow()),
                null, null, IndexUtil.getMaxTimestamp(dataRow));
    }
-    public boolean checkIndexRow(final byte[] indexRowKey,
-            final Put dataRow) {
+    public boolean checkIndexRow(final byte[] indexRowKey, final Put dataRow) {
        if (!shouldPrepareIndexMutations(dataRow)) {
            return false;
        }
@@ -1016,6 +1038,26 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
        return true;
    }
+    public byte[] getIndexRowKey(final Put dataRow, byte[] encodedRegionName) {
+        ValueGetter valueGetter = new IndexUtil.SimpleValueGetter(dataRow);
+        return buildRowKey(valueGetter, new ImmutableBytesWritable(dataRow.getRow()),
+                null, null, IndexUtil.getMaxTimestamp(dataRow), encodedRegionName);
+    }
+
+    public boolean checkIndexRow(final byte[] indexRowKey, final byte[] dataRowKey,
+            final Put dataRow, final byte[][] viewConstants) {
+        if (!shouldPrepareIndexMutations(dataRow)) {
+            return false;
+        }
+        byte[] builtDataRowKey = buildDataRowKey(new ImmutableBytesWritable(indexRowKey),
+                viewConstants);
+        if (Bytes.compareTo(builtDataRowKey, 0, builtDataRowKey.length,
+                dataRowKey, 0, dataRowKey.length) != 0) {
+            return false;
+        }
+        return true;
+    }
+
    /**
     * Determines if the index row for a given data row should be prepared. For full
     * indexes, index rows should always be prepared. For the partial indexes, the index row should
@@ -1216,8 +1258,17 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
        return indexRowKeySchema;
    }
-    public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] regionStartKey, byte[] regionEndKey, boolean verified) throws IOException {
-        byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey, ts);
+    public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter,
+            ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] regionStartKey,
+            byte[] regionEndKey, boolean verified) throws IOException {
+        return buildUpdateMutation(kvBuilder, valueGetter, dataRowKeyPtr, ts, regionStartKey,
+                regionEndKey, verified, null);
+    }
+    public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter,
+            ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] regionStartKey,
+            byte[] regionEndKey, boolean verified, byte[] encodedRegionName) throws IOException {
+        byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey,
+                regionEndKey, ts, encodedRegionName);
        return buildUpdateMutation(kvBuilder, valueGetter, dataRowKeyPtr, ts, regionStartKey, regionEndKey,
                indexRowKey, this.getEmptyKeyValueFamily(), coveredColumnsMap,
                indexEmptyKeyValueRef, indexWALDisabled, dataImmutableStorageScheme,
                immutableStorageScheme, encodingScheme, dataEncodingScheme, verified);
@@ -1519,12 +1570,24 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     * Used for immutable indexes that only index PK column values. In that case, we can handle a data row deletion,
     * since we can build the corresponding index row key.
     */
-    public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ImmutableBytesWritable dataRowKeyPtr, long ts) throws IOException {
-        return buildDeleteMutation(kvBuilder, null, dataRowKeyPtr, Collections.<Cell>emptyList(), ts, null, null);
-    }
-
-    public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<Cell> pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
-        byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, regionStartKey, regionEndKey, ts);
+    public Delete buildDeleteMutation(KeyValueBuilder kvBuilder,
+            ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] encodedRegionName)
+            throws IOException {
+        return buildDeleteMutation(kvBuilder, null, dataRowKeyPtr, Collections.<Cell>emptyList(),
+                ts, null, null, encodedRegionName);
+    }
+
+    public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter oldState,
+            ImmutableBytesWritable dataRowKeyPtr, Collection<Cell> pendingUpdates, long ts,
+            byte[] regionStartKey, byte[] regionEndKey) throws IOException {
+        return buildDeleteMutation(kvBuilder, oldState, dataRowKeyPtr, pendingUpdates, ts,
+                regionStartKey, regionEndKey, null);
+    }
+    public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter oldState,
+            ImmutableBytesWritable dataRowKeyPtr, Collection<Cell> pendingUpdates, long ts,
+            byte[] regionStartKey, byte[] regionEndKey, byte[] encodedRegionName) throws IOException {
+        byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, regionStartKey, regionEndKey,
+                ts, encodedRegionName);
        // Delete the entire row if any of the indexed columns changed
        DeleteType deleteType = null;
        if (oldState == null || (deleteType=getDeleteTypeOrNull(pendingUpdates)) != null ||
                hasIndexedColumnChanged(oldState, pendingUpdates, ts)) { // Deleting the entire row
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 6433f9ab31..a562b3ebc8 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -286,11 +286,16 @@ public class QueryOptimizer {
targetColumns, parallelIteratorFactory, plans);
if (hintedPlan != null) {
PTable index = hintedPlan.getTableRef().getTable();
-                if (stopAtBestPlan && hintedPlan.isApplicable() && (index.getIndexWhere() == null
-                        || isPartialIndexUsable(select, dataPlan, index))) {
-                    return Collections.singletonList(hintedPlan);
+                // Ignore any index hint with a CDC index as it is a truncated index; it only
+                // includes index rows that changed within the max lookback window
+                if (!CDCUtil.isCDCIndex(index)) {
+                    if (stopAtBestPlan && hintedPlan.isApplicable() && (
+                            index.getIndexWhere() == null || isPartialIndexUsable(select, dataPlan,
+                            index))) {
+                        return Collections.singletonList(hintedPlan);
+                    }
+                    plans.add(0, hintedPlan);
                }
-                plans.add(0, hintedPlan);
}
}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/PartitionIdParseNode.java b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/PartitionIdParseNode.java
new file mode 100644
index 0000000000..5a5908667f
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/PartitionIdParseNode.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.function.FunctionExpression;
+import org.apache.phoenix.expression.function.PartitionIdFunction;
+
+
+import java.sql.SQLException;
+import java.util.List;
+
+public class PartitionIdParseNode extends FunctionParseNode {
+
+ PartitionIdParseNode(String name, List<ParseNode> children,
+ BuiltInFunctionInfo info) {
+ super(name, children, info);
+ }
+
+ @Override
+    public FunctionExpression create(List<Expression> children, StatementContext context)
+            throws SQLException {
+ // It does not take any parameters.
+ if (children.size() != 0) {
+ throw new IllegalArgumentException(
+ "PartitionIdFunction does not take any parameters"
+ );
+ }
+ return new PartitionIdFunction(children);
+ }
+}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 6851b6fe31..129275dec7 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.schema;
 import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_TRANSFORM_TRANSACTIONAL_TABLE;
 import static org.apache.phoenix.exception.SQLExceptionCode.ERROR_WRITING_TO_SCHEMA_REGISTRY;
+import static org.apache.phoenix.exception.SQLExceptionCode.SALTING_NOT_ALLOWED_FOR_CDC;
 import static org.apache.phoenix.exception.SQLExceptionCode.TABLE_ALREADY_EXIST;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_TABLE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_NOT_DEFINED;
@@ -169,6 +170,7 @@ import java.util.Objects;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.expression.function.PartitionIdFunction;
import org.apache.phoenix.expression.function.PhoenixRowTimestampFunction;
import org.apache.phoenix.parse.CreateCDCStatement;
import org.apache.phoenix.parse.DropCDCStatement;
@@ -1978,13 +1980,16 @@ public class MetaDataClient {
"CREATE UNCOVERED INDEX " + (statement.isIfNotExists() ? "IF
NOT EXISTS " : "")
+
CDCUtil.getCDCIndexName(statement.getCdcObjName().getName())
+ " ON " + dataTableFullName + " ("
- + PhoenixRowTimestampFunction.NAME + "()) ASYNC";
+ + PartitionIdFunction.NAME + "(), " +
PhoenixRowTimestampFunction.NAME
+ + "()) ASYNC";
List<String> indexProps = new ArrayList<>();
+ // We do not want to replicate CDC indexes
indexProps.add("REPLICATION_SCOPE=0");
- Object saltBucketNum = TableProperty.SALT_BUCKETS.getValue(tableProps);
- if (saltBucketNum != null) {
- indexProps.add("SALT_BUCKETS=" + saltBucketNum);
+ if (TableProperty.SALT_BUCKETS.getValue(tableProps) != null) {
+ throw new
SQLExceptionInfo.Builder(SALTING_NOT_ALLOWED_FOR_CDC).setTableName(
+
statement.getCdcObjName().getName()).build().buildException();
}
+ indexProps.add("SALT_BUCKETS=0");
Object columnEncodedBytes =
TableProperty.COLUMN_ENCODED_BYTES.getValue(tableProps);
if (columnEncodedBytes != null) {
indexProps.add("COLUMN_ENCODED_BYTES=" + columnEncodedBytes);
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
index 4a20afea1f..50c9948faa 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
@@ -467,7 +467,9 @@ public class TransformMaintainer extends IndexMaintainer {
// Builds new table's rowkey using the old table's rowkey.
// This method will change when we support rowkey related transforms
-    public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey, long ts) {
+    @Override
+    public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr,
+            byte[] regionStartKey, byte[] regionEndKey, long ts, byte[] encodedRegionName) {
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
boolean isNewTableSalted = nNewTableSaltBuckets > 0;
@@ -521,9 +523,12 @@ public class TransformMaintainer extends IndexMaintainer {
}
}
-    public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable oldRowKeyPtr,
-                                   long ts, byte[] regionStartKey, byte[] regionEndKey, boolean verified) throws IOException {
-        byte[] newRowKey = this.buildRowKey(valueGetter, oldRowKeyPtr, regionStartKey, regionEndKey, ts);
+    @Override
+    public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter,
+            ImmutableBytesWritable oldRowKeyPtr, long ts, byte[] regionStartKey,
+            byte[] regionEndKey, boolean verified, byte[] encodedRegionName) throws IOException {
+        byte[] newRowKey = this.buildRowKey(valueGetter, oldRowKeyPtr, regionStartKey, regionEndKey,
+                ts, encodedRegionName);
        return buildUpdateMutation(kvBuilder, valueGetter, oldRowKeyPtr, ts, regionStartKey, regionEndKey,
                newRowKey, this.getEmptyKeyValueFamily(), coveredColumnsMap,
                newTableEmptyKeyValueRef, newTableWALDisabled, oldTableImmutableStorageScheme, newTableImmutableStorageScheme,
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
index 6e87121ef9..74c2263d33 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -38,7 +38,7 @@ import org.apache.phoenix.schema.types.PDataType;
import org.bson.RawBsonDocument;
public class CDCUtil {
- public static final String CDC_INDEX_PREFIX = "PHOENIX_CDC_INDEX";
+ public static final String CDC_INDEX_PREFIX = "PHOENIX_CDC_INDEX_";
/**
     * Make a set of CDC change scope enums from the given string containing comma separated scope
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 7f50bd6d72..7a97c6a7ed 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -1476,8 +1476,6 @@ public class ScanUtil {
if (context.getCDCTableRef() != null) {
scan.setAttribute(CDC_DATA_TABLE_DEF,
CDCTableInfo.toProto(context).toByteArray());
- CDCUtil.setupScanForCDC(scan);
- adjustScanFilterForGlobalIndexRegionScanner(scan);
}
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index aab5c178a3..113b94d4f3 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -54,6 +54,7 @@ import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.filter.PagingFilter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.index.CDCTableInfo;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.iterate.NonAggregateRegionScannerFactory;
import org.apache.phoenix.iterate.RegionScannerFactory;
@@ -62,6 +63,7 @@ import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.ClientUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.QueryUtil;
@@ -70,6 +72,7 @@ import org.apache.phoenix.schema.PTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.CDC_DATA_TABLE_DEF;
import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForFilter;
abstract public class BaseScannerRegionObserver implements RegionObserver {
@@ -160,6 +163,13 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
TimeRange timeRange = scan.getTimeRange();
scan.setTimeRange(timeRange.getMin(), Bytes.toLong(txnScn));
}
+ if (!scan.isRaw()) {
+ byte[] cdcScan = scan.getAttribute(CDC_DATA_TABLE_DEF);
+ if (cdcScan != null) {
+ CDCUtil.setupScanForCDC(scan);
+ ScanUtil.adjustScanFilterForGlobalIndexRegionScanner(scan);
+ }
+ }
        if (isRegionObserverFor(scan)) {
            // For local indexes, we need to throw if out of region as we'll get inconsistent
            // results otherwise while in other cases, it may just mean our client-side data
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index f5ef7a87dc..52ca74c1d5 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -1199,13 +1199,13 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
    }
    private static Put prepareIndexPutForRebuild(IndexMaintainer indexMaintainer,
-            ImmutableBytesPtr rowKeyPtr, ValueGetter mergedRowVG, long ts) throws IOException {
+            ImmutableBytesPtr rowKeyPtr, ValueGetter mergedRowVG, long ts,
+            byte[] encodedRegionName) throws IOException {
        Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
-                mergedRowVG, rowKeyPtr, ts, null, null, false);
+                mergedRowVG, rowKeyPtr, ts, null, null, false, encodedRegionName);
        if (indexPut == null) {
            // No covered column. Just prepare an index row with the empty column
            byte[] indexRowKey = indexMaintainer.buildRowKey(mergedRowVG, rowKeyPtr,
-                    null, null, ts);
+                    null, null, ts, encodedRegionName);
            indexPut = new Put(indexRowKey);
        } else {
            IndexUtil.removeEmptyColumn(indexPut, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
@@ -1280,7 +1280,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
     * and mutation type where delete comes after put.
     */
    public static List<Mutation> prepareIndexMutationsForRebuild(IndexMaintainer indexMaintainer,
-            Put dataPut, Delete dataDel) throws IOException {
+            Put dataPut, Delete dataDel, byte[] encodedRegionName) throws IOException {
        List<Mutation> dataMutations = getMutationsWithSameTS(dataPut, dataDel);
        List<Mutation> indexMutations = Lists.newArrayListWithExpectedSize(dataMutations.size());
        // The row key ptr of the data table row for which we will build index rows here
@@ -1341,7 +1341,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
            }
            ValueGetter nextDataRowVG = new IndexUtil.SimpleValueGetter(nextDataRow);
            Put indexPut = prepareIndexPutForRebuild(indexMaintainer, rowKeyPtr,
-                    nextDataRowVG, ts);
+                    nextDataRowVG, ts, encodedRegionName);
            indexMutations.add(indexPut);
            // Delete the current index row if the new index key is different than the current one
            if (indexRowKeyForCurrentDataRow != null) {
@@ -1380,8 +1380,9 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
                    // CDC Index needs two delete markers: one for deleting the index row,
                    // and the other for referencing the data table delete mutation with
                    // the right index row key, that is, the index row key starting with ts
-                    indexMutations.add(IndexRegionObserver.getDeleteIndexMutation(
-                            currentDataRowState, indexMaintainer, ts, rowKeyPtr));
+                    indexMutations.add(IndexRegionObserver
+                            .getDeleteIndexMutation(currentDataRowState, indexMaintainer,
+                                    ts, rowKeyPtr, encodedRegionName));
                }
            }
            currentDataRowState = null;
@@ -1398,7 +1399,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
            }
            ValueGetter nextDataRowVG = new IndexUtil.SimpleValueGetter(nextDataRowState);
            Put indexPut = prepareIndexPutForRebuild(indexMaintainer, rowKeyPtr,
-                    nextDataRowVG, ts);
+                    nextDataRowVG, ts, encodedRegionName);
            indexMutations.add(indexPut);
            // Delete the current index row if the new index key is different than the current one
            if (indexRowKeyForCurrentDataRow != null) {
@@ -1424,7 +1425,8 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
                    Set<byte[]> mostRecentIndexRowKeys) throws IOException {
        List<Mutation> indexMutations;
-        indexMutations = prepareIndexMutationsForRebuild(indexMaintainer, put, del);
+        indexMutations = prepareIndexMutationsForRebuild(indexMaintainer, put, del,
+                region.getRegionInfo().getEncodedNameAsBytes());
        Collections.reverse(indexMutations);
        boolean mostRecentDone = false;
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
index ef444c1c8d..f82088d217 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
@@ -121,7 +121,7 @@ public class IndexRepairRegionScanner extends GlobalIndexRegionScanner {
                del.add(cell);
            }
        }
-        List<Mutation> indexMutations = prepareIndexMutationsForRebuild(indexMaintainer, put, del);
+        List<Mutation> indexMutations = prepareIndexMutationsForRebuild(indexMaintainer, put, del, null);
Collections.reverse(indexMutations);
for (Mutation mutation : indexMutations) {
byte[] indexRowKey = mutation.getRow();
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
index d0b56b35b5..105d21f1ba 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
@@ -152,7 +152,8 @@ public class IndexerRegionScanner extends GlobalIndexRegionScanner {
        ValueGetter valueGetter = new IndexUtil.SimpleValueGetter(dataRow);
        long ts = IndexUtil.getMaxTimestamp(dataRow);
        Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
-                valueGetter, new ImmutableBytesWritable(dataRow.getRow()), ts, null, null, false);
+                valueGetter, new ImmutableBytesWritable(dataRow.getRow()), ts, null, null,
+                false, region.getRegionInfo().getEncodedNameAsBytes());
        if (indexPut == null) {
            // This means the data row does not have any covered column values
@@ -372,6 +373,7 @@ public class IndexerRegionScanner extends GlobalIndexRegionScanner {
    public boolean next(List<Cell> results) throws IOException {
        Cell lastCell = null;
        int rowCount = 0;
+        byte[] encodedRegionName = region.getRegionInfo().getEncodedNameAsBytes();
        region.startRegionOperation();
        try {
            synchronized (innerScanner) {
@@ -417,7 +419,8 @@ public class IndexerRegionScanner extends GlobalIndexRegionScanner {
                        uuidValue = commitIfReady(uuidValue, mutations);
                    } else {
                        indexKeyToDataPutMap
-                                .put(indexMaintainer.getIndexRowKey(put), put);
+                                .put(indexMaintainer.getIndexRowKey(put, encodedRegionName),
+                                        put);
                    }
                    rowCount++;
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
index d8cab301f6..aa0ac08c43 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
@@ -300,7 +300,14 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
                for (Cell cell : dataRow.rawCells()) {
                    put.add(cell);
                }
-                if (indexMaintainer.checkIndexRow(indexRowKey, put)) {
+                if (indexMaintainer.isCDCIndex()) {
+                    // A CDC index row key is PARTITION_ID() + PHOENIX_ROW_TIMESTAMP() + data row key.
+                    // The only necessary check is the row timestamp check since the data row key is
+                    // extracted from the index row key and PARTITION_ID() changes during region
+                    // splits and merges
+                    if (IndexUtil.getMaxTimestamp(put) == indexTimestamp) {
+                        return true;
+                    }
+                } else if (indexMaintainer.checkIndexRow(indexRowKey, put)) {
                    if (IndexUtil.getMaxTimestamp(put) != indexTimestamp) {
                        Mutation[] mutations;
                        Put indexPut = new Put(indexRowKey);
@@ -316,8 +323,10 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
                    }
                    return true;
                }
+                // This is not a valid index row
                if (indexMaintainer.isAgedEnough(IndexUtil.getMaxTimestamp(put), ageThreshold)) {
-                    region.delete(indexMaintainer.createDelete(indexRowKey, IndexUtil.getMaxTimestamp(put), false));
+                    region.delete(indexMaintainer.createDelete(indexRowKey, IndexUtil.getMaxTimestamp(put),
+                            false));
}
return false;
}
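A small sketch, not from the commit, of the CDC index row key layout the comment above relies on; the 32-byte partition id length matches PartitionIdFunction, while the other widths and values are hypothetical simplifications.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class CdcRowKeyLayoutSketch {
    public static void main(String[] args) {
        byte[] partitionId = "0123456789abcdef0123456789abcdef"
                .getBytes(StandardCharsets.UTF_8);       // PARTITION_ID(), 32 bytes
        long rowTimestamp = 1699000000000L;              // PHOENIX_ROW_TIMESTAMP()
        byte[] dataRowKey = "row1".getBytes(StandardCharsets.UTF_8);

        // Layout: [ 32-byte PARTITION_ID ][ timestamp ][ data row key ]
        byte[] cdcIndexRowKey = ByteBuffer
                .allocate(partitionId.length + Long.BYTES + dataRowKey.length)
                .put(partitionId).putLong(rowTimestamp).put(dataRowKey).array();

        // Since the data row key is recovered from the index row key itself, and the
        // partition id prefix changes across region splits and merges, only the
        // timestamp has to be verified against the data row.
        System.out.println(cdcIndexRowKey.length);
    }
}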
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index d4c977c5cb..8a144f1245 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -378,6 +378,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
    private List<Set<ImmutableBytesPtr>> batchesWithLastTimestamp = new ArrayList<>();
    private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
    private static final int DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS = 100;
+    private byte[] encodedRegionName;
@Override
public Optional<RegionObserver> getRegionObserver() {
@@ -388,6 +389,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
    public void start(CoprocessorEnvironment e) throws IOException {
        try {
            final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
+            encodedRegionName = env.getRegion().getRegionInfo().getEncodedNameAsBytes();
            String serverName = env.getServerName().getServerName();
            if (env.getConfiguration().getBoolean(CHECK_VERSION_CONF_KEY, true)) {
                // make sure the right version <-> combinations are allowed.
@@ -967,9 +969,10 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
        }
    }
    public static Mutation getDeleteIndexMutation(Put dataRowState, IndexMaintainer indexMaintainer,
-            long ts, ImmutableBytesPtr rowKeyPtr) {
+            long ts, ImmutableBytesPtr rowKeyPtr, byte[] encodedRegionName) {
        ValueGetter dataRowVG = new IndexUtil.SimpleValueGetter(dataRowState);
-        byte[] indexRowKey = indexMaintainer.buildRowKey(dataRowVG, rowKeyPtr, null, null, ts);
+        byte[] indexRowKey = indexMaintainer.buildRowKey(dataRowVG, rowKeyPtr, null, null,
+                ts, encodedRegionName);
        return indexMaintainer.buildRowDeleteMutation(indexRowKey,
                IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
    }
@@ -1004,11 +1007,11 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
                    && indexMaintainer.shouldPrepareIndexMutations(nextDataRowState)) {
                ValueGetter nextDataRowVG = new IndexUtil.SimpleValueGetter(nextDataRowState);
                Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
-                        nextDataRowVG, rowKeyPtr, ts, null, null, false);
+                        nextDataRowVG, rowKeyPtr, ts, null, null, false, encodedRegionName);
                if (indexPut == null) {
                    // No covered column. Just prepare an index row with the empty column
                    byte[] indexRowKey = indexMaintainer.buildRowKey(nextDataRowVG, rowKeyPtr,
-                            null, null, ts);
+                            null, null, ts, encodedRegionName);
                    indexPut = new Put(indexRowKey);
                } else {
                    IndexUtil.removeEmptyColumn(indexPut, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
@@ -1023,8 +1026,9 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
                // Delete the current index row if the new index key is different than the current one
                if (currentDataRowState != null) {
                    ValueGetter currentDataRowVG = new IndexUtil.SimpleValueGetter(currentDataRowState);
-                    byte[] indexRowKeyForCurrentDataRow = indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr,
-                            null, null, ts);
+                    byte[] indexRowKeyForCurrentDataRow = indexMaintainer
+                            .buildRowKey(currentDataRowVG, rowKeyPtr, null, null,
+                                    ts, encodedRegionName);
                    if (Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0) {
                        Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
                                IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
@@ -1036,7 +1040,8 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
                        && indexMaintainer.shouldPrepareIndexMutations(currentDataRowState)) {
                    context.indexUpdates.put(hTableInterfaceReference,
                            new Pair<Mutation, byte[]>(getDeleteIndexMutation(currentDataRowState,
-                                    indexMaintainer, ts, rowKeyPtr), rowKeyPtr.get()));
+                                    indexMaintainer, ts, rowKeyPtr, encodedRegionName),
+                                    rowKeyPtr.get()));
                    if (indexMaintainer.isCDCIndex()) {
                        // CDC Index needs two delete markers: one for deleting the index row, and
                        // the other for referencing the data table delete mutation with the
@@ -1047,7 +1052,8 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
                                ByteUtil.EMPTY_BYTE_ARRAY);
                        context.indexUpdates.put(hTableInterfaceReference,
                                new Pair<Mutation, byte[]>(getDeleteIndexMutation(cdcDataRowState,
-                                        indexMaintainer, ts, rowKeyPtr), rowKeyPtr.get()));
+                                        indexMaintainer, ts, rowKeyPtr, encodedRegionName),
+                                        rowKeyPtr.get()));
                    }
                }
            }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
index 508f6fced0..08bf35279d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
@@ -173,13 +173,13 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
}
    protected void createCDC(Connection conn, String cdc_sql) throws Exception {
-        createCDC(conn, cdc_sql, null, null);
+        createCDC(conn, cdc_sql, null);
    }
    protected void createCDC(Connection conn, String cdc_sql,
-            PTable.QualifierEncodingScheme encodingScheme, Integer nSaltBuckets) throws Exception {
+            PTable.QualifierEncodingScheme encodingScheme) throws Exception {
        // For CDC, multitenancy gets derived automatically via the parent table.
-        createTable(conn, cdc_sql, encodingScheme, false, nSaltBuckets, false, null);
+        createTable(conn, cdc_sql, encodingScheme, false, null, false, null);
}
    protected void assertCDCState(Connection conn, String cdcName, String expInclude,
@@ -297,9 +297,8 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
return changeRow;
}
-    protected List<Set<ChangeRow>> generateMutations(long startTS, Map<String, String> pkColumns,
-                                                     Map<String, String> dataColumns,
-                                                     int nRows, int nBatches)
+    protected List<Set<ChangeRow>> generateMutations(String tenantId, long startTS,
+            Map<String, String> pkColumns, Map<String, String> dataColumns, int nRows, int nBatches)
{
Random rand = new Random();
// Generate unique rows
@@ -336,11 +335,11 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
}
ChangeRow changeRow;
                if (isDelete) {
-                    changeRow = new ChangeRow(null, batchTS, rows.get(j), null);
+                    changeRow = new ChangeRow(tenantId, batchTS, rows.get(j), null);
                    gotDelete = true;
                }
                else {
-                    changeRow = new ChangeRow(null, batchTS, rows.get(j),
+                    changeRow = new ChangeRow(tenantId, batchTS, rows.get(j),
                            generateSampleData(rand, dataColumns, true));
}
batch.add(changeRow);
@@ -637,16 +636,18 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
    protected void verifyChangesViaSCN(String tenantId, ResultSet rs, String dataTableName,
                                       Map<String, String> dataColumns, List<ChangeRow> changes,
                                       Set<PTable.CDCChangeScope> changeScopes) throws Exception {
+        // CDC guarantees that the set of changes on a given row is delivered in the order of
+        // their change timestamps. That is why we need to convert the list of changes to
+        // a collection of per-row lists of changes.
+        Map<Map<String, Object>, List<ChangeRow>> changeMap = new HashMap();
Set<Map<String, Object>> deletedRows = new HashSet<>();
- for (int i = 0, changenr = 0; i < changes.size(); ++i) {
- ChangeRow changeRow = changes.get(i);
+ for (ChangeRow changeRow : changes) {
if (tenantId != null && changeRow.getTenantID() != tenantId) {
continue;
}
if (changeRow.getChangeType() == CDC_DELETE_EVENT_TYPE) {
                // Consecutive delete operations don't appear as separate events.
if (deletedRows.contains(changeRow.pks)) {
- ++changenr;
continue;
}
deletedRows.add(changeRow.pks);
@@ -654,14 +655,31 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
else {
deletedRows.remove(changeRow.pks);
}
- String changeDesc = "Change " + changenr + ": " + changeRow;
- assertTrue(changeDesc, rs.next());
+ List<ChangeRow> rowVersionList = changeMap.get(changeRow.pks);
+ if (rowVersionList == null) {
+ rowVersionList = new ArrayList<>();
+ changeMap.put(changeRow.pks, rowVersionList);
+ }
+ rowVersionList.add(changeRow);
+ }
+
+        while (rs.next()) {
+            Map<String, Object> pks = new HashMap<>();
+            for (Map.Entry<String, Object> pkCol: changes.get(0).pks.entrySet()) {
+                pks.put(pkCol.getKey(), rs.getObject(pkCol.getKey()));
+            }
+            ChangeRow changeRow = changeMap.get(pks).remove(0);
+            String changeDesc = "Change: " + changeRow;
            for (Map.Entry<String, Object> pkCol: changeRow.pks.entrySet()) {
-                assertEquals(changeDesc, pkCol.getValue(), rs.getObject(pkCol.getKey()));
+                if (!pkCol.getValue().equals(rs.getObject(pkCol.getKey()))) {
+                    assertEquals(changeDesc, pkCol.getValue(), rs.getObject(pkCol.getKey()));
+                }
            }
            Map<String, Object> cdcObj = mapper.reader(HashMap.class).readValue(
                    rs.getString(changeRow.pks.size()+2));
-            assertEquals(changeDesc, changeRow.getChangeType(), cdcObj.get(CDC_EVENT_TYPE));
+            if (!changeRow.getChangeType().equals(cdcObj.get(CDC_EVENT_TYPE))) {
+                assertEquals(changeDesc, changeRow.getChangeType(), cdcObj.get(CDC_EVENT_TYPE));
+            }
if (cdcObj.containsKey(CDC_PRE_IMAGE)
&& ! ((Map) cdcObj.get(CDC_PRE_IMAGE)).isEmpty()
&& changeScopes.contains(PTable.CDCChangeScope.PRE)) {
@@ -683,7 +701,11 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
assertEquals(changeDesc, postImage, fillInNulls(
(Map<String, Object>) cdcObj.get(CDC_POST_IMAGE),
dataColumns.keySet()));
}
- ++changenr;
+ }
+
+ // Make sure that all the expected changes are returned by CDC
+ for (List<ChangeRow> rowVersionList : changeMap.values()) {
+ assertTrue(rowVersionList.isEmpty());
}
}
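A minimal sketch, not from the commit, of the per-row grouping idea the updated verification follows: CDC only guarantees ordering per row, so expected changes are bucketed by primary key before being matched against the returned events. All names and values are hypothetical.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerRowGroupingSketch {
    public static void main(String[] args) {
        // Hypothetical (rowKey, changeType) pairs in arrival order.
        List<String[]> changes = Arrays.asList(
                new String[]{"row1", "upsert"},
                new String[]{"row2", "upsert"},
                new String[]{"row1", "delete"});
        // Bucket the expected changes by row key, preserving per-row order.
        Map<String, List<String>> perRow = new HashMap<>();
        for (String[] change : changes) {
            perRow.computeIfAbsent(change[0], k -> new ArrayList<>()).add(change[1]);
        }
        // Each CDC event is then matched against, and removed from, the head of
        // its row's list; e.g. {row1=[upsert, delete], row2=[upsert]}.
        System.out.println(perRow);
    }
}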
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
index 21dae3bf70..274314f106 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
@@ -87,7 +87,7 @@ public class CDCDefinitionIT extends CDCBaseIT {
}
cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
- createCDC(conn, cdc_sql, null, null);
+ createCDC(conn, cdc_sql, null);
assertCDCState(conn, cdcName, null, 3);
assertNoResults(conn, cdcName);
@@ -113,51 +113,6 @@ public class CDCDefinitionIT extends CDCBaseIT {
conn.close();
}
-    @Test
-    public void testCreateWithSalt() throws Exception {
-        // Indexes on views don't support salt buckets, and this is currently silently ignored.
-        if (forView) {
-            return;
-        }
-
-        // {data table bucket count, CDC bucket count}
-        Integer[][] saltingConfigs = new Integer[][] {
-                new Integer[]{null, 2},
-                new Integer[]{0, 2},
-                new Integer[]{4, null},
-                new Integer[]{4, 1},
-                new Integer[]{4, 0},
-                new Integer[]{4, 2}
-        };
-
-        for (Integer[] saltingConfig: saltingConfigs) {
-            try (Connection conn = newConnection()) {
-                String tableName = generateUniqueName();
-                createTable(conn, "CREATE TABLE " + tableName +
-                                " ( k INTEGER PRIMARY KEY, v1 INTEGER, v2 DATE)",
-                        null, false, saltingConfig[0], false, null);
-                assertSaltBuckets(conn, tableName, saltingConfig[0]);
-
-                String cdcName = generateUniqueName();
-                String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
-                createCDC(conn, cdc_sql, null, saltingConfig[1]);
-                try {
-                    assertCDCState(conn, cdcName, null, 3);
-                    assertSaltBuckets(conn, cdcName, null);
-                    // Index inherits table salt buckets.
-                    assertSaltBuckets(conn, CDCUtil.getCDCIndexName(cdcName),
-                            saltingConfig[1] != null ? saltingConfig[1] : saltingConfig[0]);
-                    assertNoResults(conn, cdcName);
-                } catch (Exception error) {
-                    throw new AssertionError("{tableSaltBuckets=" + saltingConfig[0] + ", " +
-                            "cdcSaltBuckets=" + saltingConfig[1] + "} " + error.getMessage(),
-                            error);
-                }
-            }
-        }
-    }
-
@Test
public void testCreateWithSchemaName() throws Exception {
Properties props = new Properties();
@@ -206,8 +161,9 @@ public class CDCDefinitionIT extends CDCBaseIT {
assertEquals(true, indexTable.isMultiTenant());
List<PColumn> idxPkColumns = indexTable.getPKColumns();
assertEquals(":TENANTID", idxPkColumns.get(0).getName().getString());
- assertEquals(": PHOENIX_ROW_TIMESTAMP()",
idxPkColumns.get(1).getName().getString());
- assertEquals(":K", idxPkColumns.get(2).getName().getString());
+ assertEquals(": PARTITION_ID()",
idxPkColumns.get(1).getName().getString());
+ assertEquals(": PHOENIX_ROW_TIMESTAMP()",
idxPkColumns.get(2).getName().getString());
+ assertEquals(":K", idxPkColumns.get(3).getName().getString());
PTable cdcTable = PhoenixRuntime.getTable(conn, cdcName);
assertEquals(true, cdcTable.isMultiTenant());
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
index 317c2af895..c7cd8a7dc4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
@@ -18,8 +18,15 @@
package org.apache.phoenix.end2end;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.filter.DistinctPrefixFilter;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.RowKeyOrderedAggregateResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
@@ -41,11 +48,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Calendar;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -86,31 +93,29 @@ public class CDCQueryIT extends CDCBaseIT {
private final boolean forView;
private final PTable.QualifierEncodingScheme encodingScheme;
private final boolean multitenant;
- private final Integer indexSaltBuckets;
private final Integer tableSaltBuckets;
private final boolean withSchemaName;
public CDCQueryIT(Boolean forView,
PTable.QualifierEncodingScheme encodingScheme, boolean multitenant,
- Integer indexSaltBuckets, Integer tableSaltBuckets, boolean withSchemaName) {
+ Integer tableSaltBuckets, boolean withSchemaName) {
this.forView = forView;
this.encodingScheme = encodingScheme;
this.multitenant = multitenant;
- this.indexSaltBuckets = indexSaltBuckets;
this.tableSaltBuckets = tableSaltBuckets;
this.withSchemaName = withSchemaName;
}
@Parameterized.Parameters(name = "forView={0}, encodingScheme={1}, " +
- "multitenant={2}, indexSaltBuckets={3}, tableSaltBuckets={4}
withSchemaName={5}")
+ "multitenant={2}, tableSaltBuckets={3}, withSchemaName={4}")
public static synchronized Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
- { Boolean.FALSE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, null, Boolean.FALSE },
- { Boolean.FALSE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, null, Boolean.TRUE },
- { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, null, 4, Boolean.FALSE },
- { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.TRUE, 1, 2, Boolean.TRUE },
- { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, 4, null, Boolean.FALSE },
- { Boolean.TRUE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, null, Boolean.FALSE },
+ { Boolean.FALSE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, Boolean.FALSE },
+ { Boolean.FALSE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, Boolean.TRUE },
+ { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, 4, Boolean.FALSE },
+ { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.TRUE, 2, Boolean.TRUE },
+ { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, null, Boolean.FALSE },
+ { Boolean.TRUE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, Boolean.FALSE },
});
}
@@ -135,6 +140,117 @@ public class CDCQueryIT extends CDCBaseIT {
String explainPlan = QueryUtil.getExplainPlan(rs);
assertFalse(explainPlan.contains(cdcName));
}
+
+ private boolean isDistinctPrefixFilterIncludedInFilterList(FilterList filterList) {
+ for (Filter filter : filterList.getFilters()) {
+ if (filter instanceof DistinctPrefixFilter) {
+ return true;
+ } else if (filter instanceof FilterList) {
+ return isDistinctPrefixFilterIncludedInFilterList((FilterList) filter);
+ }
+ }
+ return false;
+ }
+ private boolean isDistinctPrefixFilterIncluded(Scan scan) {
+ Filter filter = scan.getFilter();
+ if (filter != null && filter instanceof DistinctPrefixFilter) {
+ return true;
+ } else if (filter instanceof FilterList) {
+ return isDistinctPrefixFilterIncludedInFilterList((FilterList) filter);
+ }
+ return false;
+ }
+
+ private void checkIndexPartitionIdCount(Connection conn, String cdcName) throws Exception {
+ // Verify that we can retrieve partition ids
+ ResultSet rs = conn.createStatement().executeQuery("SELECT PARTITION_ID() FROM "
+ + cdcName + " ORDER BY PARTITION_ID()");
+ int saltBuckets = tableSaltBuckets == null ? 1 : tableSaltBuckets;
+ String[] partitionId = new String[saltBuckets];
+ int[] countPerPartition = new int[saltBuckets];
+ int partitionIndex = 0;
+ assertTrue(rs.next());
+ partitionId[partitionIndex] = rs.getString(1);
+ countPerPartition[partitionIndex]++;
+ LOGGER.info("PARTITION_ID["+ partitionIndex + "] = " +
partitionId[partitionIndex]);
+ while (rs.next()) {
+ if (!partitionId[partitionIndex].equals(rs.getString(1))) {
+ partitionIndex++;
+ partitionId[partitionIndex] = rs.getString(1);
+ LOGGER.info("PARTITION_ID["+ partitionIndex + "] = " +
partitionId[partitionIndex]);
+ }
+ countPerPartition[partitionIndex]++;
+ }
+ // Verify that the number of partitions equals the number of table regions. In this case,
+ // it equals the number of salt buckets
+ assertEquals(saltBuckets, partitionIndex + 1);
+
+ rs = conn.createStatement().executeQuery("SELECT DISTINCT
PARTITION_ID() FROM "
+ + cdcName);
+ assertTrue(rs.next());
+ partitionIndex = 0;
+ partitionId[partitionIndex] = rs.getString(1);
+ int rowCount = 1;
+ while (rs.next()) {
+ if (!partitionId[partitionIndex].equals(rs.getString(1))) {
+ partitionIndex++;
+ partitionId[partitionIndex] = rs.getString(1);
+ LOGGER.info("PARTITION_ID["+ partitionIndex + "] = " +
partitionId[partitionIndex]);
+ }
+ rowCount++;
+ }
+ // Verify that the number of partitions equals the number of table regions. In this case,
+ // it equals the number of salt buckets
+ assertEquals(saltBuckets, partitionIndex + 1);
+ // Verify that we only got distinct partition ids
+ assertEquals(saltBuckets, rowCount);
+ // Verify that DistinctPrefixFilter is used to efficiently retrieve partition ids
+ assertTrue(isDistinctPrefixFilterIncluded(((PhoenixResultSet) rs).getContext().getScan()));
+
+ // Verify that we can access data table mutations by partition id
+ PreparedStatement statement = conn.prepareStatement(
+ getCDCQuery(cdcName, saltBuckets, partitionId));
+ statement.setTimestamp(1, new Timestamp(1000));
+ statement.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
+ rs = statement.executeQuery();
+ rowCount = 0;
+ while(rs.next()) {
+ rowCount++;
+ String id = rs.getString(1);
+ int count = rs.getInt(2);
+ boolean found = false;
+ for (int i = 0; i < saltBuckets; i++) {
+ if (partitionId[i].equals(id) && count == countPerPartition[i]) {
+ found = true;
+ break;
+ }
+ }
+ assertTrue(found);
+ }
+ // Verify that partition id based queries are row key prefix queries
+ ResultIterator resultIterator = ((PhoenixResultSet) rs).getUnderlyingIterator();
+ assertTrue(resultIterator instanceof RowKeyOrderedAggregateResultIterator);
+ assertEquals(saltBuckets, rowCount);
+ }
+
+ private static String getCDCQuery(String cdcName, int saltBuckets,
+ String[] partitionId) {
+ StringBuilder query = new StringBuilder("SELECT PARTITION_ID(),
Count(*) from ");
+ query.append(cdcName);
+ query.append(" WHERE PARTITION_ID() IN (");
+ for (int i = 0; i < saltBuckets - 1; i++) {
+ query.append("'");
+ query.append(partitionId[i]);
+ query.append("',");
+ }
+ query.append("'");
+ query.append(partitionId[saltBuckets - 1]);
+ query.append("')");
+ query.append(" AND PHOENIX_ROW_TIMESTAMP() >= ? AND
PHOENIX_ROW_TIMESTAMP() < ?");
+ query.append(" Group By PARTITION_ID()");
+ return query.toString();
+ }
+
@Test
public void testSelectCDC() throws Exception {
String cdcName, cdc_sql;
@@ -155,8 +271,7 @@ public class CDCQueryIT extends CDCBaseIT {
}
cdcName = generateUniqueName();
cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
- createCDC(conn, cdc_sql, encodingScheme,
- indexSaltBuckets);
+ createCDC(conn, cdc_sql, encodingScheme);
}
String tenantId = multitenant ? "1000" : null;
@@ -177,16 +292,28 @@ public class CDCQueryIT extends CDCBaseIT {
"SELECT /*+ CDC_INCLUDE(PRE, POST) */
PHOENIX_ROW_TIMESTAMP(), K," +
"\"CDC JSON\" FROM " + cdcFullName);
- // Existence of CDC shouldn't cause the regular query path to fail.
+ // Existence of a CDC index hint shouldn't cause the regular query path to fail.
+ // Run the same query with a CDC index hint and without it and make sure we get the same
+ // result from both
String uncovered_sql = "SELECT " + " /*+ INDEX(" + tableName + " "
+
CDCUtil.getCDCIndexName(cdcName) + ") */ k, v1 FROM " +
tableName;
try (ResultSet rs = conn.createStatement().executeQuery(uncovered_sql)) {
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ assertEquals(201, rs.getInt(2));
assertTrue(rs.next());
assertEquals(3, rs.getInt(1));
assertEquals(300, rs.getInt(2));
+ assertFalse(rs.next());
+ }
+ uncovered_sql = "SELECT " + " k, v1 FROM " + tableName;
try (ResultSet rs = conn.createStatement().executeQuery(uncovered_sql)) {
assertTrue(rs.next());
assertEquals(2, rs.getInt(1));
assertEquals(201, rs.getInt(2));
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt(1));
+ assertEquals(300, rs.getInt(2));
assertFalse(rs.next());
}
@@ -210,19 +337,18 @@ public class CDCQueryIT extends CDCBaseIT {
datatableName, dataColumns, changes, new HashSet<>());
HashMap<String, int[]> testQueries = new HashMap<String, int[]>() {{
- put("SELECT 'dummy', k, \"CDC JSON\" FROM " + cdcFullName,
+ put("SELECT 'dummy', k, \"CDC JSON\" FROM " + cdcFullName
+ + " ORDER BY PHOENIX_ROW_TIMESTAMP() ASC, K ASC",
new int[]{1, 2, 3, 1, 1, 1, 1, 2, 1, 1, 1, 1});
- put("SELECT PHOENIX_ROW_TIMESTAMP(), k, \"CDC JSON\" FROM " +
cdcFullName +
- " ORDER BY k ASC", new int[]{1, 1, 1, 1, 1, 1, 1, 1,
1, 2, 2, 3});
- put("SELECT PHOENIX_ROW_TIMESTAMP(), k, \"CDC JSON\" FROM " +
cdcFullName +
- " ORDER BY k DESC", new int[]{3, 2, 2, 1, 1, 1, 1, 1,
1, 1, 1, 1});
- put("SELECT PHOENIX_ROW_TIMESTAMP(), k, \"CDC JSON\" FROM " +
cdcFullName +
- " ORDER BY PHOENIX_ROW_TIMESTAMP() DESC",
- new int[]{1, 1, 1, 1, 2, 1, 1, 1, 1, 3, 2, 1});
+ put("SELECT PHOENIX_ROW_TIMESTAMP(), k, \"CDC JSON\" FROM " +
cdcFullName
+ + " ORDER BY k ASC", new int[]{1, 1, 1, 1, 1, 1, 1, 1,
1, 2, 2, 3});
+ put("SELECT PHOENIX_ROW_TIMESTAMP(), k, \"CDC JSON\" FROM " +
cdcFullName
+ + " ORDER BY k DESC", new int[]{3, 2, 2, 1, 1, 1, 1,
1, 1, 1, 1, 1});
}};
Map<String, String> dummyChange = new HashMap() {{
put(CDC_EVENT_TYPE, "dummy");
}};
+
for (Map.Entry<String, int[]> testQuery : testQueries.entrySet()) {
try (ResultSet rs = conn.createStatement().executeQuery(testQuery.getKey())) {
for (int i = 0; i < testQuery.getValue().length; ++i) {
@@ -278,7 +404,7 @@ public class CDCQueryIT extends CDCBaseIT {
}
cdcName = generateUniqueName();
cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INCLUDE
(change)";
- createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
+ createCDC(conn, cdc_sql, encodingScheme);
}
String tenantId = multitenant ? "1000" : null;
@@ -290,7 +416,7 @@ public class CDCQueryIT extends CDCBaseIT {
long startTS = System.currentTimeMillis();
Map<String, List<Set<ChangeRow>>> allBatches = new HashMap<>(tenantids.length);
for (String tid: tenantids) {
- allBatches.put(tid, generateMutations(startTS, pkColumns, dataColumns, 20, 5));
+ allBatches.put(tid, generateMutations(tenantId, startTS, pkColumns, dataColumns, 20, 5));
applyMutations(COMMIT_SUCCESS, schemaName, tableName, datatableName, tid,
allBatches.get(tid), cdcName);
}
@@ -318,6 +444,7 @@ public class CDCQueryIT extends CDCBaseIT {
"SELECT /*+ CDC_INCLUDE(CHANGE, PRE, POST) */ *
FROM " + cdcFullName),
datatableName, dataColumns, changes, ALL_IMG);
cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName,
cdcName);
+ checkIndexPartitionIdCount(conn, cdcFullName);
}
}
@@ -342,7 +469,7 @@ public class CDCQueryIT extends CDCBaseIT {
cdcName = generateUniqueName();
cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
- createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
+ createCDC(conn, cdc_sql, encodingScheme);
}
String tenantId = multitenant ? "1000" : null;
@@ -368,13 +495,16 @@ public class CDCQueryIT extends CDCBaseIT {
"SELECT /*+ CDC_INCLUDE(PRE, POST) */
PHOENIX_ROW_TIMESTAMP(), K," +
"\"CDC JSON\" FROM " + cdcFullName);
verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
- "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " +
cdcFullName),
+ "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " +
cdcFullName
+ + " ORDER BY PHOENIX_ROW_TIMESTAMP() ASC"),
datatableName, dataColumns, changes, PRE_POST_IMG);
verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
- "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " +
cdcFullName),
+ "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + cdcFullName
+ + " ORDER BY PHOENIX_ROW_TIMESTAMP() ASC"),
datatableName, dataColumns, changes, CHANGE_IMG);
verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery("SELECT /*+ CDC_INCLUDE(CHANGE) */ " +
- "PHOENIX_ROW_TIMESTAMP(), K, \"CDC JSON\" FROM " + cdcFullName),
+ "PHOENIX_ROW_TIMESTAMP(), K, \"CDC JSON\" FROM " + cdcFullName
+ + " ORDER BY PHOENIX_ROW_TIMESTAMP() ASC"),
datatableName, dataColumns, changes, CHANGE_IMG);
cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName, cdcName);
}
@@ -413,7 +543,7 @@ public class CDCQueryIT extends CDCBaseIT {
}
cdcName = generateUniqueName();
cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INCLUDE
(change)";
- createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
+ createCDC(conn, cdc_sql, encodingScheme);
cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName,cdcName);
}
@@ -426,7 +556,7 @@ public class CDCQueryIT extends CDCBaseIT {
long startTS = System.currentTimeMillis();
Map<String, List<Set<ChangeRow>>> allBatches = new HashMap<>(tenantids.length);
for (String tid: tenantids) {
- allBatches.put(tid, generateMutations(startTS, pkColumns, dataColumns, 20, 5));
+ allBatches.put(tid, generateMutations(tenantId, startTS, pkColumns, dataColumns, 20, 5));
applyMutations(COMMIT_SUCCESS, schemaName, tableName, datatableName, tid,
allBatches.get(tid), cdcName);
}
@@ -489,7 +619,7 @@ public class CDCQueryIT extends CDCBaseIT {
cdcName = generateUniqueName();
cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
- createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
+ createCDC(conn, cdc_sql, encodingScheme);
conn.createStatement().execute("ALTER TABLE " + datatableName + "
DROP COLUMN v0");
}
@@ -514,7 +644,7 @@ public class CDCQueryIT extends CDCBaseIT {
try (Connection conn = newConnection(tenantId)) {
verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
"SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " +
SchemaUtil.getTableName(
- schemaName, cdcName)),
+ schemaName, cdcName) + " ORDER BY
PHOENIX_ROW_TIMESTAMP() ASC"),
datatableName, dataColumns, changes, CHANGE_IMG);
cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName, cdcName);
}
@@ -540,7 +670,7 @@ public class CDCQueryIT extends CDCBaseIT {
}
cdcName = generateUniqueName();
cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
- createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
+ createCDC(conn, cdc_sql, encodingScheme);
cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName, cdcName);
}
@@ -603,7 +733,7 @@ public class CDCQueryIT extends CDCBaseIT {
// Create a CDC table
cdcName = generateUniqueName();
cdc_sql = "CREATE CDC " + cdcName + " ON " + tableFullName;
- createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
+ createCDC(conn, cdc_sql, encodingScheme);
// Check CDC index is active but empty
String indexTableFullName = SchemaUtil.getTableName(schemaName,
CDCUtil.getCDCIndexName(cdcName));
@@ -677,7 +807,7 @@ public class CDCQueryIT extends CDCBaseIT {
// Create a CDC table
cdcName = generateUniqueName();
cdc_sql = "CREATE CDC " + cdcName + " ON " + tableFullName;
- createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
+ createCDC(conn, cdc_sql, encodingScheme);
// Add rows
long startTS = System.currentTimeMillis();
List<ChangeRow> changes = generateChanges(startTS, tenantids, tableFullName,
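
checkIndexPartitionIdCount and getCDCQuery above exercise the new consumption pattern: enumerate partition ids with SELECT DISTINCT PARTITION_ID() (served efficiently by a DistinctPrefixFilter prefix scan), then read each partition's mutations as a row-key-prefix query over a time window. A hedged consumer sketch built on the same SQL shapes; the connection URL and the MYCDC name are placeholders, and real partition ids come from the DISTINCT query:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

public class PartitionIdConsumerSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder connection URL and CDC object name.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
            // Step 1: enumerate partition ids; Phoenix serves this with a prefix scan.
            List<String> partitionIds = new ArrayList<>();
            try (ResultSet rs = conn.createStatement()
                    .executeQuery("SELECT DISTINCT PARTITION_ID() FROM MYCDC")) {
                while (rs.next()) {
                    partitionIds.add(rs.getString(1));
                }
            }
            // Step 2: read each partition's changes for a time window, mirroring the
            // IN-list shape that getCDCQuery builds in the test above.
            StringBuilder in = new StringBuilder();
            for (int i = 0; i < partitionIds.size(); i++) {
                if (i > 0) {
                    in.append(",");
                }
                in.append("'").append(partitionIds.get(i)).append("'");
            }
            String sql = "SELECT PHOENIX_ROW_TIMESTAMP(), K, \"CDC JSON\" FROM MYCDC"
                    + " WHERE PARTITION_ID() IN (" + in + ")"
                    + " AND PHOENIX_ROW_TIMESTAMP() >= ? AND PHOENIX_ROW_TIMESTAMP() < ?";
            try (PreparedStatement stmt = conn.prepareStatement(sql)) {
                stmt.setTimestamp(1, new Timestamp(0));
                stmt.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
                try (ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        System.out.println(rs.getTimestamp(1) + " k=" + rs.getInt(2)
                                + " " + rs.getString(3));
                    }
                }
            }
        }
    }
}
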
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/PrepareIndexMutationsForRebuildTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/PrepareIndexMutationsForRebuildTest.java
index dce37d3356..35e6213293 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/index/PrepareIndexMutationsForRebuildTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/PrepareIndexMutationsForRebuildTest.java
@@ -134,9 +134,8 @@ public class PrepareIndexMutationsForRebuildTest extends BaseConnectionlessQuery
Bytes.toBytes("v2"));
addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 1);
- List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
- dataPut,
- null);
+ List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
+ .prepareIndexMutationsForRebuild(info.indexMaintainer, dataPut, null, null);
// Expect one row of index with row key "v1_k1"
Put idxPut1 = new Put(generateIndexRowKey("v1"));
@@ -166,9 +165,8 @@ public class PrepareIndexMutationsForRebuildTest extends BaseConnectionlessQuery
Bytes.toBytes("v2"));
addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 1);
- List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
- dataPut,
- null);
+ List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
+ .prepareIndexMutationsForRebuild(info.indexMaintainer, dataPut, null, null);
// Expect one row of index with row key "_k1", as indexed column C1 is nullable.
Put idxPut1 = new Put(generateIndexRowKey(null));
@@ -212,9 +210,8 @@ public class PrepareIndexMutationsForRebuildTest extends BaseConnectionlessQuery
2,
Cell.Type.DeleteColumn);
- List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
- dataPut,
- dataDel);
+ List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
+ .prepareIndexMutationsForRebuild(info.indexMaintainer, dataPut, dataDel, null);
List<Mutation> expectedIndexMutation = new ArrayList<>();
@@ -278,9 +275,8 @@ public class PrepareIndexMutationsForRebuildTest extends BaseConnectionlessQuery
2,
Cell.Type.DeleteColumn);
- List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
- dataPut,
- dataDel);
+ List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
+ .prepareIndexMutationsForRebuild(info.indexMaintainer, dataPut, dataDel, null);
List<Mutation> expectedIndexMutations = new ArrayList<>();
@@ -338,9 +334,8 @@ public class PrepareIndexMutationsForRebuildTest extends BaseConnectionlessQuery
3,
Cell.Type.DeleteFamily);
- List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
- dataPut,
- dataDel);
+ List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
+ .prepareIndexMutationsForRebuild(info.indexMaintainer, dataPut, dataDel, null);
List<Mutation> expectedIndexMutations = new ArrayList<>();
@@ -406,9 +401,8 @@ public class PrepareIndexMutationsForRebuildTest extends BaseConnectionlessQuery
1,
Cell.Type.DeleteColumn);
- List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
- dataPut,
- dataDel);
+ List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
+ .prepareIndexMutationsForRebuild(info.indexMaintainer, dataPut, dataDel, null);
List<Mutation> expectedIndexMutations = new ArrayList<>();
@@ -458,10 +452,8 @@ public class PrepareIndexMutationsForRebuildTest extends BaseConnectionlessQuery
2,
Cell.Type.DeleteFamily);
- List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(
- info.indexMaintainer,
- dataPut,
- dataDel);
+ List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
+ .prepareIndexMutationsForRebuild(info.indexMaintainer, dataPut, dataDel, null);
List<Mutation> expectedIndexMutations = new ArrayList<>();
byte[] idxKeyBytes = generateIndexRowKey("v2");
@@ -522,10 +514,8 @@ public class PrepareIndexMutationsForRebuildTest extends BaseConnectionlessQuery
2,
Cell.Type.DeleteFamily);
- List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(
- info.indexMaintainer,
- dataPut,
- dataDel);
+ List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
+ .prepareIndexMutationsForRebuild(info.indexMaintainer, dataPut, dataDel, null);
List<Mutation> expectedIndexMutations = new ArrayList<>();
byte[] idxKeyBytes = generateIndexRowKey("v2");
@@ -579,9 +569,8 @@ public class PrepareIndexMutationsForRebuildTest extends BaseConnectionlessQuery
2,
Cell.Type.DeleteColumn);
- List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
- dataPut,
- dataDel);
+ List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
+ .prepareIndexMutationsForRebuild(info.indexMaintainer, dataPut, dataDel, null);
List<Mutation> expectedIndexMutations = new ArrayList<>();
byte[] idxKeyBytes = generateIndexRowKey("v1");
@@ -655,9 +644,8 @@ public class PrepareIndexMutationsForRebuildTest extends BaseConnectionlessQuery
2,
Cell.Type.DeleteColumn);
- List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
- dataPut,
- dataDel);
+ List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
+ .prepareIndexMutationsForRebuild(info.indexMaintainer, dataPut, dataDel, null);
List<Mutation> expectedIndexMutation = new ArrayList<>();
@@ -720,9 +708,8 @@ public class PrepareIndexMutationsForRebuildTest extends BaseConnectionlessQuery
Bytes.toBytes("v3"));
addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 2);
- List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
- dataPut,
- null);
+ List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
+ .prepareIndexMutationsForRebuild(info.indexMaintainer, dataPut, null, null);
byte[] idxKeyBytes = generateIndexRowKey(null);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
index 2cdfc1a8f8..5873766221 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
@@ -284,7 +284,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
private void initializeGlobalMockitoSetup() throws IOException {
//setup
- when(indexMaintainer.getIndexRowKey(put)).thenCallRealMethod();
+ when(indexMaintainer.getIndexRowKey(put, null)).thenCallRealMethod();
when(rebuildScanner.prepareIndexMutations(put, delete,
indexKeyToMutationMap, mostRecentIndexRowKeys)).thenCallRealMethod();
when(rebuildScanner.verifySingleIndexRow(ArgumentMatchers.<byte[]>any(),
ArgumentMatchers.<List>any(),ArgumentMatchers.<List>any(),
ArgumentMatchers.<Set>any(), ArgumentMatchers.<List>any(),
ArgumentMatchers.<IndexToolVerificationResult.PhaseResult>any(),
ArgumentMatchers.anyBoolean())).thenCallRealMethod();
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/parse/PartitionIdFunctionTest.java b/phoenix-core/src/test/java/org/apache/phoenix/parse/PartitionIdFunctionTest.java
new file mode 100644
index 0000000000..9ab88211c9
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/parse/PartitionIdFunctionTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class PartitionIdFunctionTest {
+
+ @Test
+ public void testExpressionWithPartitionId() throws Exception {
+ ParseNode parseNode = SQLParser.parseCondition("(PARTITION_ID() =
PK2)");
+ boolean hasPartitionIdParseNode = false;
+ for (ParseNode childNode : parseNode.getChildren()) {
+ if (childNode.getClass().isAssignableFrom(PartitionIdParseNode.class)) {
+ assertEquals(0, childNode.getChildren().size());
+ hasPartitionIdParseNode = true;
+ }
+ }
+ assertTrue(hasPartitionIdParseNode);
+ }
+
+ @Test
+ public void testExpressionWithPartitionIdWithParams() throws Exception {
+ ParseNode parseNode = SQLParser.parseCondition("(PARTITION_ID(COL1) =
PK2)");
+ for (ParseNode childNode : parseNode.getChildren()) {
+ assertFalse("PartitionIdFunction does not take any parameters",
+ childNode.getClass().isAssignableFrom(PartitionIdParseNode.class));
+ }
+ }
+
+ @Test
+ public void testSelectWithPartitionId() throws Exception {
+ SQLParser parser = new SQLParser("SELECT PARTITION_ID() FROM xyz");
+ List<AliasedNode> nodes = parser.parseQuery().getSelect();
+ assertEquals(1, nodes.size());
+ assertTrue("PARTITION_ID() should parse to PartitionIdParseNode",
+ nodes.get(0).getNode().getClass()
+ .isAssignableFrom(PartitionIdParseNode.class));
+ assertEquals(0, nodes.get(0).getNode().getChildren().size());
+ }
+
+}
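
The new test file above pins the parser behavior for the zero-argument PARTITION_ID() function. A condensed standalone version of the same check, using only the SQLParser API the tests exercise:

import java.util.List;
import org.apache.phoenix.parse.AliasedNode;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.SQLParser;

public class PartitionIdParseSketch {
    public static void main(String[] args) throws Exception {
        // PARTITION_ID() takes no arguments and parses to a dedicated parse node.
        SQLParser parser = new SQLParser("SELECT PARTITION_ID() FROM xyz");
        List<AliasedNode> select = parser.parseQuery().getSelect();
        ParseNode node = select.get(0).getNode();
        System.out.println(node.getClass().getSimpleName()); // PartitionIdParseNode
        System.out.println(node.getChildren().size());       // 0
    }
}
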