This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9ed0587d3 [flink] Introduce new option to control whether the
committer operator is chained (#3300)
9ed0587d3 is described below
commit 9ed0587d3a9ec41c556103eacd1bb96084d1c984
Author: big face cat <[email protected]>
AuthorDate: Tue May 7 12:53:57 2024 +0800
[flink] Introduce new option to control whether the committer operator
is chained (#3300)
---
.../generated/flink_connector_configuration.html | 6 ++++++
.../apache/paimon/flink/FlinkConnectorOptions.java | 7 +++++++
.../paimon/flink/sink/MultiTablesCompactorSink.java | 21 ++++++++++++++-------
3 files changed, 27 insertions(+), 7 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index f02046f31..b6befc9a1 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -152,6 +152,12 @@ under the License.
<td>MemorySize</td>
<td>Sink committer memory to control heap memory of global
committer.</td>
</tr>
+ <tr>
+ <td><h5>sink.committer-operator-chaining</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Allow sink committer and writer operator to be chained
together</td>
+ </tr>
<tr>
<td><h5>sink.cross-partition.managed-memory</h5></td>
<td style="word-wrap: break-word;">256 mb</td>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 94371cb60..90eb9ce32 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -332,6 +332,13 @@ public class FlinkConnectorOptions {
.withDescription(
"Sink committer memory to control heap memory of
global committer.");
+ public static final ConfigOption<Boolean> SINK_COMMITTER_OPERATOR_CHAINING
=
+ ConfigOptions.key("sink.committer-operator-chaining")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Allow sink committer and writer operator to be
chained together");
+
public static List<ConfigOption<?>> getOptions() {
final Field[] fields = FlinkConnectorOptions.class.getFields();
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
index d638e1f69..9547282ea 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
@@ -35,6 +35,7 @@ import
org.apache.flink.streaming.api.environment.CheckpointConfig;
import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.data.RowData;
@@ -42,6 +43,7 @@ import java.io.Serializable;
import java.util.Map;
import java.util.UUID;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
import static
org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory;
@@ -121,17 +123,22 @@ public class MultiTablesCompactorSink implements
Serializable {
if (streamingCheckpointEnabled) {
assertStreamingConfiguration(env);
}
-
+ CommitterOperator<MultiTableCommittable, WrappedManifestCommittable>
committerOperator =
+ new CommitterOperator<>(
+ streamingCheckpointEnabled,
+ false,
+ commitUser,
+ createCommitterFactory(),
+ createCommittableStateManager());
+
+ if (!options.get(SINK_COMMITTER_OPERATOR_CHAINING)) {
+ committerOperator.setChainingStrategy(ChainingStrategy.NEVER);
+ }
SingleOutputStreamOperator<?> committed =
written.transform(
GLOBAL_COMMITTER_NAME,
new MultiTableCommittableTypeInfo(),
- new CommitterOperator<>(
- streamingCheckpointEnabled,
- false,
- commitUser,
- createCommitterFactory(),
- createCommittableStateManager()))
+ committerOperator)
.setParallelism(1)
.setMaxParallelism(1);
return committed.addSink(new
DiscardingSink<>()).name("end").setParallelism(1);