virajjasani commented on code in PR #2033:
URL: https://github.com/apache/phoenix/pull/2033#discussion_r1868098330


##########
phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java:
##########
@@ -0,0 +1,100 @@
+package org.apache.phoenix.coprocessor.tasks;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.List;
+
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
+import static org.apache.phoenix.util.CDCUtil.CDC_STREAM_NAME_FORMAT;
+
+/**
+ * Task to bootstrap partition metadata when CDC is enabled on a table.
+ * Upserts one row for each region of the table into SYSTEM.CDC_STREAM and marks the status as
+ * ENABLED in SYSTEM.CDC_STREAM_STATUS.
+ */
+public class CdcStreamPartitionMetadataTask extends BaseTask  {
+
+    public static final Logger LOGGER = LoggerFactory.getLogger(CdcStreamPartitionMetadataTask.class);
+    public static String CDC_STREAM_STATUS_UPSERT_SQL
+            = "UPSERT INTO " + SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, ?)";
+
+    // parent_partition_id will be null, set partition_end_time to -1
+    private static String CDC_STREAM_PARTITION_UPSERT_SQL
+            = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " VALUES (?,?,?,null,?,-1,?,?)";

Review Comment:
   nit: make this constant `public static final`.



##########
phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java:
##########
@@ -150,4 +152,31 @@ public static boolean isBinaryType(PDataType dataType) {
                 || sqlType == Types.LONGVARBINARY
                 || dataType.getSqlType() == PDataType.VARBINARY_ENCODED_TYPE);
     }
+
+    public enum CdcStreamStatus {
+        ENABLED("ENABLED"),
+        ENABLING("ENABLING"),
+        DISABLED("DISABLED"),
+        DISABLING("DISABLING");
+
+        private final String serializedValue;
+
+        private CdcStreamStatus(String value) {
+            this.serializedValue = value;
+        }
+
+        public String getSerializedValue() {
+            return serializedValue;
+        }
+    }
+
+    public static long getCDCCreationTimestamp(PTable table) {
+        long ts = 0;
+        for (PTable index : table.getIndexes()) {
+            if (CDCUtil.isCDCIndex(index)) {
+                ts = index.getTimeStamp();

Review Comment:
   nit: return the value directly from here instead of assigning it to `ts`?



##########
phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java:
##########
@@ -2039,9 +2041,57 @@ public MutationState createCDC(CreateCDCStatement statement) throws SQLException
         createTableInternal(tableStatement, null, dataTable, null, null, null, null,
                 null, null, false, null,
                 null, statement.getIncludeScopes(), tableProps, commonFamilyProps);
+        // for now, only track stream partition metadata for tables, TODO: updatable views
+        if (PTableType.TABLE.equals(dataTable.getType())) {
+            updateStreamPartitionMetadata(dataTableFullName);
+        }
         return new MutationState(0, 0, connection);
     }
 
+    /**
+     * Trigger CDC Stream Partition metadata bootstrap for the given table in the background.
+     * Mark status as ENABLING in SYSTEM.CDC_STREAM_STATUS and add {@link CdcStreamPartitionMetadataTask}
+     * to SYSTEM.TASK which updates partition metadata based on table regions.
+     */
+    private void updateStreamPartitionMetadata(String tableName) throws SQLException {
+        long cdcIndexTimestamp = CDCUtil.getCDCCreationTimestamp(connection.getTableNoCache(tableName));

Review Comment:
   Can we live with getTable() instead of getTableNoCache()?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to