This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ed0587d3 [flink] Introduce new option to control whether the committer operator chained (#3300)
9ed0587d3 is described below

commit 9ed0587d3a9ec41c556103eacd1bb96084d1c984
Author: big face cat <[email protected]>
AuthorDate: Tue May 7 12:53:57 2024 +0800

    [flink] Introduce new option to control whether the committer operator chained (#3300)
---
 .../generated/flink_connector_configuration.html    |  6 ++++++
 .../apache/paimon/flink/FlinkConnectorOptions.java  |  7 +++++++
 .../paimon/flink/sink/MultiTablesCompactorSink.java | 21 ++++++++++++++-------
 3 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index f02046f31..b6befc9a1 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -152,6 +152,12 @@ under the License.
             <td>MemorySize</td>
             <td>Sink committer memory to control heap memory of global committer.</td>
         </tr>
+        <tr>
+            <td><h5>sink.committer-operator-chaining</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Allow sink committer and writer operator to be chained together</td>
+        </tr>
         <tr>
             <td><h5>sink.cross-partition.managed-memory</h5></td>
             <td style="word-wrap: break-word;">256 mb</td>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 94371cb60..90eb9ce32 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -332,6 +332,13 @@ public class FlinkConnectorOptions {
                     .withDescription(
                             "Sink committer memory to control heap memory of global committer.");
 
+    public static final ConfigOption<Boolean> SINK_COMMITTER_OPERATOR_CHAINING =
+            ConfigOptions.key("sink.committer-operator-chaining")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Allow sink committer and writer operator to be chained together");
+
     public static List<ConfigOption<?>> getOptions() {
         final Field[] fields = FlinkConnectorOptions.class.getFields();
         final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
index d638e1f69..9547282ea 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.table.data.RowData;
 
@@ -42,6 +43,7 @@ import java.io.Serializable;
 import java.util.Map;
 import java.util.UUID;
 
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
 import static org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory;
@@ -121,17 +123,22 @@ public class MultiTablesCompactorSink implements Serializable {
         if (streamingCheckpointEnabled) {
             assertStreamingConfiguration(env);
         }
-
+        CommitterOperator<MultiTableCommittable, WrappedManifestCommittable> committerOperator =
+                new CommitterOperator<>(
+                        streamingCheckpointEnabled,
+                        false,
+                        commitUser,
+                        createCommitterFactory(),
+                        createCommittableStateManager());
+
+        if (!options.get(SINK_COMMITTER_OPERATOR_CHAINING)) {
+            committerOperator.setChainingStrategy(ChainingStrategy.NEVER);
+        }
         SingleOutputStreamOperator<?> committed =
                 written.transform(
                                 GLOBAL_COMMITTER_NAME,
                                 new MultiTableCommittableTypeInfo(),
-                                new CommitterOperator<>(
-                                        streamingCheckpointEnabled,
-                                        false,
-                                        commitUser,
-                                        createCommitterFactory(),
-                                        createCommittableStateManager()))
+                                committerOperator)
                         .setParallelism(1)
                         .setMaxParallelism(1);
         return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);

Reply via email to