This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.1.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit c2fd476a8bb240879419e33fcf51a7ef5cce360a Author: voonhous <[email protected]> AuthorDate: Mon Oct 27 11:21:24 2025 +0800 refactor: Add required setter methods for Flink-CDC dev (#14150) --- .../org/apache/hudi/sink/StreamWriteOperatorCoordinator.java | 6 +++--- .../apache/hudi/sink/common/AbstractStreamWriteFunction.java | 10 +++++++++- .../main/java/org/apache/hudi/sink/event/Correspondent.java | 8 ++++++++ 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 04506a3127fd..68bb54f8a1ee 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -127,7 +127,7 @@ public class StreamWriteOperatorCoordinator /** * Coordinator context. */ - private final Context context; + protected final Context context; /** * Gateways for sending events to sub-tasks. @@ -167,12 +167,12 @@ public class StreamWriteOperatorCoordinator /** * A single-thread executor to handle all the write metadata events. */ - private NonThrownExecutor executor; + protected NonThrownExecutor executor; /** * A single-thread executor to handle the instant time request. */ - private NonThrownExecutor instantRequestExecutor; + protected NonThrownExecutor instantRequestExecutor; /** * A single-thread executor to handle asynchronous hive sync. diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java index 4d101d649f8b..deedfbba8b0e 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java @@ -88,7 +88,7 @@ public abstract class AbstractStreamWriteFunction<I> /** * Correspondent to request the instant time. */ - private transient Correspondent correspondent; + protected transient Correspondent correspondent; /** * Gateway to send operator events to the operator coordinator. @@ -183,10 +183,18 @@ public abstract class AbstractStreamWriteFunction<I> this.correspondent = correspondent; } + public Correspondent getCorrespondent() { + return correspondent; + } + public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) { this.eventGateway = operatorEventGateway; } + public OperatorEventGateway getOperatorEventGateway() { + return eventGateway; + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java index e23e7e851278..e4dd405bd40e 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java @@ -71,6 +71,14 @@ public class Correspondent { } } + public OperatorID getOperatorID() { + return operatorID; + } + + public TaskOperatorEventGateway getGateway() { + return gateway; + } + /** * A request for instant time with a given checkpoint id. */
