haridsv commented on code in PR #2033: URL: https://github.com/apache/phoenix/pull/2033#discussion_r1871021076
########## phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java: ########## @@ -2039,9 +2041,57 @@ public MutationState createCDC(CreateCDCStatement statement) throws SQLException createTableInternal(tableStatement, null, dataTable, null, null, null, null, null, null, false, null, null, statement.getIncludeScopes(), tableProps, commonFamilyProps); + // for now, only track stream partition metadata for tables, TODO: updatable views + if (PTableType.TABLE.equals(dataTable.getType())) { + updateStreamPartitionMetadata(dataTableFullName); + } return new MutationState(0, 0, connection); } + /** + * Trigger CDC Stream Partition metadata bootstrap for the given table in the background. + * Mark status as ENABLING in SYSTEM.CDC_STREAM_STATUS and add {@link CdcStreamPartitionMetadataTask} + * to SYSTEM.TASK which updates partition metadata based on table regions. + */ + private void updateStreamPartitionMetadata(String tableName) throws SQLException { + long cdcIndexTimestamp = CDCUtil.getCDCCreationTimestamp(connection.getTable(tableName)); + String streamStatusSQL = "UPSERT INTO " + SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, ?)"; + PreparedStatement ps = connection.prepareStatement(streamStatusSQL); + String streamName = String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcIndexTimestamp); + ps.setString(1, tableName); + ps.setString(2, streamName); + ps.setString(3, CDCUtil.CdcStreamStatus.ENABLING.getSerializedValue()); + ps.executeUpdate(); + connection.commit(); Review Comment: We can use a try-with-resources block here to ensure the PreparedStatement is always closed, even if an exception is thrown, and is released promptly. 
########## phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java: ########## @@ -2039,9 +2041,57 @@ public MutationState createCDC(CreateCDCStatement statement) throws SQLException createTableInternal(tableStatement, null, dataTable, null, null, null, null, null, null, false, null, null, statement.getIncludeScopes(), tableProps, commonFamilyProps); + // for now, only track stream partition metadata for tables, TODO: updatable views + if (PTableType.TABLE.equals(dataTable.getType())) { + updateStreamPartitionMetadata(dataTableFullName); + } return new MutationState(0, 0, connection); } + /** + * Trigger CDC Stream Partition metadata bootstrap for the given table in the background. + * Mark status as ENABLING in SYSTEM.CDC_STREAM_STATUS and add {@link CdcStreamPartitionMetadataTask} + * to SYSTEM.TASK which updates partition metadata based on table regions. + */ + private void updateStreamPartitionMetadata(String tableName) throws SQLException { + long cdcIndexTimestamp = CDCUtil.getCDCCreationTimestamp(connection.getTable(tableName)); + String streamStatusSQL = "UPSERT INTO " + SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, ?)"; + PreparedStatement ps = connection.prepareStatement(streamStatusSQL); + String streamName = String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcIndexTimestamp); + ps.setString(1, tableName); + ps.setString(2, streamName); + ps.setString(3, CDCUtil.CdcStreamStatus.ENABLING.getSerializedValue()); + ps.executeUpdate(); + connection.commit(); + + try { + List<Mutation> sysTaskUpsertMutations = Task.getMutationsForAddTask( + new SystemTaskParams.SystemTaskParamsBuilder() + .setConn(connection) + .setTaskType(PTable.TaskType.CDC_STREAM_PARTITION) + .setTableName(tableName) //give full table name + .setSchemaName(streamName) // use schemaName to pass streamName + .build()); + byte[] rowKey = sysTaskUpsertMutations + .get(0).getRow(); + MetaDataProtocol.MetaDataMutationResult metaDataMutationResult = + 
Task.taskMetaDataCoprocessorExec(connection, rowKey, + new TaskMetaDataServiceCallBack(sysTaskUpsertMutations)); + if (MetaDataProtocol.MutationCode.UNABLE_TO_UPSERT_TASK.equals( + metaDataMutationResult.getMutationCode())) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNABLE_TO_UPSERT_TASK) + .setSchemaName(SYSTEM_SCHEMA_NAME) + .setTableName(SYSTEM_TASK_TABLE).build().buildException(); + } + } catch (IOException ioe) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNABLE_TO_UPSERT_TASK) + .setRootCause(ioe) + .setMessage(ioe.getMessage()) + .setSchemaName(SYSTEM_SCHEMA_NAME) + .setTableName(SYSTEM_TASK_TABLE).build().buildException(); + } Review Comment: Shouldn't we do this before creating the CDC object itself to be technically correct and avoid the window when the CDC is active but the stream is not? Also, how do we plan to synchronize this with the ongoing splits and merges? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org