cshuo commented on code in PR #4164:
URL: https://github.com/apache/flink-cdc/pull/4164#discussion_r2501924107


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/coordinator/MultiTableStreamWriteOperatorCoordinator.java:
##########
@@ -0,0 +1,973 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.hudi.sink.coordinator;
+
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.connectors.hudi.sink.event.CreateTableOperatorEvent;
+import org.apache.flink.cdc.connectors.hudi.sink.event.EnhancedWriteMetadataEvent;
+import org.apache.flink.cdc.connectors.hudi.sink.event.SchemaChangeOperatorEvent;
+import org.apache.flink.cdc.connectors.hudi.sink.util.RowDataUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SerializationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.event.Correspondent;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.utils.CoordinationResponseSerDe;
+import org.apache.hudi.sink.utils.EventBuffers;
+import org.apache.hudi.sink.utils.ExplicitClassloaderThreadFactory;
+import org.apache.hudi.sink.utils.NonThrownExecutor;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.ClusteringUtil;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.StreamerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.configuration.FlinkOptions.COMPACTION_DELTA_COMMITS;
+
+/**
+ * A custom OperatorCoordinator that manages Hudi writes for multiple tables.
+ *
+ * <p>This coordinator extends the default {@link StreamWriteOperatorCoordinator}. The parent class
+ * is designed for a single destination table, so its core logic (e.g., for commits and
+ * checkpointing) cannot be reused directly for a multi-table sink.
+ *
+ * <p>Therefore, this implementation overrides the essential lifecycle methods to manage a
+ * collection of per-table resources. It dynamically creates and manages a dedicated {@link
+ * HoodieFlinkWriteClient}, {@link EventBuffers}, and timeline for each table that appears in the
+ * upstream CDC data.
+ */
+public class MultiTableStreamWriteOperatorCoordinator extends StreamWriteOperatorCoordinator {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MultiTableStreamWriteOperatorCoordinator.class);
+
+    /**
+     * A custom coordination request that includes the TableId to request an instant for a specific
+     * table.
+     */
+    public static class MultiTableInstantTimeRequest implements CoordinationRequest, Serializable {
+        private static final long serialVersionUID = 1L;
+        private final long checkpointId;
+        private final TableId tableId;
+
+        public MultiTableInstantTimeRequest(long checkpointId, TableId tableId) {
+            this.checkpointId = checkpointId;
+            this.tableId = tableId;
+        }
+
+        public long getCheckpointId() {
+            return checkpointId;
+        }
+
+        public TableId getTableId() {
+            return tableId;
+        }
+    }
+
+    /**
+     * Encapsulates all state and resources for a single table. This simplifies management by
+     * grouping related objects, making the coordinator logic cleaner and less prone to errors.
+     */
+    private static class TableContext implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        final transient HoodieFlinkWriteClient<?> writeClient;
+        final EventBuffers eventBuffers;
+        final TableState tableState;
+        final String tablePath;
+
+        TableContext(
+                HoodieFlinkWriteClient<?> writeClient,
+                EventBuffers eventBuffers,
+                TableState tableState,
+                String tablePath) {
+            this.writeClient = writeClient;
+            this.eventBuffers = eventBuffers;
+            this.tableState = tableState;
+            this.tablePath = tablePath;
+        }
+
+        void close() {
+            if (writeClient != null) {
+                try {
+                    writeClient.close();
+                } catch (Exception e) {
+                    LOG.error("Error closing write client for table path: {}", 
tablePath, e);
+                }
+            }
+        }
+    }
+
+    /** A container for table-specific configuration and state. */
+    private static class TableState implements Serializable {
+        private static final long serialVersionUID = 1L;
+        final String commitAction;
+        final boolean isOverwrite;
+        final WriteOperationType operationType;
+        final boolean scheduleCompaction;
+        final boolean scheduleClustering;
+        final boolean isDeltaTimeCompaction;
+
+        // Event-driven compaction tracking - tracks actual write activity
+        long commitsSinceLastCompaction = 0;
+        // For MOR tables, track log file growth
+        long totalLogBytesWritten = 0;
+
+        final int commitsThreshold;
+
+        TableState(Configuration conf) {
+            this.operationType =
+                    WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
+            this.commitAction =
+                    CommitUtils.getCommitActionType(
+                            this.operationType,
+                            HoodieTableType.valueOf(
+                                    conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase()));
+            this.isOverwrite = WriteOperationType.isOverwrite(this.operationType);
+            this.scheduleCompaction = OptionsResolver.needsScheduleCompaction(conf);
+            this.scheduleClustering = OptionsResolver.needsScheduleClustering(conf);
+            this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(conf);
+            this.commitsThreshold = conf.get(COMPACTION_DELTA_COMMITS);
+        }
+
+        /**
+         * Updates compaction metrics based on write statuses. Skips empty commits where no actual
+         * data was written.
+         *
+         * @param writeStatuses The write statuses from the latest commit
+         * @return true if this commit had actual writes, false if it was empty
+         */
+        boolean updateCompactionMetrics(List<WriteStatus> writeStatuses) {
+            if (writeStatuses == null || writeStatuses.isEmpty()) {
+                LOG.debug("No write statuses - skipping compaction metric 
update");
+                return false;
+            }
+
+            // Check if any actual writes occurred (skip empty commits)
+            long totalWrites =
+                    writeStatuses.stream()
+                            .map(WriteStatus::getStat)
+                            .filter(stat -> stat != null)
+                            .mapToLong(HoodieWriteStat::getNumWrites)
+                            .sum();
+
+            if (totalWrites == 0) {
+                LOG.debug(
+                        "Empty commit detected (numWrites=0) - skipping 
compaction metric update");
+                return false;
+            }
+
+            // Track log file bytes written (for MOR tables)
+            long bytesWritten =
+                    writeStatuses.stream()
+                            .map(WriteStatus::getStat)
+                            .filter(stat -> stat != null)
+                            .mapToLong(HoodieWriteStat::getTotalWriteBytes)
+                            .sum();
+
+            commitsSinceLastCompaction++;
+            totalLogBytesWritten += bytesWritten;
+
+            LOG.debug(
+                    "Updated compaction metrics: commits={}, bytes={}",
+                    commitsSinceLastCompaction,
+                    totalLogBytesWritten);
+            return true;
+        }
+
+        /** Resets compaction metrics after compaction is scheduled. */
+        void resetCompactionMetrics() {
+            commitsSinceLastCompaction = 0;
+            totalLogBytesWritten = 0;
+        }
+
+        /**
+         * Determines if compaction should be triggered based on write activity. Only triggers for
+         * MOR tables with actual data writes.
+         *
+         * @return true if compaction should be scheduled
+         */
+        boolean shouldTriggerCompaction() {
+            // Only trigger for MOR tables (DELTA_COMMIT means log files)
+            if (!commitAction.equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
+                return false;
+            }
+
+            return commitsSinceLastCompaction >= commitsThreshold;
+        }
+    }
+
+    /** The base Flink configuration. */
+    private final Configuration baseConfig;
+
+    /**
+     * A single, unified map holding the context for each managed table. The key is the {@link
+     * TableId}, providing a centralized place for all per-table resources.
+     */
+    private final Map<TableId, TableContext> tableContexts = new ConcurrentHashMap<>();
+
+    /** A reverse lookup map from table path to TableId for efficient event routing. */
+    private final Map<String, TableId> pathToTableId = new ConcurrentHashMap<>();
+
+    /** Cache of schemas per table for config creation. */
+    private final Map<TableId, Schema> tableSchemas = new ConcurrentHashMap<>();
+
+    /**
+     * Gateways for sending events to sub-tasks. This field is necessary because the parent's
+     * `gateways` array is private and not initialized if we don't call super.start().
+     */
+    private transient SubtaskGateway[] gateways;
+
+    /** A single-thread executor to handle instant time requests, mimicking the parent behavior. */
+    private transient NonThrownExecutor instantRequestExecutor;
+
+    public MultiTableStreamWriteOperatorCoordinator(Configuration conf, Context context) {
+        super(conf, context);
+        this.baseConfig = conf;
+        LOG.info(
+                "MultiTableStreamWriteOperatorCoordinator initialized for operator: {} with config: {}",
+                context.getOperatorId(),
+                baseConfig);
+    }
+
+    @Override
+    public void start() throws Exception {
+        // Hadoop's FileSystem API uses Java's ServiceLoader to find implementations for
+        // URI schemes (like 'file://'). The ServiceLoader relies on the thread's context
+        // classloader. The parent class sets this, but our overridden start() method must
+        // do so as well to ensure file system implementations can be found.
+        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+
+        // Initialize the executor service, which is a protected field in the parent class.
+        // This logic is borrowed from the parent's start() method as we cannot call super.start().
+        this.executor =
+                NonThrownExecutor.builder(LOG)
+                        .threadFactory(
+                                new ExplicitClassloaderThreadFactory(
+                                        "multi-table-coord-event-handler",
+                                        context.getUserCodeClassloader()))
+                        .exceptionHook(
+                                (errMsg, t) -> this.context.failJob(new HoodieException(errMsg, t)))
+                        .waitForTasksFinish(true)
+                        .build();
+
+        // Executor for handling instant requests.
+        this.instantRequestExecutor =
+                NonThrownExecutor.builder(LOG)
+                        .threadFactory(
+                                new ExplicitClassloaderThreadFactory(
+                                        "multi-table-instant-request",
+                                        context.getUserCodeClassloader()))
+                        .exceptionHook(
+                                (errMsg, t) -> this.context.failJob(new HoodieException(errMsg, t)))
+                        .build();
+
+        // Initialize the gateways array to avoid NullPointerException when subtasks are ready.
+        this.gateways = new SubtaskGateway[context.currentParallelism()];
+
+        // Re-initialize transient fields after deserialization from a Flink checkpoint.
+        // When the coordinator is restored, the `tableContexts` map is deserialized, but all
+        // `writeClient` fields within it will be null because they are transient.
+        for (Map.Entry<TableId, TableContext> entry : tableContexts.entrySet()) {
+            TableId tableId = entry.getKey();
+            TableContext oldContext = entry.getValue();
+
+            try {
+                Configuration tableConfig = createTableSpecificConfig(tableId);
+                // Ensure the table's filesystem structure exists before creating a client.
+                StreamerUtil.initTableIfNotExists(tableConfig);
+                HoodieFlinkWriteClient<?> writeClient =
+                        FlinkWriteClients.createWriteClient(tableConfig);
+
+                // Replace the old context (with a null client) with a new one containing the live
+                // client.
+                tableContexts.put(
+                        tableId,
+                        new TableContext(
+                                writeClient,
+                                oldContext.eventBuffers,
+                                oldContext.tableState,
+                                oldContext.tablePath));
+                LOG.info(
+                        "Successfully re-initialized write client for 
recovered table: {}",
+                        tableId);
+            } catch (Exception e) {
+                LOG.error(
+                        "Failed to re-initialize write client for recovered 
table: {}", tableId, e);
+                context.failJob(e);
+                return; // Exit if initialization fails for any table
+            }
+        }
+    }
+
+    @Override
+    public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
+            CoordinationRequest request) {
+        if (request instanceof MultiTableInstantTimeRequest) {
+            CompletableFuture<CoordinationResponse> future = new CompletableFuture<>();
+            instantRequestExecutor.execute(
+                    () -> {
+                        MultiTableInstantTimeRequest instantRequest =
+                                (MultiTableInstantTimeRequest) request;
+                        TableId tableId = instantRequest.getTableId();
+                        long checkpointId = instantRequest.getCheckpointId();
+
+                        TableContext tableContext = tableContexts.get(tableId);
+                        if (tableContext == null) {
+                            String errorMsg =
+                                    String.format(
+                                            "Received instant request for 
unknown table %s. The sink function should send a CreateTableEvent first.",
+                                            tableId);
+                            LOG.error(errorMsg);
+                            future.completeExceptionally(new 
IllegalStateException(errorMsg));
+                            return;
+                        }
+
+                        Pair<String, WriteMetadataEvent[]> instantAndBuffer =
+                                tableContext.eventBuffers.getInstantAndEventBuffer(checkpointId);
+                        final String instantTime;
+
+                        if (instantAndBuffer == null) {
+                            // No instant yet for this checkpoint, create a new one.
+                            instantTime = startInstantForTable(tableContext);
+                            tableContext.eventBuffers.initNewEventBuffer(
+                                    checkpointId, instantTime, context.currentParallelism());
+                            LOG.info(
+                                    "Created new instant [{}] for table [{}] at checkpoint [{}].",
+                                    instantTime,
+                                    tableId,
+                                    checkpointId);
+                        } else {
+                            // Instant already exists for this checkpoint, reuse it.
+                            instantTime = instantAndBuffer.getLeft();
+                            LOG.info(
+                                    "Reusing instant [{}] for table [{}] at checkpoint [{}].",
+                                    instantTime,
+                                    tableId,
+                                    checkpointId);
+                        }
+                        future.complete(
+                                CoordinationResponseSerDe.wrap(
+                                        Correspondent.InstantTimeResponse.getInstance(
+                                                instantTime)));
+                    },
+                    "handling instant time request for checkpoint %d",
+                    ((MultiTableInstantTimeRequest) request).getCheckpointId());
+            return future;
+        } else {
+            LOG.warn("Received an unknown coordination request: {}", 
request.getClass().getName());
+            return super.handleCoordinationRequest(request);
+        }
+    }
+
+    private String startInstantForTable(TableContext tableContext) {
+        HoodieFlinkWriteClient<?> writeClient = tableContext.writeClient;
+        TableState tableState = tableContext.tableState;
+        HoodieTableMetaClient metaClient = writeClient.getHoodieTable().getMetaClient();
+
+        metaClient.reloadActiveTimeline();
+        final String newInstant = writeClient.startCommit(tableState.commitAction, metaClient);
+        metaClient
+                .getActiveTimeline()
+                .transitionRequestedToInflight(tableState.commitAction, newInstant);
+        return newInstant;
+    }
+
+    @Override
+    public void handleEventFromOperator(
+            int subtask, int attemptNumber, OperatorEvent operatorEvent) {
+        executor.execute(
+                () -> {
+                    if (operatorEvent instanceof CreateTableOperatorEvent) {
+                        handleCreateTableEvent((CreateTableOperatorEvent) operatorEvent);
+                    } else if (operatorEvent instanceof SchemaChangeOperatorEvent) {
+                        handleSchemaChangeEvent((SchemaChangeOperatorEvent) operatorEvent);
+                    } else if (operatorEvent instanceof EnhancedWriteMetadataEvent) {
+                        handleEnhancedWriteMetadataEvent(
+                                (EnhancedWriteMetadataEvent) operatorEvent);
+                    } else {
+                        LOG.warn(
+                                "Received an unhandled or non-enhanced 
OperatorEvent: {}",
+                                operatorEvent);
+                    }
+                },
+                "handling operator event %s",
+                operatorEvent);
+    }
+
+    private void handleCreateTableEvent(CreateTableOperatorEvent createTableOperatorEvent) {
+        CreateTableEvent event = createTableOperatorEvent.getCreateTableEvent();
+        TableId tableId = event.tableId();
+
+        // Store the schema for this table
+        tableSchemas.put(tableId, event.getSchema());
+        LOG.info(
+                "Cached schema for table {}: {} columns",
+                tableId,
+                event.getSchema().getColumnCount());
+
+        tableContexts.computeIfAbsent(
+                tableId,
+                tId -> {
+                    LOG.info("New table detected: {}. Initializing Hudi 
resources.", tId);
+                    try {
+                        Configuration tableConfig = createTableSpecificConfig(tId);
+                        String tablePath = tableConfig.getString(FlinkOptions.PATH);
+                        pathToTableId.put(tablePath, tId);
+
+                        // Create physical directory for Hudi table before initializing
+                        createHudiTablePath(tableConfig);
+
+                        StreamerUtil.initTableIfNotExists(tableConfig);
+                        HoodieFlinkWriteClient<?> writeClient =
+                                FlinkWriteClients.createWriteClient(tableConfig);
+                        TableState tableState = new TableState(tableConfig);
+                        EventBuffers eventBuffers = EventBuffers.getInstance(tableConfig);
+
+                        LOG.info(
+                                "Successfully initialized resources for table: {} at path: {}",
+                                tId,
+                                tablePath);
+                        return new TableContext(writeClient, eventBuffers, tableState, tablePath);
+                    } catch (Exception e) {
+                        LOG.error("Failed to initialize Hudi table resources for: {}", tId, e);
+                        context.failJob(
+                                new HoodieException(
+                                        "Failed to initialize Hudi writer for 
table " + tId, e));
+                        return null;
+                    }
+                });
+    }
+
+    /**
+     * Handles schema change events from the sink functions. Updates the cached schema and recreates
+     * the write client to ensure it uses the new schema.
+     *
+     * @param event The schema change event containing the table ID and new schema
+     */
+    private void handleSchemaChangeEvent(SchemaChangeOperatorEvent event) {
+        TableId tableId = event.getTableId();
+        Schema newSchema = event.getNewSchema();
+
+        LOG.info(
+                "Received schema change event for table {}: {} columns",
+                tableId,
+                newSchema.getColumnCount());
+
+        // Update the cached schema
+        tableSchemas.put(tableId, newSchema);
+        LOG.info("Updated coordinator's schema cache for table: {}", tableId);
+
+        // Get the existing table context
+        TableContext oldContext = tableContexts.get(tableId);
+        if (oldContext == null) {
+            LOG.warn(
+                    "Received schema change for unknown table: {}. Skipping 
write client update.",
+                    tableId);
+            return;
+        }
+
+        try {
+            // Close the old write client
+            if (oldContext.writeClient != null) {
+                oldContext.writeClient.close();
+                LOG.info("Closed old write client for table: {}", tableId);
+            }
+
+            // Create new configuration with updated schema
+            Configuration tableConfig = createTableSpecificConfig(tableId);
+
+            // Create new write client with updated schema
+            HoodieFlinkWriteClient<?> newWriteClient =
+                    FlinkWriteClients.createWriteClient(tableConfig);
+            LOG.info("Created new write client with updated schema for table: 
{}", tableId);
+
+            // Update the table context with the new write client
+            // Keep the same eventBuffers, tableState, and tablePath
+            TableContext newContext =
+                    new TableContext(
+                            newWriteClient,
+                            oldContext.eventBuffers,
+                            oldContext.tableState,
+                            oldContext.tablePath);
+            tableContexts.put(tableId, newContext);
+
+            LOG.info("Successfully updated write client for table {} after 
schema change", tableId);
+        } catch (Exception e) {
+            LOG.error("Failed to update write client for table {} after schema 
change", tableId, e);
+            context.failJob(
+                    new HoodieException(
+                            "Failed to update write client for table "
+                                    + tableId
+                                    + " after schema change",
+                            e));
+        }
+    }
+
+    private void handleEnhancedWriteMetadataEvent(EnhancedWriteMetadataEvent enhancedEvent) {
+        String tablePath = enhancedEvent.getTablePath();
+        WriteMetadataEvent event = enhancedEvent.getOriginalEvent();
+        TableId tableId = pathToTableId.get(tablePath);
+
+        if (tableId == null) {
+            LOG.warn("No tableId found for path: {}. Cannot process event.", tablePath);
+            return;

Review Comment:
   This is an unexpected case; should we also fail the job here?
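   For illustration only, a minimal sketch of what failing the job could look like at this point,
   reusing the `context.failJob` + `HoodieException` pattern this coordinator already applies for
   other fatal inconsistencies (the exact message and exception type are hypothetical, up to the author):

   ```java
   // Hypothetical sketch, not the PR's current behavior: treat a missing
   // path -> TableId mapping as a fatal inconsistency instead of only logging it.
   if (tableId == null) {
       String errorMsg =
               String.format(
                       "Received write metadata for unregistered table path %s. "
                               + "A CreateTableOperatorEvent should have registered it first.",
                       tablePath);
       LOG.error(errorMsg);
       context.failJob(new HoodieException(errorMsg));
       return;
   }
   ```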


