This is an automated email from the ASF dual-hosted git repository.

palashc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d06b7a28c PHOENIX-7459 : Bootstrap stream metadata when CDC is enabled on a table (#2033)
3d06b7a28c is described below

commit 3d06b7a28c696943d4a4c8e6a691144f1c1cb672
Author: Palash Chauhan <[email protected]>
AuthorDate: Thu Jan 2 19:18:28 2025 -0800

    PHOENIX-7459 : Bootstrap stream metadata when CDC is enabled on a table (#2033)
    
    Co-authored-by: Palash Chauhan <[email protected]>
---
 .../apache/phoenix/exception/SQLExceptionCode.java |   3 +
 .../org/apache/phoenix/query/QueryConstants.java   |   6 +-
 .../org/apache/phoenix/query/QueryServices.java    |   2 +
 .../apache/phoenix/query/QueryServicesOptions.java |   2 +
 .../org/apache/phoenix/schema/MetaDataClient.java  | 113 +++++++++++--
 .../java/org/apache/phoenix/schema/PTable.java     |   3 +-
 .../main/java/org/apache/phoenix/util/CDCUtil.java |  30 ++++
 .../phoenix/coprocessor/TaskRegionObserver.java    |   1 +
 .../tasks/CdcStreamPartitionMetadataTask.java      | 153 +++++++++++++++++
 .../java/org/apache/phoenix/end2end/CDCBaseIT.java |   4 +
 .../apache/phoenix/end2end/CDCDefinitionIT.java    |  39 +++--
 .../org/apache/phoenix/end2end/CDCStreamIT.java    | 183 +++++++++++++++++++++
 12 files changed, 510 insertions(+), 29 deletions(-)

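For context, the end-to-end flow this commit wires up can be exercised with plain JDBC. A minimal sketch, assuming the Phoenix client jar is on the classpath, a quorum at localhost, and hypothetical names MY_TABLE / MY_CDC (the system table and column names come from the diff below):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    public class CdcBootstrapSketch {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                // CREATE CDC now also bootstraps stream metadata: it writes an ENABLING
                // row to SYSTEM.CDC_STREAM_STATUS and queues a CDC_STREAM_PARTITION task
                // in SYSTEM.TASK (see the MetaDataClient changes below).
                conn.createStatement().execute("CREATE CDC MY_CDC ON MY_TABLE");
                // Once the background task has upserted one SYSTEM.CDC_STREAM row per
                // table region, the status flips from ENABLING to ENABLED.
                try (ResultSet rs = conn.createStatement().executeQuery(
                        "SELECT STREAM_NAME, STREAM_STATUS FROM SYSTEM.CDC_STREAM_STATUS"
                                + " WHERE TABLE_NAME = 'MY_TABLE'")) {
                    while (rs.next()) {
                        System.out.println(rs.getString(1) + " -> " + rs.getString(2));
                    }
                }
            }
        }
    }
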
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index d9aa522765..eb52fa0fba 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -383,6 +383,9 @@ public enum SQLExceptionCode {
     SALTING_NOT_ALLOWED_FOR_CDC(10962,"44A44", SALT_BUCKETS +
             " property can not be set for CDC"),
 
+    CDC_ALREADY_ENABLED(10963, "44A45",
+            "CDC on this table is either enabled or is in the process of being enabled."),
+
     /** Sequence related */
     SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {
         @Override
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 5154bf67e5..62142a454c 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -655,11 +655,11 @@ public interface QueryConstants {
             SYSTEM_CDC_STREAM_STATUS_TABLE + "\"(\n" +
             // PK columns
             TABLE_NAME + " VARCHAR NOT NULL," +
-            STREAM_STATUS + " VARCHAR NOT NULL," +
+            STREAM_NAME + " VARCHAR NOT NULL," +
             // Non-PK columns
-            STREAM_NAME + " VARCHAR,\n" +
+            STREAM_STATUS + " VARCHAR,\n" +
             "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" +
-            TABLE_NAME + "," + STREAM_STATUS + "))\n" +
+            TABLE_NAME + "," + STREAM_NAME + "))\n" +
             HConstants.VERSIONS + "=%s,\n" +
             ColumnFamilyDescriptorBuilder.KEEP_DELETED_CELLS + "=%s,\n" +
             TRANSACTIONAL + "=" + Boolean.FALSE;
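Since STREAM_NAME is now part of the row key (with STREAM_STATUS demoted to a regular column), each (table, stream) pair owns exactly one status row that later UPSERTs overwrite in place. A point lookup therefore binds both key columns; a minimal sketch, with hypothetical names and a stream name shaped by CDC_STREAM_NAME_FORMAT from the CDCUtil change further below:

    // Assumes an open java.sql.Connection conn to Phoenix.
    try (PreparedStatement ps = conn.prepareStatement(
            "SELECT STREAM_STATUS FROM SYSTEM.CDC_STREAM_STATUS"
                    + " WHERE TABLE_NAME = ? AND STREAM_NAME = ?")) {
        ps.setString(1, "MY_TABLE");
        ps.setString(2, "phoenix-cdc-stream-MY_TABLE-MY_CDC-1735862308000");
        try (ResultSet rs = ps.executeQuery()) {
            if (rs.next()) {
                System.out.println(rs.getString(1)); // ENABLING, ENABLED or DISABLED
            }
        }
    }
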
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index 27e2bfed6c..7035d17bc6 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -533,6 +533,8 @@ public interface QueryServices extends SQLCloseable {
 
     long DEFAULT_PHOENIX_METADATA_CACHE_UPDATE_ROWLOCK_TIMEOUT = 60000;
 
+    String PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT = "phoenix.streams.get.table.regions.timeout";
+
     /**
      * Get executor service used for parallel scans
      */
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 73e72c36e4..c4f3812b51 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -448,6 +448,8 @@ public class QueryServicesOptions {
 
     public static final boolean DEFAULT_PHOENIX_GET_METADATA_READ_LOCK_ENABLED = true;
 
+    public static final int DEFAULT_PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT = 300000; // 5 minutes
+
 
     private final Configuration config;
 
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 129275dec7..bc2cf3a0ef 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -18,10 +18,12 @@
 package org.apache.phoenix.schema;
 
 import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_TRANSFORM_TRANSACTIONAL_TABLE;
+import static org.apache.phoenix.exception.SQLExceptionCode.CDC_ALREADY_ENABLED;
 import static org.apache.phoenix.exception.SQLExceptionCode.ERROR_WRITING_TO_SCHEMA_REGISTRY;
 import static org.apache.phoenix.exception.SQLExceptionCode.SALTING_NOT_ALLOWED_FOR_CDC;
 import static org.apache.phoenix.exception.SQLExceptionCode.TABLE_ALREADY_EXIST;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_NOT_DEFINED;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE;
@@ -135,6 +137,7 @@ import static org.apache.phoenix.schema.PTableType.TABLE;
 import static org.apache.phoenix.schema.PTableType.VIEW;
 import static org.apache.phoenix.schema.types.PDataType.FALSE_BYTES;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+import static org.apache.phoenix.util.CDCUtil.CDC_STREAM_NAME_FORMAT;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -1961,7 +1964,18 @@ public class MetaDataClient {
         ColumnResolver resolver = FromCompiler.getResolver(NamedTableNode.create(statement.getDataTable()), connection);
         TableRef tableRef = resolver.getTables().get(0);
         PTable dataTable = tableRef.getTable();
+        String
+                dataTableFullName =
+                SchemaUtil.getTableName(statement.getDataTable().getSchemaName(),
+                        statement.getDataTable().getTableName());
+        String cdcObjName = statement.getCdcObjName().getName();
 
+        // if CDC was already enabled, throw SQLException with the Stream Name
+        String streamName = getStreamNameIfCDCEnabled(dataTableFullName);
+        if (streamName != null) {
+            throw new SQLExceptionInfo.Builder(CDC_ALREADY_ENABLED).setTableName(streamName)
+                    .build().buildException();
+        }
         Map<String, Object> tableProps = Maps.newHashMapWithExpectedSize(
                 statement.getProps().size());
         Map<String, Object> commonFamilyProps = Maps.newHashMapWithExpectedSize(
@@ -1971,14 +1985,10 @@ public class MetaDataClient {
         Properties props = connection.getClientInfo();
         props.put(INDEX_CREATE_DEFAULT_STATE, "ACTIVE");
 
-        String
-                dataTableFullName =
-                SchemaUtil.getTableName(statement.getDataTable().getSchemaName(),
-                        statement.getDataTable().getTableName());
         String
                 createIndexSql =
                 "CREATE UNCOVERED INDEX " + (statement.isIfNotExists() ? "IF NOT EXISTS " : "")
-                        + CDCUtil.getCDCIndexName(statement.getCdcObjName().getName())
+                        + CDCUtil.getCDCIndexName(cdcObjName)
                         + " ON " + dataTableFullName + " ("
                         + PartitionIdFunction.NAME + "(), " + PhoenixRowTimestampFunction.NAME
                         + "()) ASYNC";
@@ -1987,7 +1997,7 @@ public class MetaDataClient {
         indexProps.add("REPLICATION_SCOPE=0");
         if (TableProperty.SALT_BUCKETS.getValue(tableProps) != null) {
             throw new SQLExceptionInfo.Builder(SALTING_NOT_ALLOWED_FOR_CDC).setTableName(
-                    statement.getCdcObjName().getName()).build().buildException();
+                    cdcObjName).build().buildException();
         }
         indexProps.add("SALT_BUCKETS=0");
         Object columnEncodedBytes = TableProperty.COLUMN_ENCODED_BYTES.getValue(tableProps);
@@ -2000,13 +2010,11 @@ public class MetaDataClient {
                 pstmt.execute(createIndexSql);
             } catch (SQLException e) {
             if (e.getErrorCode() == TABLE_ALREADY_EXIST.getErrorCode()) {
-                throw new SQLExceptionInfo.Builder(TABLE_ALREADY_EXIST).setTableName(
-                        statement.getCdcObjName().getName()).setRootCause(
-                                e).build().buildException();
+                throw new SQLExceptionInfo.Builder(TABLE_ALREADY_EXIST).setTableName(cdcObjName)
+                        .setRootCause(e).build().buildException();
             }
             throw e;
         }
-
         List<PColumn> pkColumns = dataTable.getPKColumns();
         List<ColumnDef> columnDefs = new ArrayList<>();
         List<ColumnDefInPkConstraint> pkColumnDefs = new ArrayList<>();
@@ -2032,16 +2040,84 @@ public class MetaDataClient {
             tableProps.put(TableProperty.MULTI_TENANT.getPropertyName(), Boolean.TRUE);
         }
         CreateTableStatement tableStatement = FACTORY.createTable(
-                FACTORY.table(dataTable.getSchemaName().getString(), statement.getCdcObjName().getName()),
+                FACTORY.table(dataTable.getSchemaName().getString(), cdcObjName),
                 null, columnDefs, FACTORY.primaryKey(null, pkColumnDefs),
                 Collections.emptyList(), PTableType.CDC, statement.isIfNotExists(), null, null,
                 statement.getBindCount(), null);
         createTableInternal(tableStatement, null, dataTable, null, null, null, null,
                 null, null, false, null,
                 null, statement.getIncludeScopes(), tableProps, commonFamilyProps);
+        // for now, only track stream partition metadata for tables, TODO: updatable views
+        if (PTableType.TABLE == dataTable.getType()) {
+            updateStreamPartitionMetadata(dataTableFullName, cdcObjName);
+        }
         return new MutationState(0, 0, connection);
     }
 
+    /**
+     * Trigger CDC Stream Partition metadata bootstrap for the given table in the background.
+     * Mark status as ENABLING in SYSTEM.CDC_STREAM_STATUS and add {@link CdcStreamPartitionMetadataTask}
+     * to SYSTEM.TASK which updates partition metadata based on table regions.
+     */
+    private void updateStreamPartitionMetadata(String tableName, String cdcObjName) throws SQLException {
+        // create Stream with ENABLING status
+        long cdcIndexTimestamp = CDCUtil.getCDCCreationTimestamp(connection.getTable(tableName));
+        String streamStatusSQL = "UPSERT INTO " + SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, ?)";
+        String streamName = String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcObjName, cdcIndexTimestamp);
+        try (PreparedStatement ps = connection.prepareStatement(streamStatusSQL)) {
+            ps.setString(1, tableName);
+            ps.setString(2, streamName);
+            ps.setString(3, CDCUtil.CdcStreamStatus.ENABLING.getSerializedValue());
+            ps.executeUpdate();
+            connection.commit();
+            LOGGER.info("Marked stream {} for table {} as ENABLING", streamName, tableName);
+        }
+
+        // insert task to update partition metadata for stream
+        try {
+            List<Mutation> sysTaskUpsertMutations = Task.getMutationsForAddTask(
+                    new SystemTaskParams.SystemTaskParamsBuilder()
+                    .setConn(connection)
+                    .setTaskType(PTable.TaskType.CDC_STREAM_PARTITION)
+                    .setTableName(tableName) //give full table name
+                    .setSchemaName(streamName) // use schemaName to pass streamName
+                    .build());
+            byte[] rowKey = sysTaskUpsertMutations
+                    .get(0).getRow();
+            MetaDataProtocol.MetaDataMutationResult metaDataMutationResult =
+                    Task.taskMetaDataCoprocessorExec(connection, rowKey,
+                            new TaskMetaDataServiceCallBack(sysTaskUpsertMutations));
+            if (MetaDataProtocol.MutationCode.UNABLE_TO_UPSERT_TASK.equals(
+                    metaDataMutationResult.getMutationCode())) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNABLE_TO_UPSERT_TASK)
+                        .setSchemaName(SYSTEM_SCHEMA_NAME)
+                        .setTableName(SYSTEM_TASK_TABLE).build().buildException();
+            }
+        } catch (IOException ioe) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNABLE_TO_UPSERT_TASK)
+                    .setRootCause(ioe)
+                    .setMessage(ioe.getMessage())
+                    .setSchemaName(SYSTEM_SCHEMA_NAME)
+                    .setTableName(SYSTEM_TASK_TABLE).build().buildException();
+        }
+    }
+
+    private String getStreamNameIfCDCEnabled(String tableName) throws SQLException {
+        // check if a stream is already enabled for this table
+        String query = "SELECT STREAM_NAME FROM " + SYSTEM_CDC_STREAM_STATUS_NAME
+                + " WHERE TABLE_NAME = ? AND STREAM_STATUS IN (?, ?)";
+        try (PreparedStatement ps = connection.prepareStatement(query)) {
+            ps.setString(1, tableName);
+            ps.setString(2, CDCUtil.CdcStreamStatus.ENABLING.getSerializedValue());
+            ps.setString(3, CDCUtil.CdcStreamStatus.ENABLED.getSerializedValue());
+            ResultSet rs = ps.executeQuery();
+            if (rs.next()) {
+               return rs.getString(1);
+            }
+        }
+        return null;
+    }
+
     /**
      * Go through all the descendent views from the child view hierarchy and find if any of the
      * descendent views extends the primary key, throw error.
@@ -3942,11 +4018,22 @@ public class MetaDataClient {
         String schemaName = statement.getTableName().getSchemaName();
         String cdcTableName = statement.getCdcObjName().getName();
         String parentTableName = statement.getTableName().getTableName();
+        String indexName = CDCUtil.getCDCIndexName(statement.getCdcObjName().getName());
+        // Mark CDC Stream as Disabled
+        long cdcIndexTimestamp = connection.getTable(indexName).getTimeStamp();
+        String streamStatusSQL = "UPSERT INTO " + SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, ?)";
+        String streamName = String.format(CDC_STREAM_NAME_FORMAT, parentTableName, cdcTableName, cdcIndexTimestamp);
+        try (PreparedStatement ps = connection.prepareStatement(streamStatusSQL)) {
+            ps.setString(1, parentTableName);
+            ps.setString(2, streamName);
+            ps.setString(3, CDCUtil.CdcStreamStatus.DISABLED.getSerializedValue());
+            ps.executeUpdate();
+            connection.commit();
+            LOGGER.info("Marked stream {} for table {} as DISABLED", streamName, parentTableName);
+        }
         // Dropping the virtual CDC Table
         dropTable(schemaName, cdcTableName, parentTableName, PTableType.CDC, statement.ifExists(),
                 false, false);
-
-        String indexName = CDCUtil.getCDCIndexName(statement.getCdcObjName().getName());
         // Dropping the uncovered index associated with the CDC Table
         try {
             return dropTable(schemaName, indexName, parentTableName, PTableType.INDEX,
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
index 4ace717a06..d112f842e1 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -209,7 +209,8 @@ public interface PTable extends PMetaDataEntity {
     public enum TaskType {
         DROP_CHILD_VIEWS((byte)1),
         INDEX_REBUILD((byte)2),
-        TRANSFORM_MONITOR((byte)3);
+        TRANSFORM_MONITOR((byte)3),
+        CDC_STREAM_PARTITION((byte)4);
 
         private final byte[] byteValue;
         private final byte serializedValue;
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
index 74c2263d33..2a89798041 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -35,11 +35,15 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PVarchar;
 import org.bson.RawBsonDocument;
 
 public class CDCUtil {
     public static final String CDC_INDEX_PREFIX = "PHOENIX_CDC_INDEX_";
 
+    // phoenix-cdc-stream-{tableName}-{cdc object name}-{cdc index timestamp}
+    public static String CDC_STREAM_NAME_FORMAT = "phoenix-cdc-stream-%s-%s-%d";
+
     /**
      * Make a set of CDC change scope enums from the given string containing comma separated scope
      * names.
@@ -150,4 +154,30 @@ public class CDCUtil {
                 || sqlType == Types.LONGVARBINARY
                 || dataType.getSqlType() == PDataType.VARBINARY_ENCODED_TYPE);
     }
+
+    public enum CdcStreamStatus {
+        ENABLED("ENABLED"),
+        ENABLING("ENABLING"),
+        DISABLED("DISABLED"),
+        DISABLING("DISABLING");
+
+        private final String serializedValue;
+
+        private CdcStreamStatus(String value) {
+            this.serializedValue = value;
+        }
+
+        public String getSerializedValue() {
+            return serializedValue;
+        }
+    }
+
+    public static long getCDCCreationTimestamp(PTable table) {
+        for (PTable index : table.getIndexes()) {
+            if (CDCUtil.isCDCIndex(index)) {
+                return index.getTimeStamp();
+            }
+        }
+        return -1;
+    }
 }
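The new CDC_STREAM_NAME_FORMAT is what ties the pieces together: the same formatted name keys both SYSTEM.CDC_STREAM_STATUS and SYSTEM.CDC_STREAM, and is passed to the background task through the SYSTEM.TASK schemaName field. A quick sketch with hypothetical values (note that getCDCCreationTimestamp() returns -1 when the table has no CDC index):

    // table name, CDC object name, CDC index creation timestamp (epoch millis)
    String streamName = String.format(CDCUtil.CDC_STREAM_NAME_FORMAT,
            "MY_TABLE", "MY_CDC", 1735862308000L);
    // streamName is now "phoenix-cdc-stream-MY_TABLE-MY_CDC-1735862308000"
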
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
index 207286523f..8a3a8f681b 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
@@ -74,6 +74,7 @@ public class TaskRegionObserver implements RegionObserver, RegionCoprocessor {
             .put(TaskType.DROP_CHILD_VIEWS, "org.apache.phoenix.coprocessor.tasks.DropChildViewsTask")
             .put(TaskType.INDEX_REBUILD, "org.apache.phoenix.coprocessor.tasks.IndexRebuildTask")
             .put(TaskType.TRANSFORM_MONITOR, "org.apache.phoenix.coprocessor.tasks.TransformMonitorTask")
+            .put(TaskType.CDC_STREAM_PARTITION, "org.apache.phoenix.coprocessor.tasks.CdcStreamPartitionMetadataTask")
             .build();
 
     public enum TaskResultCode {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java
new file mode 100644
index 0000000000..c84853481c
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.coprocessor.tasks;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.task.ServerTask;
+import org.apache.phoenix.schema.task.SystemTaskParams;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.List;
+
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
+import static org.apache.phoenix.query.QueryServices.PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT;
+import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT;
+
+/**
+ * Task to bootstrap partition metadata when CDC is enabled on a table.
+ * Upserts one row for each region of the table into SYSTEM.CDC_STREAM and marks the status as
+ * ENABLED in SYSTEM.CDC_STREAM_STATUS.
+ */
+public class CdcStreamPartitionMetadataTask extends BaseTask  {
+
+    public static final Logger LOGGER = LoggerFactory.getLogger(CdcStreamPartitionMetadataTask.class);
+    private static final String CDC_STREAM_STATUS_UPSERT_SQL
+            = "UPSERT INTO " + SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, ?)";
+
+    // parent_partition_id will be null, set partition_end_time to -1
+    private static final String CDC_STREAM_PARTITION_UPSERT_SQL
+            = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " VALUES (?,?,?,null,?,-1,?,?)";
+
+    @Override
+    public TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord) {
+        Configuration conf = HBaseConfiguration.create(env.getConfiguration());
+        Configuration configuration = HBaseConfiguration.addHbaseResources(conf);
+        int getTableRegionsTimeout = configuration.getInt(PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT,
+                DEFAULT_PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT);
+        PhoenixConnection pconn = null;
+        String tableName = taskRecord.getTableName();
+        String streamName = taskRecord.getSchemaName();
+        Timestamp timestamp = taskRecord.getTimeStamp();
+        try {
+            pconn = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
+            List<HRegionLocation> tableRegions = pconn.getQueryServices().getAllTableRegions(
+                    tableName.getBytes(), getTableRegionsTimeout);
+            upsertPartitionMetadata(pconn, tableName, streamName, tableRegions);
+            updateStreamStatus(pconn, tableName, streamName);
+            return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SUCCESS, "");
+        } catch (SQLException e) {
+            try {
+                // Update task status to RETRY so that it is retried
+                ServerTask.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+                        .setConn(pconn)
+                        .setTaskType(taskRecord.getTaskType())
+                        .setSchemaName(taskRecord.getSchemaName())
+                        .setTableName(taskRecord.getTableName())
+                        .setTaskStatus(PTable.TaskStatus.RETRY.toString())
+                        .setStartTs(taskRecord.getTimeStamp())
+                        .setEndTs(null)
+                        .build());
+                LOGGER.warn("Marking task as RETRY. " +
+                        "SQLException while bootstrapping CDC Stream Partition Metadata for "
+                        + taskRecord.getTableName() + " and timestamp " + timestamp.toString(), e);
+                return null;
+            } catch (IOException ioe) {
+                LOGGER.error("Unable to mark task as RETRY. " +
+                        "SQLException while bootstrapping CDC Stream Partition Metadata for "
+                        + taskRecord.getTableName() + " and timestamp " + timestamp.toString(), e);
+                return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, e.toString());
+            }
+        } catch (Throwable t) {
+            LOGGER.error("Marking task as FAIL. Exception while bootstrapping CDC Stream Partition Metadata for "
+                    + taskRecord.getTableName() + " and timestamp " + timestamp.toString(), t);
+            return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, t.toString());
+        } finally {
+            if (pconn != null) {
+                try {
+                    pconn.close();
+                } catch (SQLException ignored) {
+                    LOGGER.debug("CdcStreamPartitionMetadataTask can't close connection", ignored);
+                }
+            }
+        }
+    }
+
+    @Override
+    public TaskRegionObserver.TaskResult checkCurrentResult(Task.TaskRecord taskRecord)
+            throws Exception {
+        return null;
+    }
+
+    private void updateStreamStatus(PhoenixConnection pconn, String tableName, String streamName)
+            throws SQLException {
+        try (PreparedStatement ps = pconn.prepareStatement(CDC_STREAM_STATUS_UPSERT_SQL)) {
+            ps.setString(1, tableName);
+            ps.setString(2, streamName);
+            ps.setString(3, CDCUtil.CdcStreamStatus.ENABLED.getSerializedValue());
+            ps.executeUpdate();
+            pconn.commit();
+            LOGGER.info("Marked stream {} for table {} as ENABLED", streamName, tableName);
+        }
+    }
+
+    private void upsertPartitionMetadata(PhoenixConnection pconn, String tableName,
+                                         String streamName, List<HRegionLocation> tableRegions)
+            throws SQLException {
+        try (PreparedStatement ps = pconn.prepareStatement(CDC_STREAM_PARTITION_UPSERT_SQL)) {
+            for (HRegionLocation tableRegion : tableRegions) {
+                RegionInfo ri = tableRegion.getRegionInfo();
+                ps.setString(1, tableName);
+                ps.setString(2, streamName);
+                ps.setString(3, ri.getEncodedName());
+                ps.setLong(4, ri.getRegionId());
+                ps.setBytes(5, ri.getStartKey());
+                ps.setBytes(6, ri.getEndKey());
+                ps.executeUpdate();
+            }
+            pconn.commit();
+            LOGGER.info("Upserted {} partition metadata rows for table : {}, stream: {}",
+                    tableRegions.size(), tableName, streamName);
+        }
+    }
+}
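The task fetches region locations under the new phoenix.streams.get.table.regions.timeout property, so clusters bootstrapping CDC on very large tables may want to raise the 300000 ms (5 minute) default. A sketch of overriding it in an HBase configuration, with an illustrative value:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class StreamsTimeoutSketch {
        public static Configuration withStreamsTimeout() {
            Configuration conf = HBaseConfiguration.create();
            // Key from QueryServices; default is 300000 ms per QueryServicesOptions.
            conf.setInt("phoenix.streams.get.table.regions.timeout", 600000); // 10 minutes
            return conf;
        }
    }
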
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
index 08bf35279d..2d9aa6738a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
@@ -182,6 +182,10 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
         createTable(conn, cdc_sql, encodingScheme, false, null, false, null);
     }
 
+    protected void dropCDC(Connection conn, String cdcName, String tableName) throws SQLException {
+        conn.createStatement().execute("DROP CDC " + cdcName + " ON " + tableName);
+    }
+
     protected void assertCDCState(Connection conn, String cdcName, String expInclude,
                                   int idxType) throws SQLException {
         try (ResultSet rs = conn.createStatement().executeQuery("SELECT cdc_include FROM " +
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
index 274314f106..4a7dfa0e8a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
@@ -23,6 +23,7 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -34,12 +35,9 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
 
-import static org.apache.phoenix.schema.PTable.CDCChangeScope.POST;
-import static org.apache.phoenix.schema.PTable.CDCChangeScope.PRE;
 import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -95,20 +93,37 @@ public class CDCDefinitionIT extends CDCBaseIT {
             conn.createStatement().execute(cdc_sql);
             fail("Expected to fail due to duplicate index");
         } catch (SQLException e) {
-            assertEquals(SQLExceptionCode.TABLE_ALREADY_EXIST.getErrorCode(), e.getErrorCode());
-            assertTrue(e.getMessage().endsWith(cdcName));
+            if (forView) {
+                assertEquals(SQLExceptionCode.TABLE_ALREADY_EXIST.getErrorCode(), e.getErrorCode());
+                assertTrue(e.getMessage().endsWith(cdcName));
+            } else {
+                // we only support Streams for tables as of now
+                assertEquals(SQLExceptionCode.CDC_ALREADY_ENABLED.getErrorCode(), e.getErrorCode());
+            }
         }
 
-        conn.createStatement().execute("CREATE CDC IF NOT EXISTS " + cdcName + " ON " + tableName +
-                " INCLUDE (pre, post)");
+        try {
+            conn.createStatement().execute("CREATE CDC IF NOT EXISTS " + cdcName + " ON " + tableName +
+                    " INCLUDE (pre, post)");
+        } catch (SQLException e) {
+            // when we replace CREATE CDC with ENABLE CDC, we will not have IF NOT EXISTS usage
+            if (!forView) {
+                assertEquals(SQLExceptionCode.CDC_ALREADY_ENABLED.getErrorCode(), e.getErrorCode());
+            }
+        }
 
         cdcName = generateUniqueName();
         cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INCLUDE (pre, post)";
-        createCDC(conn, cdc_sql);
-        assertCDCState(conn, cdcName, PRE+","+POST, 3);
-        assertPTable(cdcName, new HashSet<>(
-                Arrays.asList(PRE, POST)), tableName, datatableName);
-        assertNoResults(conn, cdcName);
+        try {
+            createCDC(conn, cdc_sql);
+        } catch (SQLException e) {
+            if (!forView) {
+                assertEquals(SQLExceptionCode.CDC_ALREADY_ENABLED.getErrorCode(), e.getErrorCode());
+            } else {
+                Assert.fail("Multiple CDCs should be allowed on views.");
+            }
+        }
+
 
         conn.close();
     }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
new file mode 100644
index 0000000000..461ee8a7c0
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
+import static org.apache.phoenix.util.CDCUtil.CDC_STREAM_NAME_FORMAT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category(ParallelStatsDisabledTest.class)
+public class CDCStreamIT extends CDCBaseIT {
+    private static RegionCoprocessorEnvironment TaskRegionEnvironment;
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(60*60)); // An hour
+        props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(false));
+        props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
+                Long.toString(Long.MAX_VALUE));
+        props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
+                Long.toString(Long.MAX_VALUE));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+        TaskRegionEnvironment =
+                getUtility()
+                        .getRSForFirstRegionInTable(
+                                PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
+                        .getRegions(PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
+                        .get(0).getCoprocessorHost()
+                        .findCoprocessorEnvironment(TaskRegionObserver.class.getName());
+    }
+    @Test
+    public void testStreamPartitionMetadataBootstrap() throws Exception {
+        Connection conn = newConnection();
+        String tableName = generateUniqueName();
+        String cdcName = generateUniqueName();
+        String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+        conn.createStatement().execute(
+                "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " 
v1 INTEGER,"
+                        + " v2 DATE)");
+        createCDC(conn, cdc_sql, null);
+        String streamName = getStreamName(conn, tableName, cdcName);
+
+        // stream should be in ENABLING state
+        assertStreamStatus(conn, tableName, streamName, 
CDCUtil.CdcStreamStatus.ENABLING);
+
+        // run task to populate partitions and enable stream
+        TaskRegionObserver.SelfHealingTask task =
+                new TaskRegionObserver.SelfHealingTask(
+                        TaskRegionEnvironment, 
QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
+        task.run();
+
+        // stream should be in ENABLED state and metadata is populated for 
every table region
+        assertPartitionMetadata(conn, tableName, cdcName);
+        assertStreamStatus(conn, tableName, streamName, 
CDCUtil.CdcStreamStatus.ENABLED);
+    }
+
+    @Test
+    public void testOnlyOneStreamAllowed() throws Exception {
+        Connection conn = newConnection();
+        String tableName = generateUniqueName();
+        String cdcName = generateUniqueName();
+        String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+        conn.createStatement().execute(
+                "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " 
v1 INTEGER,"
+                        + " v2 DATE)");
+        createCDC(conn, cdc_sql, null);
+        String streamName = getStreamName(conn, tableName, cdcName);
+
+        // stream exists in ENABLING status
+        String cdcName2 = generateUniqueName();
+        String cdc_sql2 = "CREATE CDC " + cdcName2 + " ON " + tableName;
+        try {
+            createCDC(conn, cdc_sql2, null);
+            Assert.fail("Only one CDC entity is allowed per table");
+        } catch (SQLException e) {
+            // expected
+            assertEquals(SQLExceptionCode.CDC_ALREADY_ENABLED.getErrorCode(), 
e.getErrorCode());
+            assertTrue(e.getMessage().contains(streamName));
+        }
+
+        // run task to populate partitions and enable stream
+        TaskRegionObserver.SelfHealingTask task =
+                new TaskRegionObserver.SelfHealingTask(
+                        TaskRegionEnvironment, 
QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
+        task.run();
+
+        // stream exists in ENABLED status
+        try {
+            createCDC(conn, cdc_sql2, null);
+            Assert.fail("Only one CDC entity is allowed per table");
+        } catch (SQLException e) {
+            // expected
+            assertEquals(SQLExceptionCode.CDC_ALREADY_ENABLED.getErrorCode(), 
e.getErrorCode());
+            assertTrue(e.getMessage().contains(streamName));
+        }
+
+        //drop cdc
+        dropCDC(conn, cdcName, tableName);
+        assertStreamStatus(conn, tableName, streamName, 
CDCUtil.CdcStreamStatus.DISABLED);
+
+        //create new CDC
+        String cdcName3 = generateUniqueName();
+        String cdc_sql3 = "CREATE CDC " + cdcName3 + " ON " + tableName;
+        createCDC(conn, cdc_sql3, null);
+        streamName = getStreamName(conn, tableName, cdcName3);
+        assertStreamStatus(conn, tableName, streamName, 
CDCUtil.CdcStreamStatus.ENABLING);
+    }
+
+    private String getStreamName(Connection conn, String tableName, String cdcName) throws SQLException {
+        return String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcName, CDCUtil.getCDCCreationTimestamp(
+                        conn.unwrap(PhoenixConnection.class).getTableNoCache(tableName)));
+    }
+
+    private void assertStreamStatus(Connection conn, String tableName, String streamName,
+                                    CDCUtil.CdcStreamStatus status) throws SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("SELECT STREAM_STATUS FROM "
+                + SYSTEM_CDC_STREAM_STATUS_NAME + " WHERE TABLE_NAME='" + tableName +
+                "' AND STREAM_NAME='" + streamName + "'");
+        assertTrue(rs.next());
+        Assert.assertEquals(status.getSerializedValue(), rs.getString(1));
+    }
+
+    private void assertPartitionMetadata(Connection conn, String tableName, String cdcName)
+            throws SQLException {
+        String streamName = String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcName,
+                CDCUtil.getCDCCreationTimestamp(conn.unwrap(PhoenixConnection.class).getTableNoCache(tableName)));
+        List<HRegionLocation> tableRegions
+                = conn.unwrap(PhoenixConnection.class).getQueryServices().getAllTableRegions(tableName.getBytes());
+        for (HRegionLocation tableRegion : tableRegions) {
+            RegionInfo ri = tableRegion.getRegionInfo();
+            PreparedStatement ps = conn.prepareStatement("SELECT * FROM " + SYSTEM_CDC_STREAM_NAME +
+                    " WHERE TABLE_NAME = ? AND STREAM_NAME = ? AND PARTITION_ID= ?");
+            ps.setString(1, tableName);
+            ps.setString(2, streamName);
+            ps.setString(3, ri.getEncodedName());
+            ResultSet rs = ps.executeQuery();
+            assertTrue(rs.next());
+        }
+    }
+}

