This is an automated email from the ASF dual-hosted git repository. vjasani pushed a commit to branch PHOENIX-7001-feature in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7001-feature by this push: new 581e61365c PHOENIX-7008 Implementation for CREATE CDC (#1681) 581e61365c is described below commit 581e61365cac22ce85592179703029adcfd9af24 Author: Hari Krishna Dara <harid...@gmail.com> AuthorDate: Sat Sep 30 09:24:52 2023 +0530 PHOENIX-7008 Implementation for CREATE CDC (#1681) --- .gitignore | 3 + .../java/org/apache/phoenix/end2end/CDCMiscIT.java | 152 ++++++++++ .../phoenix/compile/CreateIndexCompiler.java | 2 +- .../phoenix/coprocessor/MetaDataEndpointImpl.java | 17 +- .../phoenix/coprocessor/MetaDataProtocol.java | 2 +- .../coprocessor/PhoenixAccessController.java | 6 +- .../apache/phoenix/exception/SQLExceptionCode.java | 7 +- .../phoenix/jdbc/PhoenixDatabaseMetaData.java | 7 + .../org/apache/phoenix/jdbc/PhoenixStatement.java | 15 +- .../phoenix/query/ConnectionQueryServicesImpl.java | 17 +- .../org/apache/phoenix/query/QueryConstants.java | 5 + .../org/apache/phoenix/schema/DelegateTable.java | 6 + .../org/apache/phoenix/schema/MetaDataClient.java | 325 ++++++++++++++------- .../java/org/apache/phoenix/schema/PTable.java | 18 +- .../java/org/apache/phoenix/schema/PTableImpl.java | 24 ++ .../java/org/apache/phoenix/schema/PTableType.java | 3 +- .../org/apache/phoenix/schema/TableProperty.java | 33 +++ .../main/java/org/apache/phoenix/util/CDCUtil.java | 85 ++++++ .../java/org/apache/phoenix/util/SchemaUtil.java | 3 +- phoenix-core/src/main/protobuf/PTable.proto | 2 + .../org/apache/phoenix/parse/QueryParserTest.java | 42 ++- .../java/org/apache/phoenix/util/CDCUtilTest.java | 39 +++ 22 files changed, 671 insertions(+), 142 deletions(-) diff --git a/.gitignore b/.gitignore index 1d33a1cbe0..bebe16f280 100644 --- a/.gitignore +++ b/.gitignore @@ -32,3 +32,6 @@ phoenix-hbase-compat-1.3.0/ phoenix-hbase-compat-1.4.0/ phoenix-hbase-compat-1.5.0/ */hbase.log + +# Vim swap files +.*.sw* diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java new file mode 100644 index 0000000000..380445e22a --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableProperty; +import org.apache.phoenix.util.CDCUtil; +import org.apache.phoenix.util.PhoenixRuntime; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@Category(ParallelStatsDisabledTest.class) +public class CDCMiscIT extends ParallelStatsDisabledIT { + private void assertCDCState(Connection conn, String cdcName, String expInclude, + int idxType) throws SQLException { + try (ResultSet rs = conn.createStatement().executeQuery("SELECT cdc_include FROM " + + "system.catalog WHERE table_name = '" + cdcName + + "' AND column_name IS NULL and column_family IS NULL")) { + assertEquals(true, rs.next()); + assertEquals(expInclude, rs.getString(1)); + } + try (ResultSet rs = conn.createStatement().executeQuery("SELECT index_type FROM " + + "system.catalog WHERE table_name = '" + CDCUtil.getCDCIndexName(cdcName) + + "' AND column_name IS NULL and column_family IS NULL")) { + assertEquals(true, rs.next()); + assertEquals(idxType, rs.getInt(1)); + } + } + + private void assertPTable(String cdcName, Set<PTable.CDCChangeScope> expIncludeScopes) + throws SQLException { + Properties props = new Properties(); + Connection conn = DriverManager.getConnection(getUrl(), props); + PTable table = PhoenixRuntime.getTable(conn, cdcName); + assertEquals(expIncludeScopes, table.getCDCIncludeScopes()); + assertEquals(expIncludeScopes, TableProperty.INCLUDE.getPTableValue(table)); + } + + @Test + public void testCreate() throws SQLException { + Properties props = new Properties(); + Connection conn = DriverManager.getConnection(getUrl(), props); + String tableName = generateUniqueName(); + conn.createStatement().execute( + "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER," + + " v2 DATE)"); + String cdcName = generateUniqueName(); + + try { + conn.createStatement().execute("CREATE CDC " + cdcName + + " ON NON_EXISTENT_TABLE (PHOENIX_ROW_TIMESTAMP())"); + fail("Expected to fail due to non-existent table"); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), e.getErrorCode()); + } + + try { + conn.createStatement().execute("CREATE CDC " + cdcName + + " ON " + tableName +"(UNKNOWN_FUNCTION())"); + fail("Expected to fail due to invalid function"); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.FUNCTION_UNDEFINED.getErrorCode(), e.getErrorCode()); + } + + try { + conn.createStatement().execute("CREATE CDC " + cdcName + + " ON " + tableName +"(NOW())"); + fail("Expected to fail due to non-deterministic function"); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.NON_DETERMINISTIC_EXPRESSION_NOT_ALLOWED_IN_INDEX. + getErrorCode(), e.getErrorCode()); + } + + try { + conn.createStatement().execute("CREATE CDC " + cdcName + + " ON " + tableName +"(ROUND(v1))"); + fail("Expected to fail due to non-timestamp expression in the index PK"); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION.getErrorCode(), + e.getErrorCode()); + } + + try { + conn.createStatement().execute("CREATE CDC " + cdcName + + " ON " + tableName +"(v1)"); + fail("Expected to fail due to non-timestamp column in the index PK"); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION.getErrorCode(), + e.getErrorCode()); + } + + String cdc_sql = "CREATE CDC " + cdcName + + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())"; + conn.createStatement().execute(cdc_sql); + assertCDCState(conn, cdcName, null, 3); + + try { + conn.createStatement().execute(cdc_sql); + fail("Expected to fail due to duplicate index"); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.TABLE_ALREADY_EXIST.getErrorCode(), e.getErrorCode()); + assertTrue(e.getMessage().endsWith(cdcName)); + } + conn.createStatement().execute("CREATE CDC IF NOT EXISTS " + cdcName + " ON " + tableName + + "(v2) INCLUDE (pre, post) INDEX_TYPE=g"); + + cdcName = generateUniqueName(); + conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName + + "(v2) INCLUDE (pre, post) INDEX_TYPE=g"); + assertCDCState(conn, cdcName, "PRE,POST", 3); + assertPTable(cdcName, new HashSet<>( + Arrays.asList(PTable.CDCChangeScope.PRE, PTable.CDCChangeScope.POST))); + + cdcName = generateUniqueName(); + conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName + + "(v2) INDEX_TYPE=l"); + assertCDCState(conn, cdcName, null, 2); + assertPTable(cdcName, null); + + conn.close(); + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java index 8a6e239897..1279f83b25 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java @@ -82,7 +82,7 @@ public class CreateIndexCompiler { return new BaseMutationPlan(context, operation) { @Override public MutationState execute() throws SQLException { - return client.createIndex(create, splits); + return client.createIndex(create, splits, null); } @Override diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index 7db23f4678..553a925374 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -82,6 +82,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_BYTE import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_BYTES; import static org.apache.phoenix.query.QueryConstants.VIEW_MODIFIED_PROPERTY_TAG_TYPE; import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone; import static org.apache.phoenix.schema.PTableType.INDEX; @@ -246,6 +247,7 @@ import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.CDCUtil; import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.MetaDataUtil; @@ -365,6 +367,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); TABLE_FAMILY_BYTES, EXTERNAL_SCHEMA_ID_BYTES); private static final Cell STREAMING_TOPIC_NAME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, STREAMING_TOPIC_NAME_BYTES); + private static final Cell CDC_INCLUDE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, + TABLE_FAMILY_BYTES, CDC_INCLUDE_BYTES); private static final List<Cell> TABLE_KV_COLUMNS = Lists.newArrayList( EMPTY_KEYVALUE_KV, @@ -404,7 +408,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); CHANGE_DETECTION_ENABLED_KV, SCHEMA_VERSION_KV, EXTERNAL_SCHEMA_ID_KV, - STREAMING_TOPIC_NAME_KV + STREAMING_TOPIC_NAME_KV, + CDC_INCLUDE_KV ); static { @@ -452,6 +457,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); TABLE_KV_COLUMNS.indexOf(EXTERNAL_SCHEMA_ID_KV); private static final int STREAMING_TOPIC_NAME_INDEX = TABLE_KV_COLUMNS.indexOf(STREAMING_TOPIC_NAME_KV); + private static final int CDC_INCLUDE_INDEX = TABLE_KV_COLUMNS.indexOf(CDC_INCLUDE_KV); // KeyValues for Column private static final KeyValue DECIMAL_DIGITS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES); private static final KeyValue COLUMN_SIZE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_SIZE_BYTES); @@ -1423,6 +1429,15 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); builder.setStreamingTopicName(streamingTopicName != null ? streamingTopicName : oldTable != null ? oldTable.getStreamingTopicName() : null); + Cell includeSpecKv = tableKeyValues[CDC_INCLUDE_INDEX]; + String includeSpec = includeSpecKv != null ? + (String) PVarchar.INSTANCE.toObject(includeSpecKv.getValueArray(), + includeSpecKv.getValueOffset(), includeSpecKv.getValueLength()) + : null; + builder.setCDCIncludeScopes(includeSpec != null ? + CDCUtil.makeChangeScopeEnumsFromString(includeSpec) : + oldTable != null ? oldTable.getCDCIncludeScopes() : null); + // Check the cell tag to see whether the view has modified this property final byte[] tagUseStatsForParallelization = (useStatsForParallelizationKv == null) ? HConstants.EMPTY_BYTE_ARRAY : diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java index af6c1cf312..a88a991a04 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java @@ -99,7 +99,7 @@ public abstract class MetaDataProtocol extends MetaDataService { public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0 = MIN_TABLE_TIMESTAMP + 29; public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0 = MIN_TABLE_TIMESTAMP + 33; public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_1_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0; - public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 = MIN_TABLE_TIMESTAMP + 37; + public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 = MIN_TABLE_TIMESTAMP + 38; // MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java index 74d94c1333..1039d88947 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java @@ -173,7 +173,7 @@ public class PhoenixAccessController extends BaseMetaDataEndpointObserver { Set<byte[]> familySet, Set<TableName> indexes) throws IOException { if (!accessCheckEnabled) { return; } - if (tableType != PTableType.VIEW) { + if (tableType != PTableType.VIEW && tableType != PTableType.CDC) { TableDescriptorBuilder tableDescBuilder = TableDescriptorBuilder.newBuilder(physicalTableName); for (byte[] familyName : familySet) { tableDescBuilder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(familyName).build()); @@ -184,9 +184,9 @@ public class PhoenixAccessController extends BaseMetaDataEndpointObserver { } } - // Index and view require read access on parent physical table. + // Index, view and CDC require read access on parent physical table. Set<TableName> physicalTablesChecked = new HashSet<TableName>(); - if (tableType == PTableType.VIEW || tableType == PTableType.INDEX) { + if (tableType == PTableType.VIEW || tableType == PTableType.INDEX || tableType == PTableType.CDC) { physicalTablesChecked.add(parentPhysicalTableName); if (execPermissionsCheckEnabled) { requireAccess("Create" + tableType, parentPhysicalTableName, Action.READ, Action.EXEC); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index 67dd2635a1..2fb221f165 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -168,8 +168,9 @@ public enum SQLExceptionCode { AGGREGATE_EXPRESSION_NOT_ALLOWED_IN_INDEX(520, "42897", "Aggregate expression not allowed in an index."), NON_DETERMINISTIC_EXPRESSION_NOT_ALLOWED_IN_INDEX(521, "42898", "Non-deterministic expression not allowed in an index."), STATELESS_EXPRESSION_NOT_ALLOWED_IN_INDEX(522, "42899", "Stateless expression not allowed in an index."), - - /** + INCORRECT_DATATYPE_FOR_EXPRESSION(539, "42102", "Expression datatype is incorrect for this index."), + + /** * Transaction exceptions. */ TRANSACTION_CONFLICT_EXCEPTION(523, "42900", "Transaction aborted due to conflict with other mutations."), @@ -351,6 +352,8 @@ public enum SQLExceptionCode { + PhoenixDatabaseMetaData.PHOENIX_TTL + " property on an view when parent/child view has PHOENIX_TTL set,"), CHANGE_DETECTION_SUPPORTED_FOR_TABLES_AND_VIEWS_ONLY(10954, "44A36", CHANGE_DETECTION_ENABLED + " is only supported on tables and views"), + UNKNOWN_INDEX_TYPE(1098,"44A37", "Unknown INDEX type: "), + UNKNOWN_INCLUDE_CHANGE_SCOPE(1099,"44A38", "Unknown change scope for INCLUDE: "), /** Sequence related */ SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index 4283a59e00..5a95bf4a52 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -430,6 +430,13 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { public static final String SYSTEM_TRANSFORM_TABLE = "TRANSFORM"; public static final String SYSTEM_TRANSFORM_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_TRANSFORM_TABLE); + public static final String CDC_INCLUDE_NAME = "INCLUDE"; + public static final String CDC_INCLUDE_TABLE = "CDC_INCLUDE"; + public static final byte[] CDC_INCLUDE_BYTES = Bytes.toBytes(CDC_INCLUDE_TABLE); + + // This is just a virtual property on CDC that translates to the type of index created. + public static final String CDC_INDEX_TYPE_NAME = "INDEX_TYPE"; + //SYSTEM:LOG public static final String SYSTEM_LOG_TABLE = "LOG"; public static final String SYSTEM_LOG_NAME = diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 54f99fe437..7589ba5021 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -1082,7 +1082,20 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable @Override public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { - return null; + final StatementContext context = new StatementContext(stmt); + return new BaseMutationPlan(context, this.getOperation()) { + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + return new ExplainPlan(Collections.singletonList("CREATE CDC")); + } + + @Override + public MutationState execute() throws SQLException { + MetaDataClient client = new MetaDataClient(getContext().getConnection()); + return client.createCDC(ExecutableCreateCDCStatement.this); + } + }; } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 551be97e1c..5d1273551e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -2106,8 +2106,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement break; } } - if ((tableType == PTableType.VIEW && physicalTableName != null) || - (tableType != PTableType.VIEW && (physicalTableName == null || localIndexTable))) { + if ((tableType != PTableType.CDC) && ( + (tableType == PTableType.VIEW && physicalTableName != null) || + (tableType != PTableType.VIEW && (physicalTableName == null || localIndexTable)) + )) { // For views this will ensure that metadata already exists // For tables and indexes, this will create the metadata if it doesn't already exist ensureTableCreated(physicalTableNameBytes, null, tableType, tableProps, families, splits, true, @@ -4096,19 +4098,22 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0) { metaConnection = addColumnsIfNotExists(metaConnection, - PhoenixDatabaseMetaData.SYSTEM_CATALOG, MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 -3, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 - 4, PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME + " " + PVarchar.INSTANCE.getSqlTypeName()); metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 -2, + MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 - 3, PhoenixDatabaseMetaData.SCHEMA_VERSION + " " + PVarchar.INSTANCE.getSqlTypeName()); metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 -1, + MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 - 2, PhoenixDatabaseMetaData.EXTERNAL_SCHEMA_ID + " " + PVarchar.INSTANCE.getSqlTypeName()); metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0, + MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 - 1, PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME + " " + PVarchar.INSTANCE.getSqlTypeName()); + metaConnection = addColumnsIfNotExists(metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0, + PhoenixDatabaseMetaData.CDC_INCLUDE_TABLE + " " + PVarchar.INSTANCE.getSqlTypeName()); UpgradeUtil.bootstrapLastDDLTimestampForIndexes(metaConnection); } return metaConnection; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java index 52326c189d..b3bab6b637 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java @@ -47,6 +47,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BASE_COLUMN_COUNT; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BIND_PARAMETERS; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BUFFER_LENGTH; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CACHE_SIZE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_TABLE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CHANGE_DETECTION_ENABLED; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CHAR_OCTET_LENGTH; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME; @@ -303,6 +304,9 @@ public interface QueryConstants { // custom TagType byte VIEW_MODIFIED_PROPERTY_TAG_TYPE = (byte) 70; + + String CDC_JSON_COL_NAME = "CDC JSON"; + /** * We mark counter values 0 to 10 as reserved. Value 0 is used by * {@link #ENCODED_EMPTY_COLUMN_NAME}. Values 1-10 @@ -342,6 +346,7 @@ public interface QueryConstants { SCHEMA_VERSION + " VARCHAR, \n" + EXTERNAL_SCHEMA_ID + " VARCHAR, \n" + STREAMING_TOPIC_NAME + " VARCHAR, \n" + + CDC_INCLUDE_TABLE + " VARCHAR, \n" + // Column metadata (will be null for table row) DATA_TYPE + " INTEGER," + COLUMN_SIZE + " INTEGER," + diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java index b79691b335..54a959f177 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java @@ -19,6 +19,7 @@ package org.apache.phoenix.schema; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; @@ -397,6 +398,11 @@ public class DelegateTable implements PTable { @Override public String getStreamingTopicName() { return delegate.getStreamingTopicName(); } + @Override + public Set<CDCChangeScope> getCDCIncludeScopes() { + return delegate.getCDCIncludeScopes(); + } + @Override public Map<String, String> getPropertyValues() { return delegate.getPropertyValues(); } @Override public Map<String, String> getDefaultPropertyValues() { return delegate.getDefaultPropertyValues(); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 4fbee3cc24..ca30b9a215 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -19,6 +19,8 @@ package org.apache.phoenix.schema; import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_TRANSFORM_TRANSACTIONAL_TABLE; import static org.apache.phoenix.exception.SQLExceptionCode.ERROR_WRITING_TO_SCHEMA_REGISTRY; +import static org.apache.phoenix.exception.SQLExceptionCode.TABLE_ALREADY_EXIST; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_TABLE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE; import static org.apache.phoenix.query.QueryConstants.SYSTEM_SCHEMA_NAME; @@ -44,7 +46,6 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_ENCODED_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME; @@ -64,7 +65,6 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ; @@ -121,7 +121,6 @@ import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_P import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS; import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS; import static org.apache.phoenix.schema.PTable.ViewType.MAPPED; -import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone; import static org.apache.phoenix.schema.PTableType.INDEX; import static org.apache.phoenix.schema.PTableType.TABLE; import static org.apache.phoenix.schema.PTableType.VIEW; @@ -139,6 +138,7 @@ import java.sql.SQLFeatureNotSupportedException; import java.sql.Timestamp; import java.sql.Types; import java.util.ArrayList; +import java.util.Arrays; import java.util.BitSet; import java.util.Collection; import java.util.Collections; @@ -155,6 +155,7 @@ import java.util.Set; import java.util.HashSet; import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.parse.CreateCDCStatement; import org.apache.phoenix.schema.task.SystemTaskParams; import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.HConstants; @@ -190,6 +191,8 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode; import org.apache.phoenix.coprocessor.MetaDataProtocol.SharedTableState; import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.schema.transform.Transform; +import org.apache.phoenix.thirdparty.com.google.common.collect.ArrayListMultimap; +import org.apache.phoenix.util.CDCUtil; import org.apache.phoenix.util.TaskMetaDataServiceCallBack; import org.apache.phoenix.util.ViewUtil; import org.apache.phoenix.util.JacksonUtil; @@ -290,7 +293,6 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; import org.apache.phoenix.thirdparty.com.google.common.collect.Sets; import org.apache.phoenix.thirdparty.com.google.common.primitives.Ints; - public class MetaDataClient { private static final Logger LOGGER = LoggerFactory.getLogger(MetaDataClient.class); @@ -347,9 +349,10 @@ public class MetaDataClient { CHANGE_DETECTION_ENABLED + "," + PHYSICAL_TABLE_NAME + "," + SCHEMA_VERSION + "," + - STREAMING_TOPIC_NAME + + STREAMING_TOPIC_NAME + "," + + CDC_INCLUDE_TABLE + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, " + - "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; private static final String CREATE_SCHEMA = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " + TABLE_SCHEM + "," + TABLE_NAME + ") VALUES (?,?)"; @@ -1019,7 +1022,9 @@ public class MetaDataClient { true, NamedTableNode.create(statement.getTableName()), statement.getTableType(), false, null); } } - table = createTableInternal(statement, splits, parent, viewStatement, viewType, viewIndexIdType, viewColumnConstants, isViewColumnReferenced, false, null, null, tableProps, commonFamilyProps); + table = createTableInternal(statement, splits, parent, viewStatement, viewType, + viewIndexIdType, viewColumnConstants, isViewColumnReferenced, false, null, + null, null, tableProps, commonFamilyProps); if (table == null || table.getType() == PTableType.VIEW || statement.isNoVerify() /*|| table.isTransactional()*/) { @@ -1177,9 +1182,10 @@ public class MetaDataClient { + " IS NULL AND " + LAST_STATS_UPDATE_TIME + " IS NOT NULL"; try (PreparedStatement selectStatsStmt = connection.prepareStatement(query)) { selectStatsStmt.setString(1, physicalName.getString()); - ResultSet rs = selectStatsStmt.executeQuery(query); - if (rs.next()) { - msSinceLastUpdate = rs.getLong(1) - rs.getLong(2); + try (ResultSet rs = selectStatsStmt.executeQuery(query)) { + if (rs.next()) { + msSinceLastUpdate = rs.getLong(1) - rs.getLong(2); + } } } } @@ -1417,10 +1423,11 @@ public class MetaDataClient { * listed as an index column. * @param statement * @param splits + * @param indexPKExpresionsType If non-{@code null}, all PK expressions should be of this specific type. * @return MutationState from population of index table from data table * @throws SQLException */ - public MutationState createIndex(CreateIndexStatement statement, byte[][] splits) throws SQLException { + public MutationState createIndex(CreateIndexStatement statement, byte[][] splits, PDataType indexPKExpresionsType) throws SQLException { IndexKeyConstraint ik = statement.getIndexConstraint(); TableName indexTableName = statement.getIndexTableName(); @@ -1535,6 +1542,9 @@ public class MetaDataClient { if (expression.isStateless()) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.STATELESS_EXPRESSION_NOT_ALLOWED_IN_INDEX).build().buildException(); } + if (indexPKExpresionsType != null && expression.getDataType() != indexPKExpresionsType) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION).build().buildException(); + } unusedPkColumns.remove(expression); // Go through parse node to get string as otherwise we @@ -1666,7 +1676,9 @@ public class MetaDataClient { tableProps.put(MetaDataUtil.DATA_TABLE_NAME_PROP_NAME, dataTable.getPhysicalName().getString()); CreateTableStatement tableStatement = FACTORY.createTable(indexTableName, statement.getProps(), columnDefs, pk, statement.getSplitNodes(), PTableType.INDEX, statement.ifNotExists(), null, null, statement.getBindCount(), null); - table = createTableInternal(tableStatement, splits, dataTable, null, null, getViewIndexDataType() ,null, null, allocateIndexId, statement.getIndexType(), asyncCreatedDate, tableProps, commonFamilyProps); + table = createTableInternal(tableStatement, splits, dataTable, null, null, + getViewIndexDataType() ,null, null, allocateIndexId, + statement.getIndexType(), asyncCreatedDate, null, tableProps, commonFamilyProps); } finally { deleteMutexCells(physicalSchemaName, physicalTableName, acquiredColumnMutexSet); @@ -1698,6 +1710,83 @@ public class MetaDataClient { return buildIndex(table, tableRef); } + public MutationState createCDC(CreateCDCStatement statement) throws SQLException { + Map<String,Object> tableProps = Maps.newHashMapWithExpectedSize( + statement.getProps().size()); + Map<String,Object> commonFamilyProps = Maps.newHashMapWithExpectedSize( + statement.getProps().size() + 1); + populatePropertyMaps(statement.getProps(), tableProps, commonFamilyProps, PTableType.CDC); + + NamedNode indexName = FACTORY.indexName(CDCUtil.getCDCIndexName( + statement.getCdcObjName().getName())); + String timeIdxColName = statement.getTimeIdxColumn() != null ? + statement.getTimeIdxColumn().getColumnName() : null; + IndexKeyConstraint indexKeyConstraint = + FACTORY.indexKey(Arrays.asList(new Pair[] { Pair.newPair( + timeIdxColName != null ? FACTORY.column(statement.getDataTable(), + timeIdxColName, timeIdxColName) : statement.getTimeIdxFunc(), + SortOrder.getDefault()) })); + IndexType indexType = (IndexType) TableProperty.INDEX_TYPE.getValue(tableProps); + ListMultimap<String, Pair<String, Object>> indexProps = ArrayListMultimap.create(); + // TODO: Transfer TTL and MaxLookback from statement.getProps() to indexProps. + CreateIndexStatement indexStatement = FACTORY.createIndex(indexName, FACTORY.namedTable(null, + statement.getDataTable(), (Double) null), indexKeyConstraint, null, null, + indexProps, statement.isIfNotExists(), indexType, false, 0, + new HashMap<>()); + // TODO: Currently index can be dropped, leaving the CDC dangling, DROP INDEX needs to + // protect based on CDCUtil.isACDCIndex(). + // TODO: Should we also allow PTimestamp here? + MutationState indexMutationState; + try { + indexMutationState = createIndex(indexStatement, null, PDate.INSTANCE); + } + catch(SQLException e) { + if (e.getErrorCode() == TABLE_ALREADY_EXIST.getErrorCode()) { + throw new SQLExceptionInfo.Builder(TABLE_ALREADY_EXIST).setTableName( + statement.getCdcObjName().getName()).build().buildException(); + } + // TODO: What about translating other index creation failures? E.g., bad TS column. + throw e; + } + + // TODO: Do we need to borrow the schema name of the table? + ColumnResolver resolver = FromCompiler.getResolver(NamedTableNode.create(statement.getDataTable()), connection); + TableRef tableRef = resolver.getTables().get(0); + PTable dataTable = tableRef.getTable(); + List<PColumn> pkColumns = dataTable.getPKColumns(); + List<ColumnDef> columnDefs = new ArrayList<>(); + List<ColumnDefInPkConstraint> pkColumnDefs = new ArrayList<>(); + // TODO: toString() on function will have an extra space at the beginning, but this may + // be OK as I see exactly the same with an index. + ColumnName timeIdxCol = statement.getTimeIdxColumn() != null ? + statement.getTimeIdxColumn() : + FACTORY.columnName(statement.getTimeIdxFunc().toString()); + columnDefs.add(FACTORY.columnDef(timeIdxCol, PTimestamp.INSTANCE.getSqlTypeName(), false, null, false, + PTimestamp.INSTANCE.getMaxLength(null), PTimestamp.INSTANCE.getScale(null), false, + SortOrder.getDefault(), "", null, false)); + pkColumnDefs.add(FACTORY.columnDefInPkConstraint(timeIdxCol, SortOrder.getDefault(), false)); + for (PColumn pcol: pkColumns) { + // TODO: Cross check with the ColumnName creation logic in createIndex (line ~1578). + columnDefs.add(FACTORY.columnDef(FACTORY.columnName(pcol.getName().getString()), + pcol.getDataType().getSqlTypeName(), false, null, false, pcol.getMaxLength(), + pcol.getScale(), false, pcol.getSortOrder(), "", null, false)); + pkColumnDefs.add(FACTORY.columnDefInPkConstraint(FACTORY.columnName( + pcol.getName().getString()), pcol.getSortOrder(), pcol.isRowTimestamp())); + } + columnDefs.add(FACTORY.columnDef(FACTORY.columnName(QueryConstants.CDC_JSON_COL_NAME), + PVarchar.INSTANCE.getSqlTypeName(), false, null, true, null, + null, false, SortOrder.getDefault(), "", null, false)); + CreateTableStatement tableStatement = FACTORY.createTable( + FACTORY.table(dataTable.getSchemaName().getString(), statement.getCdcObjName().getName()), + statement.getProps(), columnDefs, FACTORY.primaryKey(null, pkColumnDefs), + Collections.emptyList(), PTableType.CDC, statement.isIfNotExists(), null, null, + statement.getBindCount(), null); + createTableInternal(tableStatement, null, dataTable, null, null, null, + null, null, false, null, + null, statement.getIncludeScopes(), tableProps, commonFamilyProps); + return indexMutationState; + } + public MutationState dropSequence(DropSequenceStatement statement) throws SQLException { Long scn = connection.getSCN(); long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn; @@ -1933,11 +2022,12 @@ public class MetaDataClient { } private PTable createTableInternal(CreateTableStatement statement, byte[][] splits, - final PTable parent, String viewStatement, ViewType viewType, PDataType viewIndexIdType, - final byte[][] viewColumnConstants, final BitSet isViewColumnReferenced, boolean allocateIndexId, - IndexType indexType, Date asyncCreatedDate, - Map<String,Object> tableProps, - Map<String,Object> commonFamilyProps) throws SQLException { + final PTable parent, String viewStatement, ViewType viewType, PDataType viewIndexIdType, + final byte[][] viewColumnConstants, final BitSet isViewColumnReferenced, boolean allocateIndexId, + IndexType indexType, Date asyncCreatedDate, + Set<PTable.CDCChangeScope> cdcIncludeScopes, + Map<String, Object> tableProps, + Map<String, Object> commonFamilyProps) throws SQLException { final PTableType tableType = statement.getTableType(); boolean wasAutoCommit = connection.getAutoCommit(); TableName tableNameNode = null; @@ -2036,6 +2126,8 @@ public class MetaDataClient { String schemaVersion = (String) TableProperty.SCHEMA_VERSION.getValue(tableProps); String streamingTopicName = (String) TableProperty.STREAMING_TOPIC_NAME.getValue(tableProps); + String cdcIncludeScopesStr = cdcIncludeScopes == null ? null : + CDCUtil.makeChangeScopeStringFromEnums(cdcIncludeScopes); if (parent != null && tableType == PTableType.INDEX) { timestamp = TransactionUtil.getTableTimestamp(connection, transactionProvider != null, transactionProvider); @@ -2075,37 +2167,40 @@ public class MetaDataClient { storeNulls = parent.getStoreNulls(); parentTableName = parent.getTableName().getString(); // Pass through data table sequence number so we can check it hasn't changed - PreparedStatement incrementStatement = connection.prepareStatement(INCREMENT_SEQ_NUM); - incrementStatement.setString(1, tenantIdStr); - incrementStatement.setString(2, schemaName); - incrementStatement.setString(3, parentTableName); - incrementStatement.setLong(4, parent.getSequenceNumber()); - incrementStatement.execute(); - // Get list of mutations and add to table meta data that will be passed to server - // to guarantee order. This row will always end up last - tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond()); - connection.rollback(); + try (PreparedStatement incrementStatement = connection.prepareStatement(INCREMENT_SEQ_NUM)) { + incrementStatement.setString(1, tenantIdStr); + incrementStatement.setString(2, schemaName); + incrementStatement.setString(3, parentTableName); + incrementStatement.setLong(4, parent.getSequenceNumber()); + incrementStatement.execute(); + // Get list of mutations and add to table meta data that will be passed to server + // to guarantee order. This row will always end up last + tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond()); + connection.rollback(); + } // Add row linking from data table row to index table row - PreparedStatement linkStatement = connection.prepareStatement(CREATE_LINK); - linkStatement.setString(1, tenantIdStr); - linkStatement.setString(2, schemaName); - linkStatement.setString(3, parentTableName); - linkStatement.setString(4, tableName); - linkStatement.setByte(5, LinkType.INDEX_TABLE.getSerializedValue()); - linkStatement.setLong(6, parent.getSequenceNumber()); - linkStatement.setString(7, PTableType.INDEX.getSerializedValue()); - linkStatement.execute(); + try (PreparedStatement linkStatement = connection.prepareStatement(CREATE_LINK)) { + linkStatement.setString(1, tenantIdStr); + linkStatement.setString(2, schemaName); + linkStatement.setString(3, parentTableName); + linkStatement.setString(4, tableName); + linkStatement.setByte(5, LinkType.INDEX_TABLE.getSerializedValue()); + linkStatement.setLong(6, parent.getSequenceNumber()); + linkStatement.setString(7, PTableType.INDEX.getSerializedValue()); + linkStatement.execute(); + } // Add row linking index table to parent table for indexes on views if (parent.getType() == PTableType.VIEW) { - linkStatement = connection.prepareStatement(CREATE_VIEW_INDEX_PARENT_LINK); - linkStatement.setString(1, tenantIdStr); - linkStatement.setString(2, schemaName); - linkStatement.setString(3, tableName); - linkStatement.setString(4, parent.getName().getString()); - linkStatement.setByte(5, LinkType.VIEW_INDEX_PARENT_TABLE.getSerializedValue()); - linkStatement.execute(); + try (PreparedStatement linkStatement = connection.prepareStatement(CREATE_VIEW_INDEX_PARENT_LINK)) { + linkStatement.setString(1, tenantIdStr); + linkStatement.setString(2, schemaName); + linkStatement.setString(3, tableName); + linkStatement.setString(4, parent.getName().getString()); + linkStatement.setByte(5, LinkType.VIEW_INDEX_PARENT_TABLE.getSerializedValue()); + linkStatement.execute(); + } } } @@ -2375,24 +2470,26 @@ public class MetaDataClient { pkColumns = newLinkedHashSet(parent.getPKColumns()); // Add row linking view to its parent - PreparedStatement linkStatement = connection.prepareStatement(CREATE_VIEW_LINK); - linkStatement.setString(1, tenantIdStr); - linkStatement.setString(2, schemaName); - linkStatement.setString(3, tableName); - linkStatement.setString(4, parent.getName().getString()); - linkStatement.setByte(5, LinkType.PARENT_TABLE.getSerializedValue()); - linkStatement.setString(6, parent.getTenantId() == null ? null : parent.getTenantId().getString()); - linkStatement.execute(); + try (PreparedStatement linkStatement = connection.prepareStatement(CREATE_VIEW_LINK)) { + linkStatement.setString(1, tenantIdStr); + linkStatement.setString(2, schemaName); + linkStatement.setString(3, tableName); + linkStatement.setString(4, parent.getName().getString()); + linkStatement.setByte(5, LinkType.PARENT_TABLE.getSerializedValue()); + linkStatement.setString(6, parent.getTenantId() == null ? null : parent.getTenantId().getString()); + linkStatement.execute(); + } // Add row linking parent to view - // TODO From 4.16 write the child links to SYSTEM.CHILD_LINK directly - linkStatement = connection.prepareStatement(CREATE_CHILD_LINK); - linkStatement.setString(1, parent.getTenantId() == null ? null : parent.getTenantId().getString()); - linkStatement.setString(2, parent.getSchemaName() == null ? null : parent.getSchemaName().getString()); - linkStatement.setString(3, parent.getTableName().getString()); - linkStatement.setString(4, tenantIdStr); - linkStatement.setString(5, SchemaUtil.getTableName(schemaName, tableName)); - linkStatement.setByte(6, LinkType.CHILD_TABLE.getSerializedValue()); - linkStatement.execute(); + // TODO From 4.16 write the child links to SYSTEM.CHILD_LINK directly + try (PreparedStatement linkStatement = connection.prepareStatement(CREATE_CHILD_LINK)) { + linkStatement.setString(1, parent.getTenantId() == null ? null : parent.getTenantId().getString()); + linkStatement.setString(2, parent.getSchemaName() == null ? null : parent.getSchemaName().getString()); + linkStatement.setString(3, parent.getTableName().getString()); + linkStatement.setString(4, tenantIdStr); + linkStatement.setString(5, SchemaUtil.getTableName(schemaName, tableName)); + linkStatement.setByte(6, LinkType.CHILD_TABLE.getSerializedValue()); + linkStatement.execute(); + } } } else { columns = new LinkedHashMap<PColumn,PColumn>(colDefs.size()); @@ -2409,29 +2506,30 @@ public class MetaDataClient { && !physicalNames.get(0).getString().equals(SchemaUtil.getPhysicalHBaseTableName( schemaName, tableName, isNamespaceMapped).getString()))) { // Add row linking from data table row to physical table row - PreparedStatement linkStatement = connection.prepareStatement(CREATE_LINK); - for (PName physicalName : physicalNames) { - linkStatement.setString(1, tenantIdStr); - linkStatement.setString(2, schemaName); - linkStatement.setString(3, tableName); - linkStatement.setString(4, physicalName.getString()); - linkStatement.setByte(5, LinkType.PHYSICAL_TABLE.getSerializedValue()); - if (tableType == PTableType.VIEW) { - if (parent.getType() == PTableType.TABLE) { - linkStatement.setString(4, SchemaUtil.getTableName(parent.getSchemaName().getString(),parent.getTableName().getString())); + try (PreparedStatement linkStatement = connection.prepareStatement(CREATE_LINK)) { + for (PName physicalName : physicalNames) { + linkStatement.setString(1, tenantIdStr); + linkStatement.setString(2, schemaName); + linkStatement.setString(3, tableName); + linkStatement.setString(4, physicalName.getString()); + linkStatement.setByte(5, LinkType.PHYSICAL_TABLE.getSerializedValue()); + if (tableType == PTableType.VIEW) { + if (parent.getType() == PTableType.TABLE) { + linkStatement.setString(4, SchemaUtil.getTableName(parent.getSchemaName().getString(), parent.getTableName().getString())); + linkStatement.setLong(6, parent.getSequenceNumber()); + } else { //This is a grandchild view, find the physical base table + PTable logicalTable = connection.getTable(new PTableKey(null, SchemaUtil.replaceNamespaceSeparator(physicalName))); + linkStatement.setString(4, SchemaUtil.getTableName(logicalTable.getSchemaName().getString(), logicalTable.getTableName().getString())); + linkStatement.setLong(6, logicalTable.getSequenceNumber()); + } + // Set link to logical name + linkStatement.setString(7, null); + } else { linkStatement.setLong(6, parent.getSequenceNumber()); - } else { //This is a grandchild view, find the physical base table - PTable logicalTable = connection.getTable(new PTableKey(null, SchemaUtil.replaceNamespaceSeparator(physicalName))); - linkStatement.setString(4, SchemaUtil.getTableName(logicalTable.getSchemaName().getString(),logicalTable.getTableName().getString())); - linkStatement.setLong(6, logicalTable.getSequenceNumber()); + linkStatement.setString(7, PTableType.INDEX.getSerializedValue()); } - // Set link to logical name - linkStatement.setString(7, null); - } else { - linkStatement.setLong(6, parent.getSequenceNumber()); - linkStatement.setString(7, PTableType.INDEX.getSerializedValue()); + linkStatement.execute(); } - linkStatement.execute(); } tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond()); connection.rollback(); @@ -2845,12 +2943,13 @@ public class MetaDataClient { } } if (tableType == VIEW && !changedCqCounters.isEmpty()) { - PreparedStatement incrementStatement = connection.prepareStatement(INCREMENT_SEQ_NUM); - incrementStatement.setString(1, null); - incrementStatement.setString(2, viewPhysicalTable.getSchemaName().getString()); - incrementStatement.setString(3, viewPhysicalTable.getTableName().getString()); - incrementStatement.setLong(4, viewPhysicalTable.getSequenceNumber() + 1); - incrementStatement.execute(); + try (PreparedStatement incrementStatement = connection.prepareStatement(INCREMENT_SEQ_NUM)) { + incrementStatement.setString(1, null); + incrementStatement.setString(2, viewPhysicalTable.getSchemaName().getString()); + incrementStatement.setString(3, viewPhysicalTable.getTableName().getString()); + incrementStatement.setLong(4, viewPhysicalTable.getSequenceNumber() + 1); + incrementStatement.execute(); + } } if (connection.getMutationState().toMutations(timestamp).hasNext()) { tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond()); @@ -3072,23 +3171,31 @@ public class MetaDataClient { tableUpsert.setString(35, streamingTopicName); } + if (cdcIncludeScopesStr == null) { + tableUpsert.setNull(36, Types.VARCHAR); + } else { + tableUpsert.setString(36, cdcIncludeScopesStr); + } + tableUpsert.execute(); if (asyncCreatedDate != null) { - PreparedStatement setAsync = connection.prepareStatement(SET_ASYNC_CREATED_DATE); - setAsync.setString(1, tenantIdStr); - setAsync.setString(2, schemaName); - setAsync.setString(3, tableName); - setAsync.setDate(4, asyncCreatedDate); - setAsync.execute(); + try (PreparedStatement setAsync = connection.prepareStatement(SET_ASYNC_CREATED_DATE)) { + setAsync.setString(1, tenantIdStr); + setAsync.setString(2, schemaName); + setAsync.setString(3, tableName); + setAsync.setDate(4, asyncCreatedDate); + setAsync.execute(); + } } else { Date syncCreatedDate = new Date(EnvironmentEdgeManager.currentTimeMillis()); - PreparedStatement setSync = connection.prepareStatement(SET_INDEX_SYNC_CREATED_DATE); - setSync.setString(1, tenantIdStr); - setSync.setString(2, schemaName); - setSync.setString(3, tableName); - setSync.setDate(4, syncCreatedDate); - setSync.execute(); + try (PreparedStatement setSync = connection.prepareStatement(SET_INDEX_SYNC_CREATED_DATE)) { + setSync.setString(1, tenantIdStr); + setSync.setString(2, schemaName); + setSync.setString(3, tableName); + setSync.setDate(4, syncCreatedDate); + setSync.execute(); + } } tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond()); connection.rollback(); @@ -3219,6 +3326,7 @@ public class MetaDataClient { .setExternalSchemaId(result.getTable() != null ? result.getTable().getExternalSchemaId() : null) .setStreamingTopicName(streamingTopicName) + .setCDCIncludeScopes(cdcIncludeScopes) .build(); result = new MetaDataMutationResult(code, result.getMutationTime(), table, true); addTableToCache(result); @@ -3700,8 +3808,8 @@ public class MetaDataClient { // Ordinal position is 1-based and we don't count SALT column in ordinal position int totalColumnCount = table.getColumns().size() + (table.getBucketNum() == null ? 0 : -1); final long seqNum = table.getSequenceNumber() + 1; - PreparedStatement tableUpsert = connection.prepareStatement(MUTATE_TABLE); String tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getString(); + PreparedStatement tableUpsert = connection.prepareStatement(MUTATE_TABLE); try { tableUpsert.setString(1, tenantId); tableUpsert.setString(2, schemaName); @@ -4190,9 +4298,8 @@ public class MetaDataClient { // Add column metadata afterwards, maintaining the order so columns have more predictable ordinal position tableMetaData.addAll(columnMetaData); if (!changedCqCounters.isEmpty()) { - PreparedStatement linkStatement; - linkStatement = connection.prepareStatement(UPDATE_ENCODED_COLUMN_COUNTER); - for (Entry<String, Integer> entry : changedCqCounters.entrySet()) { + try (PreparedStatement linkStatement = connection.prepareStatement(UPDATE_ENCODED_COLUMN_COUNTER)) { + for (Entry<String, Integer> entry : changedCqCounters.entrySet()) { linkStatement.setString(1, tenantIdToUse); linkStatement.setString(2, tableForCQCounters.getSchemaName().getString()); linkStatement.setString(3, tableForCQCounters.getTableName().getString()); @@ -4200,16 +4307,18 @@ public class MetaDataClient { linkStatement.setInt(5, entry.getValue()); linkStatement.execute(); } + } // When a view adds its own columns, then we need to increase the sequence number of the base table // too since we want clients to get the latest PTable of the base table. if (tableType == VIEW) { - PreparedStatement incrementStatement = connection.prepareStatement(INCREMENT_SEQ_NUM); - incrementStatement.setString(1, null); - incrementStatement.setString(2, tableForCQCounters.getSchemaName().getString()); - incrementStatement.setString(3, tableForCQCounters.getTableName().getString()); - incrementStatement.setLong(4, tableForCQCounters.getSequenceNumber() + 1); - incrementStatement.execute(); + try (PreparedStatement incrementStatement = connection.prepareStatement(INCREMENT_SEQ_NUM)) { + incrementStatement.setString(1, null); + incrementStatement.setString(2, tableForCQCounters.getSchemaName().getString()); + incrementStatement.setString(3, tableForCQCounters.getTableName().getString()); + incrementStatement.setLong(4, tableForCQCounters.getSequenceNumber() + 1); + incrementStatement.execute(); + } } tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond()); connection.rollback(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java index 890ce22909..fcaaea5301 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import javax.annotation.Nullable; @@ -962,6 +963,11 @@ public interface PTable extends PMetaDataEntity { */ String getStreamingTopicName(); + /** + * @return Optional string that represents the default include scopes to be used for CDC queries. + */ + Set<CDCChangeScope> getCDCIncludeScopes(); + /** * Class to help track encoded column qualifier counters per column family. */ @@ -1049,6 +1055,11 @@ public interface PTable extends PMetaDataEntity { } enum CDCChangeScope { + /** + * Include only the actual change in image. + */ + CHANGE, + /** * Include only the pre image (state prior to the change) of the row. */ @@ -1063,11 +1074,6 @@ public interface PTable extends PMetaDataEntity { * Include only the latest image of the row. */ LATEST, - - /** - * Include all images. - */ - ALL, + ; } - } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java index d332a95269..1e32c60900 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java @@ -64,6 +64,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import javax.annotation.Nonnull; @@ -107,6 +108,7 @@ import org.apache.phoenix.schema.types.PFloat; import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.CDCUtil; import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; @@ -211,6 +213,7 @@ public class PTableImpl implements PTable { private String schemaVersion; private String externalSchemaId; private String streamingTopicName; + private Set<CDCChangeScope> cdcIncludeScopes; public static class Builder { private PTableKey key; @@ -276,6 +279,7 @@ public class PTableImpl implements PTable { private String schemaVersion; private String externalSchemaId; private String streamingTopicName; + private Set<CDCChangeScope> cdcIncludeScopes; // Used to denote which properties a view has explicitly modified private BitSet viewModifiedPropSet = new BitSet(3); @@ -696,6 +700,13 @@ public class PTableImpl implements PTable { return this; } + public Builder setCDCIncludeScopes(Set<CDCChangeScope> cdcIncludeScopes) { + if (cdcIncludeScopes != null) { + this.cdcIncludeScopes = cdcIncludeScopes; + } + return this; + } + /** * Populate derivable attributes of the PTable * @return PTableImpl.Builder object @@ -986,6 +997,7 @@ public class PTableImpl implements PTable { this.schemaVersion = builder.schemaVersion; this.externalSchemaId = builder.externalSchemaId; this.streamingTopicName = builder.streamingTopicName; + this.cdcIncludeScopes = builder.cdcIncludeScopes; } // When cloning table, ignore the salt column as it will be added back in the constructor @@ -1999,6 +2011,11 @@ public class PTableImpl implements PTable { streamingTopicName = (String) PVarchar.INSTANCE.toObject(table.getStreamingTopicName().toByteArray()); } + String cdcIncludeScopesStr = null; + if (table.hasCDCIncludeScopes()) { + cdcIncludeScopesStr = + (String) PVarchar.INSTANCE.toObject(table.getCDCIncludeScopes().toByteArray()); + } try { return new PTableImpl.Builder() .setType(tableType) @@ -2056,6 +2073,8 @@ public class PTableImpl implements PTable { .setSchemaVersion(schemaVersion) .setExternalSchemaId(externalSchemaId) .setStreamingTopicName(streamingTopicName) + .setCDCIncludeScopes( + CDCUtil.makeChangeScopeEnumsFromString(cdcIncludeScopesStr)) .build(); } catch (SQLException e) { throw new RuntimeException(e); // Impossible @@ -2337,6 +2356,11 @@ public class PTableImpl implements PTable { return streamingTopicName; } + @Override + public Set<CDCChangeScope> getCDCIncludeScopes() { + return cdcIncludeScopes; + } + private static final class KVColumnFamilyQualifier { @Nonnull private final String colFamilyName; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableType.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableType.java index 8d6281edd4..d89f1bd36a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableType.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableType.java @@ -29,7 +29,8 @@ public enum PTableType { INDEX("i", "INDEX"), PROJECTED("p", "PROJECTED"), CDC("c", "CDC"), - SUBQUERY("q", "SUBQUERY"); + SUBQUERY("q", "SUBQUERY"), + ; private final PName value; private final String serializedValue; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java index 24d0432543..40bef055a7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java @@ -25,6 +25,7 @@ import static org.apache.phoenix.exception.SQLExceptionCode.SALT_ONLY_ON_CREATE_ import static org.apache.phoenix.exception.SQLExceptionCode.VIEW_WITH_PROPERTIES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED; +import static org.apache.phoenix.util.CDCUtil.CDC_INDEX_TYPE_LOCAL; import java.sql.SQLException; import java.util.Map; @@ -336,6 +337,38 @@ public enum TableProperty { @Override public Object getPTableValue(PTable table) { return table.getStreamingTopicName(); } + }, + + INDEX_TYPE(PhoenixDatabaseMetaData.CDC_INDEX_TYPE_NAME, COLUMN_FAMILY_NOT_ALLOWED_FOR_PROPERTY, false, false, false) { + @Override + public Object getValue(Object value) { + return value != null && value.toString().toUpperCase().equals(CDC_INDEX_TYPE_LOCAL) ? + PTable.IndexType.LOCAL : + PTable.IndexType.UNCOVERED_GLOBAL; + } + + @Override + public Object getPTableValue(PTable table) { + return null; + } + }, + + INCLUDE(PhoenixDatabaseMetaData.CDC_INCLUDE_NAME, COLUMN_FAMILY_NOT_ALLOWED_FOR_PROPERTY, true, false, false) { + @Override + public Object getValue(Object value) { + try { + return value == null ? PTable.CDCChangeScope.CHANGE : PTable.CDCChangeScope.valueOf(value.toString().toUpperCase()); + } catch (IllegalArgumentException e) { + throw new RuntimeException(new SQLExceptionInfo.Builder(SQLExceptionCode.UNKNOWN_INCLUDE_CHANGE_SCOPE) + .setMessage(value.toString()) + .build().buildException()); + } + } + + @Override + public Object getPTableValue(PTable table) { + return table.getCDCIncludeScopes(); + } }; private final String propertyName; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java new file mode 100644 index 0000000000..d015aaf422 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.util; + +import java.util.HashSet; +import java.util.Set; +import java.util.StringTokenizer; + +import org.apache.hadoop.util.StringUtils; + +import org.apache.phoenix.schema.PTable; + +public class CDCUtil { + public static final String CDC_INDEX_PREFIX = "__CDC__"; + public static final String CDC_INDEX_TYPE_LOCAL = "L"; + + /** + * Make a set of CDC change scope enums from the given string containing comma separated scope + * names. + * + * @param includeScopes Comma-separated scope names. + * @return the set of enums, which can be empty if the string is empty or has no valid names. + */ + public static Set<PTable.CDCChangeScope> makeChangeScopeEnumsFromString(String includeScopes) { + Set<PTable.CDCChangeScope> cdcChangeScopes = new HashSet<>(); + if (includeScopes != null) { + StringTokenizer st = new StringTokenizer(includeScopes, ","); + while (st.hasMoreTokens()) { + String tok = st.nextToken(); + try { + cdcChangeScopes.add(PTable.CDCChangeScope.valueOf(tok.trim().toUpperCase())); + } + catch (IllegalArgumentException e) { + // Just ignore unrecognized scopes. + } + } + } + return cdcChangeScopes; + } + + /** + * Make a string of comma-separated scope names from the specified set of enums. + * + * @param includeScopes Set of scope enums + * @return the comma-separated string of scopes, which can be an empty string in case the set is empty. + */ + public static String makeChangeScopeStringFromEnums(Set<PTable.CDCChangeScope> includeScopes) { + String cdcChangeScopes = null; + if (includeScopes != null) { + Iterable<String> tmpStream = () -> includeScopes.stream().sorted() + .map(s -> s.name()).iterator(); + cdcChangeScopes = StringUtils.join(",", tmpStream); + } + return cdcChangeScopes; + } + + public static String getCDCIndexName(String cdcName) { + return CDC_INDEX_PREFIX + cdcName; + } + + public static String getCDCNameFromIndexName(String indexName) { + assert(indexName.startsWith(CDC_INDEX_PREFIX)); + return indexName.substring(CDC_INDEX_PREFIX.length()); + } + + public static boolean isACDCIndex(String indexName) { + return indexName.startsWith(CDC_INDEX_PREFIX); + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java index fbca85b3b9..14a48a62e8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java @@ -115,6 +115,7 @@ public class SchemaUtil { private static final int VAR_KV_LENGTH_ESTIMATE = 50; public static final String ESCAPE_CHARACTER = "\""; public static final DataBlockEncoding DEFAULT_DATA_BLOCK_ENCODING = DataBlockEncoding.FAST_DIFF; + public static final PDatum VAR_BINARY_DATUM = new PDatum() { @Override @@ -239,7 +240,7 @@ public class SchemaUtil { } /** - * Normalizes the fulltableName . Uses {@linkplain normalizeIdentifier} + * Normalizes the fulltableName . Uses {@linkplain #normalizeIdentifier} * @param fullTableName * @return */ diff --git a/phoenix-core/src/main/protobuf/PTable.proto b/phoenix-core/src/main/protobuf/PTable.proto index 6f6a663a99..4019a63695 100644 --- a/phoenix-core/src/main/protobuf/PTable.proto +++ b/phoenix-core/src/main/protobuf/PTable.proto @@ -30,6 +30,7 @@ enum PTableType { VIEW = 2; INDEX = 3; JOIN = 4; + CDC = 5; } message PColumn { @@ -117,6 +118,7 @@ message PTable { optional bytes externalSchemaId=50; optional PTable transformingNewTable=51; optional bytes streamingTopicName=52; + optional bytes CDCIncludeScopes=55; } message EncodedCQCounter { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java b/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java index cfbb4a4572..829cb73ed0 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java @@ -497,7 +497,8 @@ public class QueryParserTest { } } - private CreateCDCStatement parseCreateCDCSimple(String sql, boolean ifNotExists, String tsCol) throws Exception { + private CreateCDCStatement parseCreateCDCSimple(String sql, boolean ifNotExists, String tsCol) + throws Exception { CreateCDCStatement stmt = parseQuery(sql, CreateCDCStatement.class); assertEquals("FOO", stmt.getCdcObjName().getName()); assertEquals("BAR", stmt.getDataTable().getTableName()); @@ -517,17 +518,37 @@ public class QueryParserTest { parseCreateCDCSimple("create cdc foo on bar(ts)", false, "TS"); parseCreateCDCSimple("create cdc foo on s.bar(ts)", false, "TS"); parseCreateCDCSimple("create cdc if not exists foo on bar(ts)", true, "TS"); - stmt = parseCreateCDCSimple("create cdc foo on bar(PHOENIX_ROW_TIMESTAMP())", false, null); - assertEquals("PHOENIX_ROW_TIMESTAMP", stmt.getTimeIdxFunc().getName()); - assertEquals(" PHOENIX_ROW_TIMESTAMP()", stmt.getTimeIdxFunc().toString()); + parseCreateCDCSimple("create cdc foo on bar(t) index_type=g", false, "T"); + parseCreateCDCSimple("create cdc foo on bar(t) index_type=l", false, "T"); + stmt = parseCreateCDCSimple("create cdc foo on bar(TS_FUNC()) TTL=100, INDEX_TYPE=g", + false, null); + assertEquals("TS_FUNC", stmt.getTimeIdxFunc().getName()); + assertEquals(" TS_FUNC()", stmt.getTimeIdxFunc().toString()); + assertEquals(Arrays.asList(new Pair("TTL", 100), new Pair("INDEX_TYPE", "g")), + stmt.getProps().get("")); stmt = parseCreateCDCSimple("create cdc foo on bar(ts) include (pre)", false, "TS"); - assertEquals(new HashSet<>(Arrays.asList(PTable.CDCChangeScope.PRE)), stmt.getIncludeScopes()); - stmt = parseCreateCDCSimple("create cdc foo on bar(ts) include (pre, pre, post)", false, "TS"); - assertEquals(new HashSet<>(Arrays.asList(PTable.CDCChangeScope.PRE, PTable.CDCChangeScope.POST)), stmt.getIncludeScopes()); - stmt = parseCreateCDCSimple("create cdc if not exists foo on bar(ts) abc=def", true, "TS"); + assertEquals(new HashSet<>(Arrays.asList(PTable.CDCChangeScope.PRE)), + stmt.getIncludeScopes()); + stmt = parseCreateCDCSimple("create cdc foo on bar(ts) include (pre, pre, post)", + false, "TS"); + assertEquals(new HashSet<>(Arrays.asList(PTable.CDCChangeScope.PRE, + PTable.CDCChangeScope.POST)), stmt.getIncludeScopes()); + stmt = parseCreateCDCSimple("create cdc if not exists foo on bar(ts) abc=def", + true, "TS"); assertEquals(Arrays.asList(new Pair("ABC", "def")), stmt.getProps().get("")); - stmt = parseCreateCDCSimple("create cdc if not exists foo on bar(ts) abc=def, prop=val", true, "TS"); - assertEquals(Arrays.asList(new Pair("ABC", "def"), new Pair("PROP", "val")), stmt.getProps().get("")); + stmt = parseCreateCDCSimple("create cdc if not exists foo on bar(ts) abc=def, prop=val", + true, "TS"); + assertEquals(Arrays.asList(new Pair("ABC", "def"), new Pair("PROP", "val")), + stmt.getProps().get("")); + } + + @Test + public void testCreateCDCWithErrors() throws Exception { + parseQueryThatShouldFail("create cdc foo"); + parseQueryThatShouldFail("create cdc foo on bar"); + parseQueryThatShouldFail("create cdc foo on bar(ts integer)"); + parseQueryThatShouldFail("create cdc foo on bar(ts1, ts2)"); + parseQueryThatShouldFail("create cdc foo on bar(ts) include (abc)"); } private void parseInvalidCreateCDC(String sql, int expRrrorCode) throws IOException { @@ -1106,5 +1127,4 @@ public class QueryParserTest { parseQueryThatShouldFail("SELECT b, x from x WHERE x = " + "b'0 10 ' --comment \n /* comment */ '00 000' \n \n ''"); } - } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java new file mode 100644 index 0000000000..3282ba62c9 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java @@ -0,0 +1,39 @@ +package org.apache.phoenix.util; + +import org.apache.phoenix.schema.PTable; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashSet; + +import static org.apache.phoenix.schema.PTable.CDCChangeScope.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class CDCUtilTest { + @Test + public void testScopeSetConstruction() throws Exception { + assertEquals(new HashSet<>(Arrays.asList(PRE)), CDCUtil.makeChangeScopeEnumsFromString( + "PRE")); + assertEquals(new HashSet<>(Arrays.asList(PRE)), + CDCUtil.makeChangeScopeEnumsFromString("PRE,")); + assertEquals(new HashSet<>(Arrays.asList(PRE)), + CDCUtil.makeChangeScopeEnumsFromString("PRE, PRE")); + assertEquals(new HashSet<>(Arrays.asList(PRE)), + CDCUtil.makeChangeScopeEnumsFromString("PRE,DUMMY")); + assertEquals(new HashSet<>(Arrays.asList(CHANGE, PRE, POST, LATEST)), + CDCUtil.makeChangeScopeEnumsFromString("POST,PRE,CHANGE,LATEST")); + } + + @Test + public void testScopeStringConstruction() throws Exception { + assertEquals(null, CDCUtil.makeChangeScopeStringFromEnums(null)); + assertEquals("", CDCUtil.makeChangeScopeStringFromEnums( + new HashSet<PTable.CDCChangeScope>())); + assertEquals("CHANGE,PRE,POST,LATEST", CDCUtil.makeChangeScopeStringFromEnums( + new HashSet<>(Arrays.asList(CHANGE, PRE, POST, LATEST)))); + assertEquals("CHANGE,PRE,POST,LATEST", CDCUtil.makeChangeScopeStringFromEnums( + new HashSet<>(Arrays.asList(PRE, LATEST, POST, CHANGE)))); + } +}