This is an automated email from the ASF dual-hosted git repository.
palashc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 3d06b7a28c PHOENIX-7459 : Bootstrap stream metadata when CDC is enabled on a table (#2033)
3d06b7a28c is described below
commit 3d06b7a28c696943d4a4c8e6a691144f1c1cb672
Author: Palash Chauhan <[email protected]>
AuthorDate: Thu Jan 2 19:18:28 2025 -0800
PHOENIX-7459 : Bootstrap stream metadata when CDC is enabled on a table (#2033)

Co-authored-by: Palash Chauhan <[email protected]>
---
.../apache/phoenix/exception/SQLExceptionCode.java | 3 +
.../org/apache/phoenix/query/QueryConstants.java | 6 +-
.../org/apache/phoenix/query/QueryServices.java | 2 +
.../apache/phoenix/query/QueryServicesOptions.java | 2 +
.../org/apache/phoenix/schema/MetaDataClient.java | 113 +++++++++++--
.../java/org/apache/phoenix/schema/PTable.java | 3 +-
.../main/java/org/apache/phoenix/util/CDCUtil.java | 30 ++++
.../phoenix/coprocessor/TaskRegionObserver.java | 1 +
.../tasks/CdcStreamPartitionMetadataTask.java | 153 +++++++++++++++++
.../java/org/apache/phoenix/end2end/CDCBaseIT.java | 4 +
.../apache/phoenix/end2end/CDCDefinitionIT.java | 39 +++--
.../org/apache/phoenix/end2end/CDCStreamIT.java | 183 +++++++++++++++++++++
12 files changed, 510 insertions(+), 29 deletions(-)
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index d9aa522765..eb52fa0fba 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -383,6 +383,9 @@ public enum SQLExceptionCode {
    SALTING_NOT_ALLOWED_FOR_CDC(10962,"44A44", SALT_BUCKETS +
            " property can not be set for CDC"),
+    CDC_ALREADY_ENABLED(10963, "44A45",
+            "CDC on this table is either enabled or is in the process of being enabled."),
+
    /** Sequence related */
    SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {
@Override
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 5154bf67e5..62142a454c 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -655,11 +655,11 @@ public interface QueryConstants {
SYSTEM_CDC_STREAM_STATUS_TABLE + "\"(\n" +
// PK columns
TABLE_NAME + " VARCHAR NOT NULL," +
- STREAM_STATUS + " VARCHAR NOT NULL," +
+ STREAM_NAME + " VARCHAR NOT NULL," +
// Non-PK columns
- STREAM_NAME + " VARCHAR,\n" +
+ STREAM_STATUS + " VARCHAR,\n" +
"CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" +
- TABLE_NAME + "," + STREAM_STATUS + "))\n" +
+ TABLE_NAME + "," + STREAM_NAME + "))\n" +
HConstants.VERSIONS + "=%s,\n" +
ColumnFamilyDescriptorBuilder.KEEP_DELETED_CELLS + "=%s,\n" +
TRANSACTIONAL + "=" + Boolean.FALSE;
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index 27e2bfed6c..7035d17bc6 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -533,6 +533,8 @@ public interface QueryServices extends SQLCloseable {
long DEFAULT_PHOENIX_METADATA_CACHE_UPDATE_ROWLOCK_TIMEOUT = 60000;
+    String PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT = "phoenix.streams.get.table.regions.timeout";
+
/**
* Get executor service used for parallel scans
*/
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 73e72c36e4..c4f3812b51 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -448,6 +448,8 @@ public class QueryServicesOptions {
    public static final boolean DEFAULT_PHOENIX_GET_METADATA_READ_LOCK_ENABLED = true;
+    public static final int DEFAULT_PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT = 300000; // 5 minutes
+
private final Configuration config;
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 129275dec7..bc2cf3a0ef 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -18,10 +18,12 @@
package org.apache.phoenix.schema;
import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_TRANSFORM_TRANSACTIONAL_TABLE;
+import static org.apache.phoenix.exception.SQLExceptionCode.CDC_ALREADY_ENABLED;
import static org.apache.phoenix.exception.SQLExceptionCode.ERROR_WRITING_TO_SCHEMA_REGISTRY;
import static org.apache.phoenix.exception.SQLExceptionCode.SALTING_NOT_ALLOWED_FOR_CDC;
import static org.apache.phoenix.exception.SQLExceptionCode.TABLE_ALREADY_EXIST;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_NOT_DEFINED;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE;
@@ -135,6 +137,7 @@ import static org.apache.phoenix.schema.PTableType.TABLE;
import static org.apache.phoenix.schema.PTableType.VIEW;
import static org.apache.phoenix.schema.types.PDataType.FALSE_BYTES;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+import static org.apache.phoenix.util.CDCUtil.CDC_STREAM_NAME_FORMAT;
import java.io.BufferedReader;
import java.io.File;
@@ -1961,7 +1964,18 @@ public class MetaDataClient {
        ColumnResolver resolver =
                FromCompiler.getResolver(NamedTableNode.create(statement.getDataTable()), connection);
TableRef tableRef = resolver.getTables().get(0);
PTable dataTable = tableRef.getTable();
+        String dataTableFullName = SchemaUtil.getTableName(
+                statement.getDataTable().getSchemaName(),
+                statement.getDataTable().getTableName());
+        String cdcObjName = statement.getCdcObjName().getName();
+        // if CDC was already enabled, throw SQLException with the Stream Name
+        String streamName = getStreamNameIfCDCEnabled(dataTableFullName);
+        if (streamName != null) {
+            throw new SQLExceptionInfo.Builder(CDC_ALREADY_ENABLED).setTableName(streamName)
+                    .build().buildException();
+        }
        Map<String, Object> tableProps = Maps.newHashMapWithExpectedSize(
                statement.getProps().size());
        Map<String, Object> commonFamilyProps = Maps.newHashMapWithExpectedSize(
@@ -1971,14 +1985,10 @@ public class MetaDataClient {
Properties props = connection.getClientInfo();
props.put(INDEX_CREATE_DEFAULT_STATE, "ACTIVE");
-        String dataTableFullName = SchemaUtil.getTableName(
-                statement.getDataTable().getSchemaName(),
-                statement.getDataTable().getTableName());
        String createIndexSql =
                "CREATE UNCOVERED INDEX " + (statement.isIfNotExists() ? "IF NOT EXISTS " : "")
-                        + CDCUtil.getCDCIndexName(statement.getCdcObjName().getName())
+                        + CDCUtil.getCDCIndexName(cdcObjName)
                        + " ON " + dataTableFullName + " ("
                        + PartitionIdFunction.NAME + "(), " + PhoenixRowTimestampFunction.NAME
                        + "()) ASYNC";
@@ -1987,7 +1997,7 @@ public class MetaDataClient {
indexProps.add("REPLICATION_SCOPE=0");
if (TableProperty.SALT_BUCKETS.getValue(tableProps) != null) {
            throw new SQLExceptionInfo.Builder(SALTING_NOT_ALLOWED_FOR_CDC).setTableName(
-                    statement.getCdcObjName().getName()).build().buildException();
+                    cdcObjName).build().buildException();
}
indexProps.add("SALT_BUCKETS=0");
        Object columnEncodedBytes = TableProperty.COLUMN_ENCODED_BYTES.getValue(tableProps);
@@ -2000,13 +2010,11 @@ public class MetaDataClient {
pstmt.execute(createIndexSql);
} catch (SQLException e) {
if (e.getErrorCode() == TABLE_ALREADY_EXIST.getErrorCode()) {
-                throw new SQLExceptionInfo.Builder(TABLE_ALREADY_EXIST).setTableName(
-                        statement.getCdcObjName().getName()).setRootCause(
-                        e).build().buildException();
+                throw new SQLExceptionInfo.Builder(TABLE_ALREADY_EXIST).setTableName(cdcObjName)
+                        .setRootCause(e).build().buildException();
}
throw e;
}
-
List<PColumn> pkColumns = dataTable.getPKColumns();
List<ColumnDef> columnDefs = new ArrayList<>();
List<ColumnDefInPkConstraint> pkColumnDefs = new ArrayList<>();
@@ -2032,16 +2040,84 @@ public class MetaDataClient {
            tableProps.put(TableProperty.MULTI_TENANT.getPropertyName(), Boolean.TRUE);
}
CreateTableStatement tableStatement = FACTORY.createTable(
-                FACTORY.table(dataTable.getSchemaName().getString(), statement.getCdcObjName().getName()),
+                FACTORY.table(dataTable.getSchemaName().getString(), cdcObjName),
null, columnDefs, FACTORY.primaryKey(null, pkColumnDefs),
Collections.emptyList(), PTableType.CDC,
statement.isIfNotExists(), null, null,
statement.getBindCount(), null);
        createTableInternal(tableStatement, null, dataTable, null, null, null, null,
                null, null, false, null,
                null, statement.getIncludeScopes(), tableProps, commonFamilyProps);
+        // for now, only track stream partition metadata for tables, TODO: updatable views
+        if (PTableType.TABLE == dataTable.getType()) {
+            updateStreamPartitionMetadata(dataTableFullName, cdcObjName);
+        }
return new MutationState(0, 0, connection);
}
+    /**
+     * Trigger CDC Stream Partition metadata bootstrap for the given table in the background.
+     * Mark status as ENABLING in SYSTEM.CDC_STREAM_STATUS and add {@link CdcStreamPartitionMetadataTask}
+     * to SYSTEM.TASK which updates partition metadata based on table regions.
+     */
+    private void updateStreamPartitionMetadata(String tableName, String cdcObjName) throws SQLException {
+        // create Stream with ENABLING status
+        long cdcIndexTimestamp = CDCUtil.getCDCCreationTimestamp(connection.getTable(tableName));
+        String streamStatusSQL = "UPSERT INTO " + SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, ?)";
+        String streamName = String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcObjName, cdcIndexTimestamp);
+        try (PreparedStatement ps = connection.prepareStatement(streamStatusSQL)) {
+            ps.setString(1, tableName);
+            ps.setString(2, streamName);
+            ps.setString(3, CDCUtil.CdcStreamStatus.ENABLING.getSerializedValue());
+            ps.executeUpdate();
+            connection.commit();
+            LOGGER.info("Marked stream {} for table {} as ENABLING", streamName, tableName);
+        }
+
+        // insert task to update partition metadata for stream
+        try {
+            List<Mutation> sysTaskUpsertMutations = Task.getMutationsForAddTask(
+                    new SystemTaskParams.SystemTaskParamsBuilder()
+                            .setConn(connection)
+                            .setTaskType(PTable.TaskType.CDC_STREAM_PARTITION)
+                            .setTableName(tableName) //give full table name
+                            .setSchemaName(streamName) // use schemaName to pass streamName
+                            .build());
+            byte[] rowKey = sysTaskUpsertMutations.get(0).getRow();
+            MetaDataProtocol.MetaDataMutationResult metaDataMutationResult =
+                    Task.taskMetaDataCoprocessorExec(connection, rowKey,
+                            new TaskMetaDataServiceCallBack(sysTaskUpsertMutations));
+            if (MetaDataProtocol.MutationCode.UNABLE_TO_UPSERT_TASK.equals(
+                    metaDataMutationResult.getMutationCode())) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNABLE_TO_UPSERT_TASK)
+                        .setSchemaName(SYSTEM_SCHEMA_NAME)
+                        .setTableName(SYSTEM_TASK_TABLE).build().buildException();
+            }
+        } catch (IOException ioe) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNABLE_TO_UPSERT_TASK)
+                    .setRootCause(ioe)
+                    .setMessage(ioe.getMessage())
+                    .setSchemaName(SYSTEM_SCHEMA_NAME)
+                    .setTableName(SYSTEM_TASK_TABLE).build().buildException();
+        }
+    }
+
+    private String getStreamNameIfCDCEnabled(String tableName) throws SQLException {
+        // check if a stream is already enabled for this table
+        String query = "SELECT STREAM_NAME FROM " + SYSTEM_CDC_STREAM_STATUS_NAME
+                + " WHERE TABLE_NAME = ? AND STREAM_STATUS IN (?, ?)";
+        try (PreparedStatement ps = connection.prepareStatement(query)) {
+            ps.setString(1, tableName);
+            ps.setString(2, CDCUtil.CdcStreamStatus.ENABLING.getSerializedValue());
+            ps.setString(3, CDCUtil.CdcStreamStatus.ENABLED.getSerializedValue());
+            ResultSet rs = ps.executeQuery();
+            if (rs.next()) {
+                return rs.getString(1);
+            }
+        }
+        return null;
+    }
+
    /**
     * Go through all the descendent views from the child view hierarchy and find if any of the
     * descendent views extends the primary key, throw error.
@@ -3942,11 +4018,22 @@ public class MetaDataClient {
String schemaName = statement.getTableName().getSchemaName();
String cdcTableName = statement.getCdcObjName().getName();
String parentTableName = statement.getTableName().getTableName();
+        String indexName = CDCUtil.getCDCIndexName(statement.getCdcObjName().getName());
+        // Mark CDC Stream as Disabled
+        long cdcIndexTimestamp = connection.getTable(indexName).getTimeStamp();
+        String streamStatusSQL = "UPSERT INTO " + SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, ?)";
+        String streamName = String.format(CDC_STREAM_NAME_FORMAT, parentTableName, cdcTableName, cdcIndexTimestamp);
+        try (PreparedStatement ps = connection.prepareStatement(streamStatusSQL)) {
+            ps.setString(1, parentTableName);
+            ps.setString(2, streamName);
+            ps.setString(3, CDCUtil.CdcStreamStatus.DISABLED.getSerializedValue());
+            ps.executeUpdate();
+            connection.commit();
+            LOGGER.info("Marked stream {} for table {} as DISABLED", streamName, parentTableName);
+        }
// Dropping the virtual CDC Table
        dropTable(schemaName, cdcTableName, parentTableName, PTableType.CDC, statement.ifExists(),
                false, false);
-
-        String indexName = CDCUtil.getCDCIndexName(statement.getCdcObjName().getName());
        // Dropping the uncovered index associated with the CDC Table
        try {
            return dropTable(schemaName, indexName, parentTableName, PTableType.INDEX,
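Note on the MetaDataClient changes above: the practical effect is that a second CREATE CDC on
the same table now fails fast with CDC_ALREADY_ENABLED (10963) while a stream is in the
ENABLING or ENABLED state, and the error message carries the existing stream name. A minimal
client-side sketch of the observable behavior, assuming a local Phoenix instance and
illustrative table/CDC names:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;

    public class CdcAlreadyEnabledExample {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                conn.createStatement().execute(
                        "CREATE TABLE T1 (K INTEGER PRIMARY KEY, V1 INTEGER)");
                conn.createStatement().execute("CREATE CDC CDC1 ON T1");
                try {
                    // a second CDC on the same table is rejected while the first
                    // stream is ENABLING or ENABLED
                    conn.createStatement().execute("CREATE CDC CDC2 ON T1");
                } catch (SQLException e) {
                    // expected: error code 10963; the message names the existing stream
                    System.out.println(e.getErrorCode() + ": " + e.getMessage());
                }
            }
        }
    }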
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
index 4ace717a06..d112f842e1 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -209,7 +209,8 @@ public interface PTable extends PMetaDataEntity {
public enum TaskType {
DROP_CHILD_VIEWS((byte)1),
INDEX_REBUILD((byte)2),
- TRANSFORM_MONITOR((byte)3);
+ TRANSFORM_MONITOR((byte)3),
+ CDC_STREAM_PARTITION((byte)4);
private final byte[] byteValue;
private final byte serializedValue;
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
index 74c2263d33..2a89798041 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -35,11 +35,15 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PVarchar;
import org.bson.RawBsonDocument;
public class CDCUtil {
public static final String CDC_INDEX_PREFIX = "PHOENIX_CDC_INDEX_";
+    // phoenix-cdc-stream-{tableName}-{cdc object name}-{cdc index timestamp}
+    public static String CDC_STREAM_NAME_FORMAT = "phoenix-cdc-stream-%s-%s-%d";
+
    /**
     * Make a set of CDC change scope enums from the given string containing comma separated scope
     * names.
@@ -150,4 +154,30 @@ public class CDCUtil {
|| sqlType == Types.LONGVARBINARY
|| dataType.getSqlType() == PDataType.VARBINARY_ENCODED_TYPE);
}
+
+ public enum CdcStreamStatus {
+ ENABLED("ENABLED"),
+ ENABLING("ENABLING"),
+ DISABLED("DISABLED"),
+ DISABLING("DISABLING");
+
+ private final String serializedValue;
+
+ private CdcStreamStatus(String value) {
+ this.serializedValue = value;
+ }
+
+ public String getSerializedValue() {
+ return serializedValue;
+ }
+ }
+
+ public static long getCDCCreationTimestamp(PTable table) {
+ for (PTable index : table.getIndexes()) {
+ if (CDCUtil.isCDCIndex(index)) {
+ return index.getTimeStamp();
+ }
+ }
+ return -1;
+ }
}
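Note: CDC_STREAM_NAME_FORMAT simply composes the data table name, the CDC object name, and the
creation timestamp of the backing CDC index (as returned by getCDCCreationTimestamp above). A
small sketch with hypothetical names and timestamp:

    import org.apache.phoenix.util.CDCUtil;

    public class StreamNameExample {
        public static void main(String[] args) {
            String tableName = "MY_SCHEMA.MY_TABLE";  // hypothetical table
            String cdcObjName = "MY_CDC";             // hypothetical CDC object
            long cdcIndexTimestamp = 1735854000000L;  // hypothetical CDC index creation time
            // CDC_STREAM_NAME_FORMAT is "phoenix-cdc-stream-%s-%s-%d"
            String streamName = String.format(CDCUtil.CDC_STREAM_NAME_FORMAT,
                    tableName, cdcObjName, cdcIndexTimestamp);
            System.out.println(streamName);
            // prints: phoenix-cdc-stream-MY_SCHEMA.MY_TABLE-MY_CDC-1735854000000
        }
    }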
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
index 207286523f..8a3a8f681b 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
@@ -74,6 +74,7 @@ public class TaskRegionObserver implements RegionObserver, RegionCoprocessor {
            .put(TaskType.DROP_CHILD_VIEWS, "org.apache.phoenix.coprocessor.tasks.DropChildViewsTask")
            .put(TaskType.INDEX_REBUILD, "org.apache.phoenix.coprocessor.tasks.IndexRebuildTask")
            .put(TaskType.TRANSFORM_MONITOR, "org.apache.phoenix.coprocessor.tasks.TransformMonitorTask")
+            .put(TaskType.CDC_STREAM_PARTITION, "org.apache.phoenix.coprocessor.tasks.CdcStreamPartitionMetadataTask")
.build();
public enum TaskResultCode {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java
new file mode 100644
index 0000000000..c84853481c
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.coprocessor.tasks;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.task.ServerTask;
+import org.apache.phoenix.schema.task.SystemTaskParams;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.List;
+
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
+import static org.apache.phoenix.query.QueryServices.PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT;
+import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT;
+
+/**
+ * Task to bootstrap partition metadata when CDC is enabled on a table.
+ * Upserts one row for each region of the table into SYSTEM.CDC_STREAM and marks the status as
+ * ENABLED in SYSTEM.CDC_STREAM_STATUS.
+ */
+public class CdcStreamPartitionMetadataTask extends BaseTask {
+
+    public static final Logger LOGGER = LoggerFactory.getLogger(CdcStreamPartitionMetadataTask.class);
+    private static final String CDC_STREAM_STATUS_UPSERT_SQL
+            = "UPSERT INTO " + SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, ?)";
+
+    // parent_partition_id will be null, set partition_end_time to -1
+    private static final String CDC_STREAM_PARTITION_UPSERT_SQL
+            = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " VALUES (?,?,?,null,?,-1,?,?)";
+
+ @Override
+ public TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord) {
+ Configuration conf = HBaseConfiguration.create(env.getConfiguration());
+        Configuration configuration = HBaseConfiguration.addHbaseResources(conf);
+        int getTableRegionsTimeout = configuration.getInt(PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT,
+                DEFAULT_PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT);
+ PhoenixConnection pconn = null;
+ String tableName = taskRecord.getTableName();
+ String streamName = taskRecord.getSchemaName();
+ Timestamp timestamp = taskRecord.getTimeStamp();
+ try {
+            pconn = QueryUtil.getConnectionOnServer(env.getConfiguration())
+                    .unwrap(PhoenixConnection.class);
+            List<HRegionLocation> tableRegions = pconn.getQueryServices().getAllTableRegions(
+                    tableName.getBytes(), getTableRegionsTimeout);
+            upsertPartitionMetadata(pconn, tableName, streamName, tableRegions);
+            updateStreamStatus(pconn, tableName, streamName);
+            return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SUCCESS, "");
+ } catch (SQLException e) {
+ try {
+ // Update task status to RETRY so that it is retried
+                ServerTask.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+ .setConn(pconn)
+ .setTaskType(taskRecord.getTaskType())
+ .setSchemaName(taskRecord.getSchemaName())
+ .setTableName(taskRecord.getTableName())
+ .setTaskStatus(PTable.TaskStatus.RETRY.toString())
+ .setStartTs(taskRecord.getTimeStamp())
+ .setEndTs(null)
+ .build());
+                LOGGER.warn("Marking task as RETRY. " +
+                        "SQLException while bootstrapping CDC Stream Partition Metadata for "
+                        + taskRecord.getTableName() + " and timestamp " + timestamp.toString(), e);
+ return null;
+ } catch (IOException ioe) {
+                LOGGER.error("Unable to mark task as RETRY. " +
+                        "SQLException while bootstrapping CDC Stream Partition Metadata for "
+                        + taskRecord.getTableName() + " and timestamp " + timestamp.toString(), e);
+                return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, e.toString());
+ }
+ } catch (Throwable t) {
+            LOGGER.error("Marking task as FAIL. Exception while bootstrapping CDC Stream Partition Metadata for "
+                    + taskRecord.getTableName() + " and timestamp " + timestamp.toString(), t);
+            return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, t.toString());
+ } finally {
+ if (pconn != null) {
+ try {
+ pconn.close();
+ } catch (SQLException ignored) {
+                    LOGGER.debug("CdcStreamPartitionMetadataTask can't close connection", ignored);
+ }
+ }
+ }
+ }
+
+ @Override
+    public TaskRegionObserver.TaskResult checkCurrentResult(Task.TaskRecord taskRecord)
+            throws Exception {
+ return null;
+ }
+
+    private void updateStreamStatus(PhoenixConnection pconn, String tableName, String streamName)
+            throws SQLException {
+        try (PreparedStatement ps = pconn.prepareStatement(CDC_STREAM_STATUS_UPSERT_SQL)) {
+            ps.setString(1, tableName);
+            ps.setString(2, streamName);
+            ps.setString(3, CDCUtil.CdcStreamStatus.ENABLED.getSerializedValue());
+            ps.executeUpdate();
+            pconn.commit();
+            LOGGER.info("Marked stream {} for table {} as ENABLED", streamName, tableName);
+        }
+ }
+
+    private void upsertPartitionMetadata(PhoenixConnection pconn, String tableName,
+                                         String streamName, List<HRegionLocation> tableRegions)
+            throws SQLException {
+        try (PreparedStatement ps = pconn.prepareStatement(CDC_STREAM_PARTITION_UPSERT_SQL)) {
+ for (HRegionLocation tableRegion : tableRegions) {
+ RegionInfo ri = tableRegion.getRegionInfo();
+ ps.setString(1, tableName);
+ ps.setString(2, streamName);
+ ps.setString(3, ri.getEncodedName());
+ ps.setLong(4, ri.getRegionId());
+ ps.setBytes(5, ri.getStartKey());
+ ps.setBytes(6, ri.getEndKey());
+ ps.executeUpdate();
+ }
+ pconn.commit();
+            LOGGER.info("Upserted {} partition metadata rows for table : {}, stream: {}",
+                    tableRegions.size(), tableName, streamName);
+ }
+ }
+}
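Note: once this task has run, SYSTEM.CDC_STREAM holds one row per table region, keyed by table
name, stream name, and partition id (the region's encoded name), matching the upsert above. A
hedged sketch of inspecting that metadata over plain JDBC; the connection URL and table name
are illustrative, and the column names are the ones the integration test below queries:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    public class InspectStreamPartitions {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                PreparedStatement ps = conn.prepareStatement(
                        "SELECT STREAM_NAME, PARTITION_ID FROM SYSTEM.CDC_STREAM"
                                + " WHERE TABLE_NAME = ?");
                ps.setString(1, "MY_SCHEMA.MY_TABLE"); // hypothetical table
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        // one row per region; PARTITION_ID is the region's encoded name
                        System.out.println(rs.getString(1) + " -> " + rs.getString(2));
                    }
                }
            }
        }
    }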
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
index 08bf35279d..2d9aa6738a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
@@ -182,6 +182,10 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
createTable(conn, cdc_sql, encodingScheme, false, null, false, null);
}
+    protected void dropCDC(Connection conn, String cdcName, String tableName) throws SQLException {
+        conn.createStatement().execute("DROP CDC " + cdcName + " ON " + tableName);
+ }
+
    protected void assertCDCState(Connection conn, String cdcName, String expInclude,
            int idxType) throws SQLException {
        try (ResultSet rs = conn.createStatement().executeQuery("SELECT cdc_include FROM " +
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
index 274314f106..4a7dfa0e8a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
@@ -23,6 +23,7 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -34,12 +35,9 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
import java.util.Properties;
-import static org.apache.phoenix.schema.PTable.CDCChangeScope.POST;
-import static org.apache.phoenix.schema.PTable.CDCChangeScope.PRE;
import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -95,20 +93,37 @@ public class CDCDefinitionIT extends CDCBaseIT {
conn.createStatement().execute(cdc_sql);
fail("Expected to fail due to duplicate index");
} catch (SQLException e) {
-            assertEquals(SQLExceptionCode.TABLE_ALREADY_EXIST.getErrorCode(), e.getErrorCode());
-            assertTrue(e.getMessage().endsWith(cdcName));
+            if (forView) {
+                assertEquals(SQLExceptionCode.TABLE_ALREADY_EXIST.getErrorCode(), e.getErrorCode());
+                assertTrue(e.getMessage().endsWith(cdcName));
+            } else {
+                // we only support Streams for tables as of now
+                assertEquals(SQLExceptionCode.CDC_ALREADY_ENABLED.getErrorCode(), e.getErrorCode());
+            }
}
-        conn.createStatement().execute("CREATE CDC IF NOT EXISTS " + cdcName + " ON " + tableName +
-                " INCLUDE (pre, post)");
+        try {
+            conn.createStatement().execute("CREATE CDC IF NOT EXISTS " + cdcName + " ON " + tableName +
+                    " INCLUDE (pre, post)");
+        } catch (SQLException e) {
+            // when we replace CREATE CDC with ENABLE CDC, we will not have IF NOT EXISTS usage
+            if (!forView) {
+                assertEquals(SQLExceptionCode.CDC_ALREADY_ENABLED.getErrorCode(), e.getErrorCode());
+            }
+        }
cdcName = generateUniqueName();
        cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INCLUDE (pre, post)";
- createCDC(conn, cdc_sql);
- assertCDCState(conn, cdcName, PRE+","+POST, 3);
- assertPTable(cdcName, new HashSet<>(
- Arrays.asList(PRE, POST)), tableName, datatableName);
- assertNoResults(conn, cdcName);
+        try {
+            createCDC(conn, cdc_sql);
+        } catch (SQLException e) {
+            if (!forView) {
+                assertEquals(SQLExceptionCode.CDC_ALREADY_ENABLED.getErrorCode(), e.getErrorCode());
+            } else {
+                Assert.fail("Multiple CDCs should be allowed on views.");
+            }
+        }
+
conn.close();
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
new file mode 100644
index 0000000000..461ee8a7c0
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
+import static org.apache.phoenix.util.CDCUtil.CDC_STREAM_NAME_FORMAT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category(ParallelStatsDisabledTest.class)
+public class CDCStreamIT extends CDCBaseIT {
+ private static RegionCoprocessorEnvironment TaskRegionEnvironment;
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+                Integer.toString(60*60)); // An hour
+        props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(false));
+ props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
+ Long.toString(Long.MAX_VALUE));
+ props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
+ Long.toString(Long.MAX_VALUE));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+        TaskRegionEnvironment =
+                getUtility()
+                        .getRSForFirstRegionInTable(
+                                PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
+                        .getRegions(PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
+                        .get(0).getCoprocessorHost()
+                        .findCoprocessorEnvironment(TaskRegionObserver.class.getName());
+ }
+ @Test
+ public void testStreamPartitionMetadataBootstrap() throws Exception {
+ Connection conn = newConnection();
+ String tableName = generateUniqueName();
+ String cdcName = generateUniqueName();
+ String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+        conn.createStatement().execute(
+                "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
+                        + " v2 DATE)");
+ createCDC(conn, cdc_sql, null);
+ String streamName = getStreamName(conn, tableName, cdcName);
+
+ // stream should be in ENABLING state
+        assertStreamStatus(conn, tableName, streamName, CDCUtil.CdcStreamStatus.ENABLING);
+
+ // run task to populate partitions and enable stream
+        TaskRegionObserver.SelfHealingTask task =
+                new TaskRegionObserver.SelfHealingTask(
+                        TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
+ task.run();
+
+        // stream should be in ENABLED state and metadata is populated for every table region
+        assertPartitionMetadata(conn, tableName, cdcName);
+        assertStreamStatus(conn, tableName, streamName, CDCUtil.CdcStreamStatus.ENABLED);
+ }
+
+ @Test
+ public void testOnlyOneStreamAllowed() throws Exception {
+ Connection conn = newConnection();
+ String tableName = generateUniqueName();
+ String cdcName = generateUniqueName();
+ String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+        conn.createStatement().execute(
+                "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
+                        + " v2 DATE)");
+ createCDC(conn, cdc_sql, null);
+ String streamName = getStreamName(conn, tableName, cdcName);
+
+ // stream exists in ENABLING status
+ String cdcName2 = generateUniqueName();
+ String cdc_sql2 = "CREATE CDC " + cdcName2 + " ON " + tableName;
+ try {
+ createCDC(conn, cdc_sql2, null);
+ Assert.fail("Only one CDC entity is allowed per table");
+ } catch (SQLException e) {
+ // expected
+            assertEquals(SQLExceptionCode.CDC_ALREADY_ENABLED.getErrorCode(), e.getErrorCode());
+ assertTrue(e.getMessage().contains(streamName));
+ }
+
+ // run task to populate partitions and enable stream
+        TaskRegionObserver.SelfHealingTask task =
+                new TaskRegionObserver.SelfHealingTask(
+                        TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
+ task.run();
+
+ // stream exists in ENABLED status
+ try {
+ createCDC(conn, cdc_sql2, null);
+ Assert.fail("Only one CDC entity is allowed per table");
+ } catch (SQLException e) {
+ // expected
+            assertEquals(SQLExceptionCode.CDC_ALREADY_ENABLED.getErrorCode(), e.getErrorCode());
+ assertTrue(e.getMessage().contains(streamName));
+ }
+
+ //drop cdc
+ dropCDC(conn, cdcName, tableName);
+        assertStreamStatus(conn, tableName, streamName, CDCUtil.CdcStreamStatus.DISABLED);
+
+ //create new CDC
+ String cdcName3 = generateUniqueName();
+ String cdc_sql3 = "CREATE CDC " + cdcName3 + " ON " + tableName;
+ createCDC(conn, cdc_sql3, null);
+ streamName = getStreamName(conn, tableName, cdcName3);
+        assertStreamStatus(conn, tableName, streamName, CDCUtil.CdcStreamStatus.ENABLING);
+ }
+
+    private String getStreamName(Connection conn, String tableName, String cdcName) throws SQLException {
+        return String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcName, CDCUtil.getCDCCreationTimestamp(
+                conn.unwrap(PhoenixConnection.class).getTableNoCache(tableName)));
+    }
+
+    private void assertStreamStatus(Connection conn, String tableName, String streamName,
+                                    CDCUtil.CdcStreamStatus status) throws SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("SELECT STREAM_STATUS FROM "
+                + SYSTEM_CDC_STREAM_STATUS_NAME + " WHERE TABLE_NAME='" + tableName +
+                "' AND STREAM_NAME='" + streamName + "'");
+        assertTrue(rs.next());
+        Assert.assertEquals(status.getSerializedValue(), rs.getString(1));
+    }
+
+    private void assertPartitionMetadata(Connection conn, String tableName, String cdcName)
+            throws SQLException {
+        String streamName = String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcName,
+                CDCUtil.getCDCCreationTimestamp(conn.unwrap(PhoenixConnection.class).getTableNoCache(tableName)));
+        List<HRegionLocation> tableRegions
+                = conn.unwrap(PhoenixConnection.class).getQueryServices().getAllTableRegions(tableName.getBytes());
+        for (HRegionLocation tableRegion : tableRegions) {
+            RegionInfo ri = tableRegion.getRegionInfo();
+            PreparedStatement ps = conn.prepareStatement("SELECT * FROM " + SYSTEM_CDC_STREAM_NAME +
+                    " WHERE TABLE_NAME = ? AND STREAM_NAME = ? AND PARTITION_ID= ?");
+ ps.setString(1, tableName);
+ ps.setString(2, streamName);
+ ps.setString(3, ri.getEncodedName());
+ ResultSet rs = ps.executeQuery();
+ assertTrue(rs.next());
+ }
+ }
+}