cshuo commented on code in PR #4164:
URL: https://github.com/apache/flink-cdc/pull/4164#discussion_r2501426595


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/coordinator/MultiTableStreamWriteOperatorCoordinator.java:
##########
@@ -0,0 +1,981 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.hudi.sink.coordinator;
+
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.connectors.hudi.sink.event.CreateTableOperatorEvent;
+import org.apache.flink.cdc.connectors.hudi.sink.event.EnhancedWriteMetadataEvent;
+import org.apache.flink.cdc.connectors.hudi.sink.event.SchemaChangeOperatorEvent;
+import org.apache.flink.cdc.connectors.hudi.sink.util.RowDataUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SerializationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.event.Correspondent;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.utils.CoordinationResponseSerDe;
+import org.apache.hudi.sink.utils.EventBuffers;
+import org.apache.hudi.sink.utils.ExplicitClassloaderThreadFactory;
+import org.apache.hudi.sink.utils.NonThrownExecutor;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.ClusteringUtil;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.StreamerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.configuration.FlinkOptions.COMPACTION_DELTA_COMMITS;
+
+/**
+ * A custom OperatorCoordinator that manages Hudi writes for multiple tables.
+ *
+ * <p>This coordinator extends the default {@link StreamWriteOperatorCoordinator}. The parent class
+ * is designed for a single destination table, so its core logic (e.g., for commits and
+ * checkpointing) cannot be reused directly for a multi-table sink.
+ *
+ * <p>Therefore, this implementation overrides the essential lifecycle methods to manage a
+ * collection of per-table resources. It dynamically creates and manages a dedicated {@link
+ * HoodieFlinkWriteClient}, {@link EventBuffers}, and timeline for each table that appears in the
+ * upstream CDC data.
+ */
+public class MultiTableStreamWriteOperatorCoordinator extends StreamWriteOperatorCoordinator {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MultiTableStreamWriteOperatorCoordinator.class);
+
+    /**
+     * A custom coordination request that includes the TableId to request an instant for a specific
+     * table.
+     */
+    public static class MultiTableInstantTimeRequest implements CoordinationRequest, Serializable {
+        private static final long serialVersionUID = 1L;
+        private final long checkpointId;
+        private final TableId tableId;
+
+        public MultiTableInstantTimeRequest(long checkpointId, TableId tableId) {
+            this.checkpointId = checkpointId;
+            this.tableId = tableId;
+        }
+
+        public long getCheckpointId() {
+            return checkpointId;
+        }
+
+        public TableId getTableId() {
+            return tableId;
+        }
+    }
+
+    /**
+     * Encapsulates all state and resources for a single table. This simplifies management by
+     * grouping related objects, making the coordinator logic cleaner and less prone to errors.
+     */
+    private static class TableContext implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        final transient HoodieFlinkWriteClient<?> writeClient;
+        final EventBuffers eventBuffers;
+        final TableState tableState;
+        final String tablePath;
+
+        TableContext(
+                HoodieFlinkWriteClient<?> writeClient,
+                EventBuffers eventBuffers,
+                TableState tableState,
+                String tablePath) {
+            this.writeClient = writeClient;
+            this.eventBuffers = eventBuffers;
+            this.tableState = tableState;
+            this.tablePath = tablePath;
+        }
+
+        void close() {
+            if (writeClient != null) {
+                try {
+                    writeClient.close();
+                } catch (Exception e) {
+                    LOG.error("Error closing write client for table path: {}", tablePath, e);
+                }
+            }
+        }
+    }
+
+    /** A container for table-specific configuration and state. */
+    private static class TableState implements Serializable {
+        private static final long serialVersionUID = 1L;
+        final String commitAction;
+        final boolean isOverwrite;
+        final WriteOperationType operationType;
+        final boolean scheduleCompaction;
+        final boolean scheduleClustering;
+        final boolean isDeltaTimeCompaction;
+
+        // Event-driven compaction tracking - tracks actual write activity
+        long commitsSinceLastCompaction = 0;
+        // For MOR tables, track log file growth
+        long totalLogBytesWritten = 0;
+
+        final int commitsThreshold;
+
+        TableState(Configuration conf) {
+            this.operationType =
+                    WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
+            this.commitAction =
+                    CommitUtils.getCommitActionType(
+                            this.operationType,
+                            HoodieTableType.valueOf(
+                                    conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase()));
+            this.isOverwrite = WriteOperationType.isOverwrite(this.operationType);
+            this.scheduleCompaction = OptionsResolver.needsScheduleCompaction(conf);
+            this.scheduleClustering = OptionsResolver.needsScheduleClustering(conf);
+            this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(conf);
+            this.commitsThreshold = conf.get(COMPACTION_DELTA_COMMITS);
+        }
+
+        /**
+         * Updates compaction metrics based on write statuses. Skips empty commits where no actual
+         * data was written.
+         *
+         * @param writeStatuses The write statuses from the latest commit
+         * @return true if this commit had actual writes, false if it was empty
+         */
+        boolean updateCompactionMetrics(List<WriteStatus> writeStatuses) {
+            if (writeStatuses == null || writeStatuses.isEmpty()) {
+                LOG.debug("No write statuses - skipping compaction metric update");
+                return false;
+            }
+
+            // Check if any actual writes occurred (skip empty commits)
+            long totalWrites =
+                    writeStatuses.stream()
+                            .map(WriteStatus::getStat)
+                            .filter(stat -> stat != null)
+                            .mapToLong(HoodieWriteStat::getNumWrites)
+                            .sum();
+
+            if (totalWrites == 0) {
+                LOG.debug(
+                        "Empty commit detected (numWrites=0) - skipping compaction metric update");
+                return false;
+            }
+
+            // Track log file bytes written (for MOR tables)
+            long bytesWritten =
+                    writeStatuses.stream()
+                            .map(WriteStatus::getStat)
+                            .filter(stat -> stat != null)
+                            .mapToLong(HoodieWriteStat::getTotalWriteBytes)
+                            .sum();
+
+            commitsSinceLastCompaction++;
+            totalLogBytesWritten += bytesWritten;
+
+            LOG.debug(
+                    "Updated compaction metrics: commits={}, bytes={}",
+                    commitsSinceLastCompaction,
+                    totalLogBytesWritten);
+            return true;
+        }
+
+        /** Resets compaction metrics after compaction is scheduled. */
+        void resetCompactionMetrics() {
+            commitsSinceLastCompaction = 0;
+            totalLogBytesWritten = 0;
+        }
+
+        /**
+         * Determines if compaction should be triggered based on write activity. Only triggers for
+         * MOR tables with actual data writes.
+         *
+         * @return true if compaction should be scheduled
+         */
+        boolean shouldTriggerCompaction() {
+            // Only trigger for MOR tables (DELTA_COMMIT means log files)
+            if (!commitAction.equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
+                return false;
+            }
+
+            return commitsSinceLastCompaction >= commitsThreshold;
+        }
+    }
+
+    /** The base Flink configuration. */
+    private final Configuration baseConfig;
+
+    /**
+     * A single, unified map holding the context for each managed table. The key is the {@link
+     * TableId}, providing a centralized place for all per-table resources.
+     */
+    private final Map<TableId, TableContext> tableContexts = new ConcurrentHashMap<>();
+
+    /** A reverse lookup map from table path to TableId for efficient event routing. */
+    private final Map<String, TableId> pathToTableId = new ConcurrentHashMap<>();
+
+    /** Cache of schemas per table for config creation. */
+    private final Map<TableId, Schema> tableSchemas = new ConcurrentHashMap<>();
+
+    /**
+     * Gateways for sending events to sub-tasks. This field is necessary because the parent's
+     * `gateways` array is private and not initialized if we don't call super.start().
+     */
+    private transient SubtaskGateway[] gateways;
+
+    /**
+     * A dedicated write client whose only job is to run the embedded timeline server. This ensures
+     * there is only one timeline server for the entire job.
+     */
+    private transient HoodieFlinkWriteClient<?> timelineServerClient;
+
+    /** A single-thread executor to handle instant time requests, mimicking the parent behavior. */
+    private transient NonThrownExecutor instantRequestExecutor;
+
+    public MultiTableStreamWriteOperatorCoordinator(Configuration conf, Context context) {
+        super(conf, context);
+        conf.setString("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
+        this.baseConfig = conf;
+        LOG.info(
+                "MultiTableStreamWriteOperatorCoordinator initialized for operator: {} with config: {}",
+                context.getOperatorId(),
+                baseConfig);
+    }
+
+    @Override
+    public void start() throws Exception {
+        // Hadoop's FileSystem API uses Java's ServiceLoader to find implementations for
+        // URI schemes (like 'file://'). The ServiceLoader relies on the thread's context
+        // classloader. The parent class sets this, but our overridden start() method must
+        // do so as well to ensure file system implementations can be found.
+        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+
+        // Initialize the executor service, which is a protected field in the parent class.
+        // This logic is borrowed from the parent's start() method as we cannot call super.start().
+        this.executor =
+                NonThrownExecutor.builder(LOG)
+                        .threadFactory(
+                                new ExplicitClassloaderThreadFactory(
+                                        "multi-table-coord-event-handler",
+                                        context.getUserCodeClassloader()))
+                        .exceptionHook(
+                                (errMsg, t) -> this.context.failJob(new HoodieException(errMsg, t)))
+                        .waitForTasksFinish(true)
+                        .build();
+
+        // Executor for handling instant requests.
+        this.instantRequestExecutor =
+                NonThrownExecutor.builder(LOG)
+                        .threadFactory(
+                                new ExplicitClassloaderThreadFactory(
+                                        "multi-table-instant-request",
+                                        context.getUserCodeClassloader()))
+                        .exceptionHook(
+                                (errMsg, t) -> this.context.failJob(new HoodieException(errMsg, t)))
+                        .build();
+
+        // Initialize the gateways array to avoid NullPointerException when subtasks are ready.
+        this.gateways = new SubtaskGateway[context.currentParallelism()];
+
+        // Initialize a single write client for the coordinator path.
+        // Its primary role is to start and manage the embedded timeline server.
+        try {
+            // The baseConfig points to the dummy coordinator path.
+            // A .hoodie directory is required for the timeline server to start.
+            StreamerUtil.initTableIfNotExists(this.baseConfig);
+            this.timelineServerClient = FlinkWriteClients.createWriteClient(this.baseConfig);

Review Comment:
   I mean the write clients in the writers get the ip/port configuration of the timeline server through `FileSystemViewStorageConfig`, so the coordinator should properly save the view storage properties for each table.
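
   For illustration only, a rough, untested sketch of what I have in mind (the helper name and wrapper class are placeholders, not part of this PR): copy the shared timeline server's remote view storage settings into each per-table config before the per-table write client is created, so the coordinator-side client for that table advertises the same host/port that the task-side writers later resolve through `FileSystemViewStorageConfig`.

   ```java
   import org.apache.flink.configuration.Configuration;

   import org.apache.hudi.client.HoodieFlinkWriteClient;
   import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;

   /** Illustrative helper only; names are placeholders. */
   final class ViewStorageConfigPropagator {

       private ViewStorageConfigPropagator() {}

       /**
        * Copies the remote view storage settings (timeline server host/port and view type) of the
        * shared coordinator-side client into a per-table Flink configuration, so the write client
        * created for that table points its writers at the same timeline server.
        */
       static Configuration withSharedTimelineServer(
               HoodieFlinkWriteClient<?> timelineServerClient, Configuration tableConf) {
           FileSystemViewStorageConfig viewConf =
                   timelineServerClient.getConfig().getViewStorageConfig();
           tableConf.setString(
                   FileSystemViewStorageConfig.REMOTE_HOST_NAME.key(),
                   viewConf.getRemoteViewServerHost());
           tableConf.setInteger(
                   FileSystemViewStorageConfig.REMOTE_PORT_NUM.key(),
                   viewConf.getRemoteViewServerPort());
           tableConf.setString(
                   FileSystemViewStorageConfig.VIEW_TYPE.key(),
                   viewConf.getStorageType().name());
           return tableConf;
       }
   }
   ```

   The copy alone is not enough, of course: IIRC the single-table coordinator also persists these view storage properties under each table's `.hoodie` folder when its write client is created, which is what the task-side clients actually read, so that step needs to happen per table as well.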


