cshuo commented on code in PR #4164:
URL: https://github.com/apache/flink-cdc/pull/4164#discussion_r2501924107
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/coordinator/MultiTableStreamWriteOperatorCoordinator.java:
##########
@@ -0,0 +1,973 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.hudi.sink.coordinator;
+
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.connectors.hudi.sink.event.CreateTableOperatorEvent;
+import org.apache.flink.cdc.connectors.hudi.sink.event.EnhancedWriteMetadataEvent;
+import org.apache.flink.cdc.connectors.hudi.sink.event.SchemaChangeOperatorEvent;
+import org.apache.flink.cdc.connectors.hudi.sink.util.RowDataUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SerializationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.event.Correspondent;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.utils.CoordinationResponseSerDe;
+import org.apache.hudi.sink.utils.EventBuffers;
+import org.apache.hudi.sink.utils.ExplicitClassloaderThreadFactory;
+import org.apache.hudi.sink.utils.NonThrownExecutor;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.ClusteringUtil;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.StreamerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.configuration.FlinkOptions.COMPACTION_DELTA_COMMITS;
+
+/**
+ * A custom OperatorCoordinator that manages Hudi writes for multiple tables.
+ *
+ * <p>This coordinator extends the default {@link StreamWriteOperatorCoordinator}. The parent class
+ * is designed for a single destination table, so its core logic (e.g., for commits and
+ * checkpointing) cannot be reused directly for a multi-table sink.
+ *
+ * <p>Therefore, this implementation overrides the essential lifecycle methods to manage a
+ * collection of per-table resources. It dynamically creates and manages a dedicated {@link
+ * HoodieFlinkWriteClient}, {@link EventBuffers}, and timeline for each table that appears in the
+ * upstream CDC data.
+ */
+public class MultiTableStreamWriteOperatorCoordinator extends StreamWriteOperatorCoordinator {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MultiTableStreamWriteOperatorCoordinator.class);
+
+    /**
+     * A custom coordination request that includes the TableId to request an instant for a specific
+     * table.
+     */
+    public static class MultiTableInstantTimeRequest implements CoordinationRequest, Serializable {
+        private static final long serialVersionUID = 1L;
+        private final long checkpointId;
+        private final TableId tableId;
+
+        public MultiTableInstantTimeRequest(long checkpointId, TableId tableId) {
+            this.checkpointId = checkpointId;
+            this.tableId = tableId;
+        }
+
+        public long getCheckpointId() {
+            return checkpointId;
+        }
+
+        public TableId getTableId() {
+            return tableId;
+        }
+    }
+
+    /**
+     * Encapsulates all state and resources for a single table. This simplifies management by
+     * grouping related objects, making the coordinator logic cleaner and less prone to errors.
+     */
+    private static class TableContext implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        final transient HoodieFlinkWriteClient<?> writeClient;
+        final EventBuffers eventBuffers;
+        final TableState tableState;
+        final String tablePath;
+
+        TableContext(
+                HoodieFlinkWriteClient<?> writeClient,
+                EventBuffers eventBuffers,
+                TableState tableState,
+                String tablePath) {
+            this.writeClient = writeClient;
+            this.eventBuffers = eventBuffers;
+            this.tableState = tableState;
+            this.tablePath = tablePath;
+        }
+
+        void close() {
+            if (writeClient != null) {
+                try {
+                    writeClient.close();
+                } catch (Exception e) {
+                    LOG.error("Error closing write client for table path: {}", tablePath, e);
+                }
+            }
+        }
+    }
+
+    /** A container for table-specific configuration and state. */
+    private static class TableState implements Serializable {
+        private static final long serialVersionUID = 1L;
+        final String commitAction;
+        final boolean isOverwrite;
+        final WriteOperationType operationType;
+        final boolean scheduleCompaction;
+        final boolean scheduleClustering;
+        final boolean isDeltaTimeCompaction;
+
+        // Event-driven compaction tracking - tracks actual write activity
+        long commitsSinceLastCompaction = 0;
+        // For MOR tables, track log file growth
+        long totalLogBytesWritten = 0;
+
+        final int commitsThreshold;
+
+        TableState(Configuration conf) {
+            this.operationType =
+                    WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
+            this.commitAction =
+                    CommitUtils.getCommitActionType(
+                            this.operationType,
+                            HoodieTableType.valueOf(
+                                    conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase()));
+            this.isOverwrite = WriteOperationType.isOverwrite(this.operationType);
+            this.scheduleCompaction = OptionsResolver.needsScheduleCompaction(conf);
+            this.scheduleClustering = OptionsResolver.needsScheduleClustering(conf);
+            this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(conf);
+            this.commitsThreshold = conf.get(COMPACTION_DELTA_COMMITS);
+        }
+
+        /**
+         * Updates compaction metrics based on write statuses. Skips empty commits where no actual
+         * data was written.
+         *
+         * @param writeStatuses The write statuses from the latest commit
+         * @return true if this commit had actual writes, false if it was empty
+         */
+        boolean updateCompactionMetrics(List<WriteStatus> writeStatuses) {
+            if (writeStatuses == null || writeStatuses.isEmpty()) {
+                LOG.debug("No write statuses - skipping compaction metric update");
+                return false;
+            }
+
+            // Check if any actual writes occurred (skip empty commits)
+            long totalWrites =
+                    writeStatuses.stream()
+                            .map(WriteStatus::getStat)
+                            .filter(stat -> stat != null)
+                            .mapToLong(HoodieWriteStat::getNumWrites)
+                            .sum();
+
+            if (totalWrites == 0) {
+                LOG.debug(
+                        "Empty commit detected (numWrites=0) - skipping compaction metric update");
+                return false;
+            }
+
+            // Track log file bytes written (for MOR tables)
+            long bytesWritten =
+                    writeStatuses.stream()
+                            .map(WriteStatus::getStat)
+                            .filter(stat -> stat != null)
+                            .mapToLong(HoodieWriteStat::getTotalWriteBytes)
+                            .sum();
+
+            commitsSinceLastCompaction++;
+            totalLogBytesWritten += bytesWritten;
+
+            LOG.debug(
+                    "Updated compaction metrics: commits={}, bytes={}",
+                    commitsSinceLastCompaction,
+                    totalLogBytesWritten);
+            return true;
+        }
+
+        /** Resets compaction metrics after compaction is scheduled. */
+        void resetCompactionMetrics() {
+            commitsSinceLastCompaction = 0;
+            totalLogBytesWritten = 0;
+        }
+
+        /**
+         * Determines if compaction should be triggered based on write activity. Only triggers for
+         * MOR tables with actual data writes.
+         *
+         * @return true if compaction should be scheduled
+         */
+        boolean shouldTriggerCompaction() {
+            // Only trigger for MOR tables (DELTA_COMMIT means log files)
+            if (!commitAction.equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
+                return false;
+            }
+
+            return commitsSinceLastCompaction >= commitsThreshold;
+        }
+    }
+
+    /** The base Flink configuration. */
+    private final Configuration baseConfig;
+
+    /**
+     * A single, unified map holding the context for each managed table. The key is the {@link
+     * TableId}, providing a centralized place for all per-table resources.
+     */
+    private final Map<TableId, TableContext> tableContexts = new ConcurrentHashMap<>();
+
+    /** A reverse lookup map from table path to TableId for efficient event routing. */
+    private final Map<String, TableId> pathToTableId = new ConcurrentHashMap<>();
+
+    /** Cache of schemas per table for config creation. */
+    private final Map<TableId, Schema> tableSchemas = new ConcurrentHashMap<>();
+
+    /**
+     * Gateways for sending events to sub-tasks. This field is necessary because the parent's
+     * `gateways` array is private and not initialized if we don't call super.start().
+     */
+    private transient SubtaskGateway[] gateways;
+
+    /** A single-thread executor to handle instant time requests, mimicking the parent behavior. */
+    private transient NonThrownExecutor instantRequestExecutor;
+
+    public MultiTableStreamWriteOperatorCoordinator(Configuration conf, Context context) {
+        super(conf, context);
+        this.baseConfig = conf;
+        LOG.info(
+                "MultiTableStreamWriteOperatorCoordinator initialized for operator: {} with config: {}",
+                context.getOperatorId(),
+                baseConfig);
+    }
+
+    @Override
+    public void start() throws Exception {
+        // Hadoop's FileSystem API uses Java's ServiceLoader to find implementations for
+        // URI schemes (like 'file://'). The ServiceLoader relies on the thread's context
+        // classloader. The parent class sets this, but our overridden start() method must
+        // do so as well to ensure file system implementations can be found.
+        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+
+        // Initialize the executor service, which is a protected field in the parent class.
+        // This logic is borrowed from the parent's start() method as we cannot call super.start().
+        this.executor =
+                NonThrownExecutor.builder(LOG)
+                        .threadFactory(
+                                new ExplicitClassloaderThreadFactory(
+                                        "multi-table-coord-event-handler",
+                                        context.getUserCodeClassloader()))
+                        .exceptionHook(
+                                (errMsg, t) -> this.context.failJob(new HoodieException(errMsg, t)))
+                        .waitForTasksFinish(true)
+                        .build();
+
+        // Executor for handling instant requests.
+        this.instantRequestExecutor =
+                NonThrownExecutor.builder(LOG)
+                        .threadFactory(
+                                new ExplicitClassloaderThreadFactory(
+                                        "multi-table-instant-request",
+                                        context.getUserCodeClassloader()))
+                        .exceptionHook(
+                                (errMsg, t) -> this.context.failJob(new HoodieException(errMsg, t)))
+                        .build();
+
+        // Initialize the gateways array to avoid NullPointerException when subtasks are ready.
+        this.gateways = new SubtaskGateway[context.currentParallelism()];
+
+        // Re-initialize transient fields after deserialization from a Flink checkpoint.
+        // When the coordinator is restored, the `tableContexts` map is deserialized, but all
+        // `writeClient` fields within it will be null because they are transient.
+        for (Map.Entry<TableId, TableContext> entry : tableContexts.entrySet()) {
+            TableId tableId = entry.getKey();
+            TableContext oldContext = entry.getValue();
+
+            try {
+                Configuration tableConfig = createTableSpecificConfig(tableId);
+                // Ensure the table's filesystem structure exists before creating a client.
+                StreamerUtil.initTableIfNotExists(tableConfig);
+                HoodieFlinkWriteClient<?> writeClient =
+                        FlinkWriteClients.createWriteClient(tableConfig);
+
+                // Replace the old context (with a null client) with a new one containing the live
+                // client.
+                tableContexts.put(
+                        tableId,
+                        new TableContext(
+                                writeClient,
+                                oldContext.eventBuffers,
+                                oldContext.tableState,
+                                oldContext.tablePath));
+                LOG.info(
+                        "Successfully re-initialized write client for recovered table: {}",
+                        tableId);
+            } catch (Exception e) {
+                LOG.error(
+                        "Failed to re-initialize write client for recovered table: {}", tableId, e);
+                context.failJob(e);
+                return; // Exit if initialization fails for any table
+            }
+        }
+    }
+
+    @Override
+    public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
+            CoordinationRequest request) {
+        if (request instanceof MultiTableInstantTimeRequest) {
+            CompletableFuture<CoordinationResponse> future = new CompletableFuture<>();
+            instantRequestExecutor.execute(
+                    () -> {
+                        MultiTableInstantTimeRequest instantRequest =
+                                (MultiTableInstantTimeRequest) request;
+                        TableId tableId = instantRequest.getTableId();
+                        long checkpointId = instantRequest.getCheckpointId();
+
+                        TableContext tableContext = tableContexts.get(tableId);
+                        if (tableContext == null) {
+                            String errorMsg =
+                                    String.format(
+                                            "Received instant request for unknown table %s. The sink function should send a CreateTableEvent first.",
+                                            tableId);
+                            LOG.error(errorMsg);
+                            future.completeExceptionally(new IllegalStateException(errorMsg));
+                            return;
+                        }
+
+                        Pair<String, WriteMetadataEvent[]> instantAndBuffer =
+                                tableContext.eventBuffers.getInstantAndEventBuffer(checkpointId);
+                        final String instantTime;
+
+                        if (instantAndBuffer == null) {
+                            // No instant yet for this checkpoint, create a new one.
+                            instantTime = startInstantForTable(tableContext);
+                            tableContext.eventBuffers.initNewEventBuffer(
+                                    checkpointId, instantTime, context.currentParallelism());
+                            LOG.info(
+                                    "Created new instant [{}] for table [{}] at checkpoint [{}].",
+                                    instantTime,
+                                    tableId,
+                                    checkpointId);
+                        } else {
+                            // Instant already exists for this checkpoint, reuse it.
+                            instantTime = instantAndBuffer.getLeft();
+                            LOG.info(
+                                    "Reusing instant [{}] for table [{}] at checkpoint [{}].",
+                                    instantTime,
+                                    tableId,
+                                    checkpointId);
+                        }
+                        future.complete(
+                                CoordinationResponseSerDe.wrap(
+                                        Correspondent.InstantTimeResponse.getInstance(
+                                                instantTime)));
+                    },
+                    "handling instant time request for checkpoint %d",
+                    ((MultiTableInstantTimeRequest) request).getCheckpointId());
+            return future;
+        } else {
+            LOG.warn("Received an unknown coordination request: {}", request.getClass().getName());
+            return super.handleCoordinationRequest(request);
+        }
+    }
+
+    private String startInstantForTable(TableContext tableContext) {
+        HoodieFlinkWriteClient<?> writeClient = tableContext.writeClient;
+        TableState tableState = tableContext.tableState;
+        HoodieTableMetaClient metaClient = writeClient.getHoodieTable().getMetaClient();
+
+        metaClient.reloadActiveTimeline();
+        final String newInstant = writeClient.startCommit(tableState.commitAction, metaClient);
+        metaClient
+                .getActiveTimeline()
+                .transitionRequestedToInflight(tableState.commitAction, newInstant);
+        return newInstant;
+    }
+
+    @Override
+    public void handleEventFromOperator(
+            int subtask, int attemptNumber, OperatorEvent operatorEvent) {
+        executor.execute(
+                () -> {
+                    if (operatorEvent instanceof CreateTableOperatorEvent) {
+                        handleCreateTableEvent((CreateTableOperatorEvent) operatorEvent);
+                    } else if (operatorEvent instanceof SchemaChangeOperatorEvent) {
+                        handleSchemaChangeEvent((SchemaChangeOperatorEvent) operatorEvent);
+                    } else if (operatorEvent instanceof EnhancedWriteMetadataEvent) {
+                        handleEnhancedWriteMetadataEvent(
+                                (EnhancedWriteMetadataEvent) operatorEvent);
+                    } else {
+                        LOG.warn(
+                                "Received an unhandled or non-enhanced OperatorEvent: {}",
+                                operatorEvent);
+                    }
+                },
+                "handling operator event %s",
+                operatorEvent);
+    }
+
+    private void handleCreateTableEvent(CreateTableOperatorEvent createTableOperatorEvent) {
+        CreateTableEvent event = createTableOperatorEvent.getCreateTableEvent();
+        TableId tableId = event.tableId();
+
+        // Store the schema for this table
+        tableSchemas.put(tableId, event.getSchema());
+        LOG.info(
+                "Cached schema for table {}: {} columns",
+                tableId,
+                event.getSchema().getColumnCount());
+
+        tableContexts.computeIfAbsent(
+                tableId,
+                tId -> {
+                    LOG.info("New table detected: {}. Initializing Hudi resources.", tId);
+                    try {
+                        Configuration tableConfig = createTableSpecificConfig(tId);
+                        String tablePath = tableConfig.getString(FlinkOptions.PATH);
+                        pathToTableId.put(tablePath, tId);
+
+                        // Create physical directory for Hudi table before initializing
+                        createHudiTablePath(tableConfig);
+
+                        StreamerUtil.initTableIfNotExists(tableConfig);
+                        HoodieFlinkWriteClient<?> writeClient =
+                                FlinkWriteClients.createWriteClient(tableConfig);
+                        TableState tableState = new TableState(tableConfig);
+                        EventBuffers eventBuffers = EventBuffers.getInstance(tableConfig);
+
+                        LOG.info(
+                                "Successfully initialized resources for table: {} at path: {}",
+                                tId,
+                                tablePath);
+                        return new TableContext(writeClient, eventBuffers, tableState, tablePath);
+                    } catch (Exception e) {
+                        LOG.error("Failed to initialize Hudi table resources for: {}", tId, e);
+                        context.failJob(
+                                new HoodieException(
+                                        "Failed to initialize Hudi writer for table " + tId, e));
+                        return null;
+                    }
+                });
+    }
+
+    /**
+     * Handles schema change events from the sink functions. Updates the cached schema and
+     * recreates the write client to ensure it uses the new schema.
+     *
+     * @param event The schema change event containing the table ID and new schema
+     */
+    private void handleSchemaChangeEvent(SchemaChangeOperatorEvent event) {
+        TableId tableId = event.getTableId();
+        Schema newSchema = event.getNewSchema();
+
+        LOG.info(
+                "Received schema change event for table {}: {} columns",
+                tableId,
+                newSchema.getColumnCount());
+
+        // Update the cached schema
+        tableSchemas.put(tableId, newSchema);
+        LOG.info("Updated coordinator's schema cache for table: {}", tableId);
+
+        // Get the existing table context
+        TableContext oldContext = tableContexts.get(tableId);
+        if (oldContext == null) {
+            LOG.warn(
+                    "Received schema change for unknown table: {}. Skipping write client update.",
+                    tableId);
+            return;
+        }
+
+        try {
+            // Close the old write client
+            if (oldContext.writeClient != null) {
+                oldContext.writeClient.close();
+                LOG.info("Closed old write client for table: {}", tableId);
+            }
+
+            // Create new configuration with updated schema
+            Configuration tableConfig = createTableSpecificConfig(tableId);
+
+            // Create new write client with updated schema
+            HoodieFlinkWriteClient<?> newWriteClient =
+                    FlinkWriteClients.createWriteClient(tableConfig);
+            LOG.info("Created new write client with updated schema for table: {}", tableId);
+
+            // Update the table context with the new write client
+            // Keep the same eventBuffers, tableState, and tablePath
+            TableContext newContext =
+                    new TableContext(
+                            newWriteClient,
+                            oldContext.eventBuffers,
+                            oldContext.tableState,
+                            oldContext.tablePath);
+            tableContexts.put(tableId, newContext);
+
+            LOG.info("Successfully updated write client for table {} after schema change", tableId);
+        } catch (Exception e) {
+            LOG.error("Failed to update write client for table {} after schema change", tableId, e);
+            context.failJob(
+                    new HoodieException(
+                            "Failed to update write client for table "
+                                    + tableId
+                                    + " after schema change",
+                            e));
+        }
+    }
+
+    private void handleEnhancedWriteMetadataEvent(EnhancedWriteMetadataEvent enhancedEvent) {
+        String tablePath = enhancedEvent.getTablePath();
+        WriteMetadataEvent event = enhancedEvent.getOriginalEvent();
+        TableId tableId = pathToTableId.get(tablePath);
+
+        if (tableId == null) {
+            LOG.warn("No tableId found for path: {}. Cannot process event.", tablePath);
+            return;

Review Comment:
   this is an unexpected case, should also fail the job here?
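   If we do want to fail the job in this case, a minimal sketch could reuse the `context.failJob(...)` / `HoodieException` pattern that the other error paths in this class already follow (illustrative only, not part of this PR):

   ```java
   // Hypothetical revision of the null-tableId branch in handleEnhancedWriteMetadataEvent,
   // assuming the fields shown earlier in this class (pathToTableId, context, LOG).
   private void handleEnhancedWriteMetadataEvent(EnhancedWriteMetadataEvent enhancedEvent) {
       String tablePath = enhancedEvent.getTablePath();
       WriteMetadataEvent event = enhancedEvent.getOriginalEvent();
       TableId tableId = pathToTableId.get(tablePath);

       if (tableId == null) {
           // A write event for an unregistered path means the coordinator's routing state is
           // inconsistent; fail the job instead of silently dropping the event.
           String errorMsg =
                   String.format(
                           "No tableId found for path: %s. Cannot process WriteMetadataEvent.",
                           tablePath);
           LOG.error(errorMsg);
           context.failJob(new HoodieException(errorMsg));
           return;
       }
       // ... existing per-table handling continues unchanged ...
   }
   ```

   Silently dropping the event makes the data loss hard to diagnose, and failing fast would be consistent with how an unknown table is treated in `handleCoordinationRequest`.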
