This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new bc0326cba8 [Feature][Connector-Paimon] Support dynamic bucket splitting improves Paimon writing efficiency (#7335)
bc0326cba8 is described below
commit bc0326cba880b4ed0dd07ce0dba31929d2faa77b
Author: zhangdonghao <[email protected]>
AuthorDate: Fri Sep 20 16:04:16 2024 +0800
[Feature][Connector-Paimon] Support dynamic bucket splitting improves Paimon writing efficiency (#7335)
---
docs/en/connector-v2/sink/Paimon.md | 47 ++-
docs/zh/connector-v2/sink/Paimon.md | 112 +++++-
.../api/sink/DefaultSinkWriterContext.java | 17 +-
.../org/apache/seatunnel/api/sink/SinkWriter.java | 5 +
.../api/sink/multitablesink/MultiTableSink.java | 7 +-
.../api/sink/multitablesink/SinkContextProxy.java | 10 +-
.../jdbc/internal/xa/SemanticXidGeneratorTest.java | 5 +-
.../seatunnel/paimon/sink/PaimonSinkWriter.java | 23 +-
.../paimon/sink/bucket/PaimonBucketAssigner.java | 85 +++++
.../sink/bucket/PaimonBucketAssignerTest.java | 91 +++++
.../flink/execution/SinkExecuteProcessor.java | 17 +-
.../starter/flink/execution/FlinkExecution.java | 1 -
.../flink/execution/SinkExecuteProcessor.java | 15 +-
.../spark/execution/SinkExecuteProcessor.java | 3 +-
.../spark/execution/SinkExecuteProcessor.java | 3 +-
.../jdbc/internal/xa/XaGroupOpsImplIT.java | 2 +-
.../paimon/PaimonSinkDynamicBucketIT.java | 420 +++++++++++++++++++++
.../e2e/connector/paimon/SimpleBucketIndex.java | 83 ++++
.../fake_cdc_to_dynamic_bucket_paimon_case.conf | 131 +++++++
.../fake_to_dynamic_bucket_paimon_case1.conf | 63 ++++
.../fake_to_dynamic_bucket_paimon_case2.conf | 53 +++
.../fake_to_dynamic_bucket_paimon_case3.conf | 53 +++
.../fake_to_dynamic_bucket_paimon_case4.conf | 82 ++++
.../fake_to_dynamic_bucket_paimon_case5.conf | 64 ++++
.../server/task/context/SinkWriterContext.java | 23 +-
.../engine/server/task/flow/SinkFlowLifeCycle.java | 4 +-
.../flink/sink/FlinkSinkWriterContext.java | 9 +-
.../translation/flink/sink/FlinkSink.java | 9 +-
.../flink/sink/FlinkSinkWriterContext.java | 9 +-
.../translation/spark/sink/SparkSink.java | 17 +-
.../translation/spark/sink/SparkSinkInjector.java | 13 +-
.../spark/sink/writer/SparkDataSourceWriter.java | 7 +-
.../spark/sink/writer/SparkDataWriterFactory.java | 7 +-
.../spark/sink/writer/SparkStreamWriter.java | 5 +-
.../spark/sink/SeaTunnelBatchWrite.java | 8 +-
.../translation/spark/sink/SeaTunnelSinkTable.java | 12 +-
.../translation/spark/sink/SparkSinkInjector.java | 14 +-
.../write/SeaTunnelSparkDataWriterFactory.java | 7 +-
.../spark/sink/write/SeaTunnelWrite.java | 9 +-
.../spark/sink/write/SeaTunnelWriteBuilder.java | 7 +-
.../translation/spark/sink/SparkSinkTest.java | 3 +-
41 files changed, 1477 insertions(+), 78 deletions(-)
diff --git a/docs/en/connector-v2/sink/Paimon.md
b/docs/en/connector-v2/sink/Paimon.md
index 57ba427430..bb4bb9d5d7 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -43,7 +43,7 @@ libfb303-xxx.jar
| data_save_mode | Enum | No | APPEND_DATA
| The data save mode
|
| paimon.table.primary-keys | String | No | -
| Default comma-separated list of columns (primary key) that identify a row
in tables.(Notice: The partition field needs to be included in the primary key
fields) |
| paimon.table.partition-keys | String | No | -
| Default comma-separated list of partition fields to use when creating
tables.
|
-| paimon.table.write-props | Map | No | -
| Properties passed through to paimon table initialization,
[reference](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions).
|
+| paimon.table.write-props | Map | No | -
| Properties passed through to paimon table initialization,
[reference](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions).
|
| paimon.hadoop.conf | Map | No | -
| Properties in hadoop conf
|
| paimon.hadoop.conf-path | String | No | -
| The specified loading path for the 'core-site.xml', 'hdfs-site.xml',
'hive-site.xml' files
|
@@ -241,6 +241,51 @@ sink {
}
```
+### Write to dynamic bucket table
+
+A single dynamic bucket table configured through Paimon write props. Dynamic bucketing only takes effect on a primary key table with bucket = -1.
+
+#### Core options
+
+Please refer to the [dynamic bucket documentation](https://paimon.apache.org/docs/master/primary-key-table/data-distribution/#dynamic-bucket).
+
+| name                           | type | required | default values | Description                                     |
+|--------------------------------|------|----------|----------------|-------------------------------------------------|
+| dynamic-bucket.target-row-num  | long | yes      | 2000000L       | Controls the target row number for one bucket.  |
+| dynamic-bucket.initial-buckets | int  | no       |                | Controls the number of initialized buckets.     |
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ Mysql-CDC {
+ base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+ username = "root"
+ password = "******"
+ table-names = ["seatunnel.role"]
+ }
+}
+
+sink {
+ Paimon {
+ catalog_name="seatunnel_test"
+ warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
+ database="seatunnel"
+ table="role"
+ paimon.table.write-props = {
+ bucket = -1
+ dynamic-bucket.target-row-num = 50000
+ }
+ paimon.table.partition-keys = "dt"
+ paimon.table.primary-keys = "pk_id,dt"
+ }
+}
+```
+
### Multiple table
#### example1
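Supplementary sketch for the dynamic bucket options documented above (illustrative only, not part of this commit): `dynamic-bucket.initial-buckets` can be set alongside the target row number. The sink block mirrors the example from the docs; the initial bucket count of 2 is an assumed value to show the shape of the config, not a recommendation.

```hocon
sink {
  Paimon {
    catalog_name = "seatunnel_test"
    warehouse = "file:///tmp/seatunnel/paimon/hadoop-sink/"
    database = "seatunnel"
    table = "role"
    paimon.table.write-props = {
      bucket = -1
      dynamic-bucket.target-row-num = 50000
      # Assumed illustrative value for the number of initialized buckets.
      dynamic-bucket.initial-buckets = 2
    }
    paimon.table.partition-keys = "dt"
    paimon.table.primary-keys = "pk_id,dt"
  }
}
```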
diff --git a/docs/zh/connector-v2/sink/Paimon.md
b/docs/zh/connector-v2/sink/Paimon.md
index 41fa395eb7..66f6738efc 100644
--- a/docs/zh/connector-v2/sink/Paimon.md
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -30,22 +30,21 @@ libfb303-xxx.jar
## 连接器选项
-| 名称 | 类型 | 是否必须 | 默认值 | 描述
|
-|-----------------------------|-------|----------|------------------------------|------------------------------------------------------------------------------------------------------|
-| warehouse | 字符串 | 是 | - |
Paimon warehouse路径
|
-| catalog_type | 字符串 | 否 | filesystem |
Paimon的catalog类型,目前支持filesystem和hive
|
-| catalog_uri | 字符串 | 否 | - |
Paimon catalog的uri,仅当catalog_type为hive时需要配置
|
-| database | 字符串 | 是 | - |
数据库名称
|
-| table | 字符串 | 是 | - |
表名
|
-| hdfs_site_path | 字符串 | 否 | - |
hdfs-site.xml文件路径
|
-| schema_save_mode | 枚举 | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST
| Schema保存模式
|
-| data_save_mode | 枚举 | 否 | APPEND_DATA
| 数据保存模式
|
-| paimon.table.primary-keys | 字符串 | 否 | - |
主键字段列表,联合主键使用逗号分隔(注意:分区字段需要包含在主键字段中)
|
-| paimon.table.partition-keys | 字符串 | 否 | - |
分区字段列表,多字段使用逗号分隔
|
+| 名称 | 类型 | 是否必须 | 默认值 | 描述
|
+|-----------------------------|-------|----------|------------------------------|---------------------------------------------------------------------------------------------------|
+| warehouse | 字符串 | 是 | - |
Paimon warehouse路径
|
+| catalog_type | 字符串 | 否 | filesystem |
Paimon的catalog类型,目前支持filesystem和hive
|
+| catalog_uri | 字符串 | 否 | - |
Paimon catalog的uri,仅当catalog_type为hive时需要配置
|
+| database | 字符串 | 是 | - |
数据库名称
|
+| table | 字符串 | 是 | - |
表名
|
+| hdfs_site_path | 字符串 | 否 | - |
hdfs-site.xml文件路径
|
+| schema_save_mode | 枚举 | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST
| Schema保存模式
|
+| data_save_mode | 枚举 | 否 | APPEND_DATA
| 数据保存模式
|
+| paimon.table.primary-keys | 字符串 | 否 | - |
主键字段列表,联合主键使用逗号分隔(注意:分区字段需要包含在主键字段中)
|
+| paimon.table.partition-keys | 字符串 | 否 | - |
分区字段列表,多字段使用逗号分隔
|
| paimon.table.write-props | Map | 否 | -
| Paimon表初始化指定的属性,
[参考](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions)
|
-| paimon.hadoop.conf | Map | 否 | -
| Hadoop配置文件属性信息
|
-| paimon.hadoop.conf-path | 字符串 | 否 | - |
Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置
|
-
+| paimon.hadoop.conf | Map | 否 | -
| Hadoop配置文件属性信息
|
+| paimon.hadoop.conf-path | 字符串 | 否 | - |
Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置
|
## 示例
@@ -241,8 +240,53 @@ sink {
}
```
+### 动态分桶paimon单表
+
+只有在主键表并指定bucket = -1时才会生效
+
+#### 核心参数:[参考官网](https://paimon.apache.org/docs/master/primary-key-table/data-distribution/#dynamic-bucket)
+
+| 名称 | 类型 | 是否必须 | 默认值 | 描述 |
+|--------------------------------|------|------|----------|------------------|
+| dynamic-bucket.target-row-num | long | 是 | 2000000L | 控制一个bucket的写入的行数 |
+| dynamic-bucket.initial-buckets | int | 否 | | 控制初始化桶的数量 |
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ Mysql-CDC {
+ base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+ username = "root"
+ password = "******"
+ table-names = ["seatunnel.role"]
+ }
+}
+
+sink {
+ Paimon {
+ catalog_name="seatunnel_test"
+ warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
+ database="seatunnel"
+ table="role"
+ paimon.table.write-props = {
+ bucket = -1
+ dynamic-bucket.target-row-num = 50000
+ }
+ paimon.table.partition-keys = "dt"
+ paimon.table.primary-keys = "pk_id,dt"
+ }
+}
+```
+
### 多表
+#### 示例1
+
```hocon
env {
parallelism = 1
@@ -272,3 +316,41 @@ sink {
}
```
+#### 示例2
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Jdbc {
+ driver = oracle.jdbc.driver.OracleDriver
+ url = "jdbc:oracle:thin:@localhost:1521/XE"
+ user = testUser
+ password = testPassword
+
+ table_list = [
+ {
+ table_path = "TESTSCHEMA.TABLE_1"
+ },
+ {
+ table_path = "TESTSCHEMA.TABLE_2"
+ }
+ ]
+ }
+}
+
+transform {
+}
+
+sink {
+ Paimon {
+ catalog_name="seatunnel_test"
+ warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
+ database="${schema_name}_test"
+ table="${table_name}_test"
+ }
+}
+```
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
index 73af75f22c..74ae4a0eb4 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
@@ -25,18 +25,21 @@ import org.apache.seatunnel.api.event.EventListener;
/** The default {@link SinkWriter.Context} implement class. */
public class DefaultSinkWriterContext implements SinkWriter.Context {
private final int subtask;
+ private final int numberOfParallelSubtasks;
private final EventListener eventListener;
- public DefaultSinkWriterContext(int subtask) {
- this(subtask, new DefaultEventProcessor());
+ public DefaultSinkWriterContext(int subtask, int parallelism) {
+ this(subtask, parallelism, new DefaultEventProcessor());
}
- public DefaultSinkWriterContext(String jobId, int subtask) {
- this(subtask, new DefaultEventProcessor(jobId));
+ public DefaultSinkWriterContext(String jobId, int subtask, int
parallelism) {
+ this(subtask, parallelism, new DefaultEventProcessor(jobId));
}
- public DefaultSinkWriterContext(int subtask, EventListener eventListener) {
+ public DefaultSinkWriterContext(
+ int subtask, int numberOfParallelSubtasks, EventListener
eventListener) {
this.subtask = subtask;
+ this.numberOfParallelSubtasks = numberOfParallelSubtasks;
this.eventListener = eventListener;
}
@@ -45,6 +48,10 @@ public class DefaultSinkWriterContext implements
SinkWriter.Context {
return subtask;
}
+ public int getNumberOfParallelSubtasks() {
+ return numberOfParallelSubtasks;
+ }
+
@Override
public MetricsContext getMetricsContext() {
// TODO Waiting for Flink and Spark to implement MetricsContext
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
index 785f1065dd..4567e98cbf 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
@@ -92,6 +92,11 @@ public interface SinkWriter<T, CommitInfoT, StateT> {
/** @return The index of this subtask. */
int getIndexOfSubtask();
+ /** @return parallelism of this writer. */
+ default int getNumberOfParallelSubtasks() {
+ return 1;
+ }
+
/** @return metricsContext of this reader. */
MetricsContext getMetricsContext();
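A minimal usage sketch of the new parallelism accessor (illustrative only, not part of this patch; it assumes only the SeaTunnel API classes touched above, and the class name `WriterContextSketch` is hypothetical):

```java
import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
import org.apache.seatunnel.api.sink.SinkWriter;

public class WriterContextSketch {
    public static void main(String[] args) {
        // Subtask index 0 of 4 parallel subtasks, using the new two-argument constructor.
        SinkWriter.Context context = new DefaultSinkWriterContext(0, 4);
        System.out.println(context.getIndexOfSubtask());           // prints 0
        System.out.println(context.getNumberOfParallelSubtasks()); // prints 4
    }
}
```

A writer such as PaimonSinkWriter can then derive per-subtask work (here, bucket assignment) from these two values instead of assuming a parallelism of 1.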
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
index 3f7f7fa9c6..3db7a8b7d2 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
@@ -71,7 +71,7 @@ public class MultiTableSink
int index = context.getIndexOfSubtask() * replicaNum + i;
writers.put(
SinkIdentifier.of(tableIdentifier, index),
- sink.createWriter(new SinkContextProxy(index,
context)));
+ sink.createWriter(new SinkContextProxy(index,
replicaNum, context)));
sinkWritersContext.put(SinkIdentifier.of(tableIdentifier,
index), context);
}
}
@@ -100,11 +100,12 @@ public class MultiTableSink
if (state.isEmpty()) {
writers.put(
sinkIdentifier,
- sink.createWriter(new SinkContextProxy(index,
context)));
+ sink.createWriter(new SinkContextProxy(index,
replicaNum, context)));
} else {
writers.put(
sinkIdentifier,
- sink.restoreWriter(new SinkContextProxy(index,
context), state));
+ sink.restoreWriter(
+ new SinkContextProxy(index, replicaNum,
context), state));
}
sinkWritersContext.put(SinkIdentifier.of(tableIdentifier,
index), context);
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkContextProxy.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkContextProxy.java
index 3a97bb27bc..5f4bf75f6f 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkContextProxy.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkContextProxy.java
@@ -25,10 +25,13 @@ public class SinkContextProxy implements SinkWriter.Context
{
private final int index;
+ private final int replicaNum;
+
private final SinkWriter.Context context;
- public SinkContextProxy(int index, SinkWriter.Context context) {
+ public SinkContextProxy(int index, int replicaNum, SinkWriter.Context
context) {
this.index = index;
+ this.replicaNum = replicaNum;
this.context = context;
}
@@ -37,6 +40,11 @@ public class SinkContextProxy implements SinkWriter.Context {
return index;
}
+ @Override
+ public int getNumberOfParallelSubtasks() {
+ return context.getNumberOfParallelSubtasks() * replicaNum;
+ }
+
@Override
public MetricsContext getMetricsContext() {
return context.getMetricsContext();
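For orientation (not part of the patch): if the wrapped context reports 2 parallel subtasks and replicaNum is 3, the proxy reports 2 * 3 = 6 parallel writers, which matches the writer indexes 0..5 that MultiTableSink derives from `context.getIndexOfSubtask() * replicaNum + i` above.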
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGeneratorTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGeneratorTest.java
index b47d4facd8..105d68d0af 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGeneratorTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGeneratorTest.java
@@ -45,11 +45,12 @@ class SemanticXidGeneratorTest {
}
void check(JobContext jobContext) {
- DefaultSinkWriterContext dc1 = new
DefaultSinkWriterContext(Integer.MAX_VALUE);
+ DefaultSinkWriterContext dc1 = new
DefaultSinkWriterContext(Integer.MAX_VALUE, 1);
Xid xid1 = xidGenerator.generateXid(jobContext, dc1,
System.currentTimeMillis());
Assertions.assertTrue(xidGenerator.belongsToSubtask(xid1, jobContext,
dc1));
Assertions.assertFalse(
- xidGenerator.belongsToSubtask(xid1, jobContext, new
DefaultSinkWriterContext(2)));
+ xidGenerator.belongsToSubtask(
+ xid1, jobContext, new DefaultSinkWriterContext(2, 1)));
Assertions.assertFalse(xidGenerator.belongsToSubtask(xid1, new
JobContext(), dc1));
}
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index acadf99990..d58ab65cf7 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -27,6 +27,7 @@ import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfi
import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket.PaimonBucketAssigner;
import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.JobContextUtil;
@@ -34,6 +35,7 @@ import
org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
@@ -79,7 +81,11 @@ public class PaimonSinkWriter
private final JobContext jobContext;
- private TableSchema tableSchema;
+ private final TableSchema tableSchema;
+
+ private final PaimonBucketAssigner bucketAssigner;
+
+ private final boolean dynamicBucket;
public PaimonSinkWriter(
Context context,
@@ -97,6 +103,14 @@ public class PaimonSinkWriter
this.context = context;
this.jobContext = jobContext;
this.tableSchema = ((FileStoreTable) table).schema();
+ this.bucketAssigner =
+ new PaimonBucketAssigner(
+ table,
+ this.context.getNumberOfParallelSubtasks(),
+ this.context.getIndexOfSubtask());
+ BucketMode bucketMode = ((FileStoreTable) table).bucketMode();
+ this.dynamicBucket =
+ BucketMode.DYNAMIC == bucketMode || BucketMode.GLOBAL_DYNAMIC
== bucketMode;
PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration);
}
@@ -139,7 +153,12 @@ public class PaimonSinkWriter
try {
PaimonSecurityContext.runSecured(
() -> {
- tableWrite.write(rowData);
+ if (dynamicBucket) {
+ int bucket = bucketAssigner.assign(rowData);
+ tableWrite.write(rowData, bucket);
+ } else {
+ tableWrite.write(rowData);
+ }
return null;
});
} catch (Exception e) {
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java
new file mode 100644
index 0000000000..4f5f681fff
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.crosspartition.IndexBootstrap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.index.SimpleHashBucketAssigner;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
+
+import java.io.IOException;
+
+public class PaimonBucketAssigner {
+
+ private final RowPartitionKeyExtractor extractor;
+
+ private final Projection bucketKeyProjection;
+
+ private final SimpleHashBucketAssigner simpleHashBucketAssigner;
+
+ private final TableSchema schema;
+
+ public PaimonBucketAssigner(Table table, int numAssigners, int assignId) {
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ this.schema = fileStoreTable.schema();
+ this.extractor = new RowPartitionKeyExtractor(fileStoreTable.schema());
+ this.bucketKeyProjection =
+ CodeGenUtils.newProjection(
+ fileStoreTable.schema().logicalRowType(),
+
fileStoreTable.schema().projection(fileStoreTable.schema().bucketKeys()));
+ long dynamicBucketTargetRowNum =
+ ((FileStoreTable)
table).coreOptions().dynamicBucketTargetRowNum();
+ this.simpleHashBucketAssigner =
+ new SimpleHashBucketAssigner(numAssigners, assignId,
dynamicBucketTargetRowNum);
+ loadBucketIndex(fileStoreTable, numAssigners, assignId);
+ }
+
+ private void loadBucketIndex(FileStoreTable fileStoreTable, int
numAssigners, int assignId) {
+ IndexBootstrap indexBootstrap = new IndexBootstrap(fileStoreTable);
+ try (RecordReader<InternalRow> recordReader =
+ indexBootstrap.bootstrap(numAssigners, assignId)) {
+ RecordReaderIterator<InternalRow> readerIterator =
+ new RecordReaderIterator<>(recordReader);
+ while (readerIterator.hasNext()) {
+ InternalRow row = readerIterator.next();
+ assign(row);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public int assign(InternalRow rowData) {
+ int hash;
+ if (CollectionUtils.isEmpty(this.schema.bucketKeys())) {
+ hash = extractor.trimmedPrimaryKey(rowData).hashCode();
+ } else {
+ hash = bucketKeyProjection.apply(rowData).hashCode();
+ }
+ return Math.abs(
+
this.simpleHashBucketAssigner.assign(this.extractor.partition(rowData), hash));
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssignerTest.java
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssignerTest.java
new file mode 100644
index 0000000000..fd7fdef816
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssignerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class PaimonBucketAssignerTest {
+
+ private Table table;
+ private static final String TABLE_NAME = "default_table";
+ private static final String DATABASE_NAME = "default_database";
+
+ @BeforeEach
+ public void before() throws Exception {
+ boolean isWindows =
+
System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS");
+ Options options = new Options();
+ if (isWindows) {
+ options.set("warehouse", "C:/Users/" +
System.getProperty("user.name") + "/tmp/paimon");
+ } else {
+ options.set("warehouse", "file:///tmp/paimon");
+ }
+ Catalog catalog =
CatalogFactory.createCatalog(CatalogContext.create(options));
+ catalog.createDatabase(DATABASE_NAME, true);
+ Identifier identifier = Identifier.create(DATABASE_NAME, TABLE_NAME);
+ if (!catalog.tableExists(identifier)) {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("id", DataTypes.INT(), "primary Key");
+ schemaBuilder.column("name", DataTypes.STRING(), "name");
+ schemaBuilder.primaryKey("id");
+ schemaBuilder.option("bucket", "-1");
+ schemaBuilder.option("dynamic-bucket.target-row-num", "20");
+ Schema schema = schemaBuilder.build();
+ catalog.createTable(identifier, schema, false);
+ }
+ table = catalog.getTable(identifier);
+ }
+
+ @Test
+ public void bucketAssigner() {
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ RowPartitionKeyExtractor keyExtractor =
+ new RowPartitionKeyExtractor(fileStoreTable.schema());
+ PaimonBucketAssigner paimonBucketAssigner = new
PaimonBucketAssigner(fileStoreTable, 1, 0);
+ Map<Integer, Integer> bucketInformation = new HashMap<>();
+ for (int i = 0; i < 50; i++) {
+ GenericRow row = GenericRow.of(i,
BinaryString.fromString(String.valueOf(i)));
+ int assign = paimonBucketAssigner.assign(row);
+ int hashCode = keyExtractor.trimmedPrimaryKey(row).hashCode();
+ bucketInformation.put(hashCode, assign);
+ }
+ List<Integer> bucketSize =
+
bucketInformation.values().stream().distinct().collect(Collectors.toList());
+ Assertions.assertEquals(3, bucketSize.size());
+ }
+}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 999f5e7faf..c5eabc6303 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -128,14 +128,21 @@ public class SinkExecuteProcessor
sink.setJobContext(jobContext);
}
handleSaveMode(sink);
+ boolean sinkParallelism =
sinkConfig.hasPath(CommonOptions.PARALLELISM.key());
+ boolean envParallelism =
envConfig.hasPath(CommonOptions.PARALLELISM.key());
+ int parallelism =
+ sinkParallelism
+ ?
sinkConfig.getInt(CommonOptions.PARALLELISM.key())
+ : envParallelism
+ ?
envConfig.getInt(CommonOptions.PARALLELISM.key())
+ : 1;
DataStreamSink<Row> dataStreamSink =
stream.getDataStream()
- .sinkTo(new FlinkSink<>(sink,
stream.getCatalogTables().get(0)))
+ .sinkTo(
+ new FlinkSink<>(
+ sink,
stream.getCatalogTables().get(0), parallelism))
.name(String.format("%s-Sink",
sink.getPluginName()));
- if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
- int parallelism =
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
- dataStreamSink.setParallelism(parallelism);
- }
+ dataStreamSink.setParallelism(parallelism);
}
// the sink is the last stream
return null;
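Worked example of the resolution order above (not part of the patch): if the sink block sets `parallelism = 4` and the env block sets `parallelism = 2`, the sink parallelism resolves to 4; if only the env block sets it, to 2; if neither is set, to the default of 1.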
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index 6b3fbd9b47..f3a76adfad 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -85,7 +85,6 @@ public class FlinkExecution implements TaskExecution {
registerPlugin(envConfig);
JobContext jobContext = new JobContext();
jobContext.setJobMode(RuntimeEnvironment.getJobMode(config));
-
this.sourcePluginExecuteProcessor =
new SourceExecuteProcessor(
jarPaths, envConfig,
config.getConfigList(Constants.SOURCE), jobContext);
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 65e6d3b9f2..5e6bd9173c 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -129,15 +129,24 @@ public class SinkExecuteProcessor
sink.setJobContext(jobContext);
}
handleSaveMode(sink);
+ boolean sinkParallelism =
sinkConfig.hasPath(CommonOptions.PARALLELISM.key());
+ boolean envParallelism =
envConfig.hasPath(CommonOptions.PARALLELISM.key());
+ int parallelism =
+ sinkParallelism
+ ?
sinkConfig.getInt(CommonOptions.PARALLELISM.key())
+ : envParallelism
+ ?
envConfig.getInt(CommonOptions.PARALLELISM.key())
+ : 1;
DataStreamSink<Row> dataStreamSink =
stream.getDataStream()
.sinkTo(
SinkV1Adapter.wrap(
new FlinkSink<>(
- sink,
stream.getCatalogTables().get(0))))
+ sink,
+
stream.getCatalogTables().get(0),
+ parallelism)))
.name(String.format("%s-Sink",
sink.getPluginName()));
- if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
- int parallelism =
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
+ if (sinkParallelism || envParallelism) {
dataStreamSink.setParallelism(parallelism);
}
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 48f8cab8e1..d5529d4ba0 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -124,7 +124,8 @@ public class SinkExecuteProcessor
sparkRuntimeEnvironment.getSparkSession().sparkContext().applicationId();
CatalogTable[] catalogTables =
datasetTableInfo.getCatalogTables().toArray(new
CatalogTable[0]);
- SparkSinkInjector.inject(dataset.write(), sink, catalogTables,
applicationId)
+ SparkSinkInjector.inject(
+ dataset.write(), sink, catalogTables,
applicationId, parallelism)
.option("checkpointLocation", "/tmp")
.save();
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 0b54e2a115..2763e3f949 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -124,7 +124,8 @@ public class SinkExecuteProcessor
sparkRuntimeEnvironment.getStreamingContext().sparkContext().applicationId();
CatalogTable[] catalogTables =
datasetTableInfo.getCatalogTables().toArray(new
CatalogTable[0]);
- SparkSinkInjector.inject(dataset.write(), sink, catalogTables,
applicationId)
+ SparkSinkInjector.inject(
+ dataset.write(), sink, catalogTables,
applicationId, parallelism)
.option("checkpointLocation", "/tmp")
.mode(SaveMode.Append)
.save();
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
index 3068dc42a8..8035d2e5c9 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
@@ -93,7 +93,7 @@ class XaGroupOpsImplIT {
@Test
void testRecoverAndRollback() throws Exception {
JobContext jobContext = new JobContext();
- SinkWriter.Context writerContext1 = new DefaultSinkWriterContext(1);
+ SinkWriter.Context writerContext1 = new DefaultSinkWriterContext(1, 1);
Xid xid1 = xidGenerator.generateXid(jobContext, writerContext1,
System.currentTimeMillis());
Xid xid2 =
xidGenerator.generateXid(
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
new file mode 100644
index 0000000000..8e36e07cd1
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.paimon;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.common.utils.FileUtils;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalogLoader;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
+import org.apache.seatunnel.core.starter.utils.CompressionUtils;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.apache.commons.compress.archivers.ArchiveException;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.crosspartition.IndexBootstrap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.TimestampType;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.awaitility.Awaitility.given;
+
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason =
+ "Spark and Flink engine can not auto create paimon table on
worker node in local file(e.g flink tm) by savemode feature which can lead
error")
+@Slf4j
+public class PaimonSinkDynamicBucketIT extends TestSuiteBase implements
TestResource {
+
+ private static String CATALOG_ROOT_DIR = "/tmp/";
+ private static final String NAMESPACE = "paimon";
+ private static final String NAMESPACE_TAR = "paimon.tar.gz";
+ private static final String CATALOG_DIR = CATALOG_ROOT_DIR + NAMESPACE +
"/";
+ private String CATALOG_ROOT_DIR_WIN = "C:/Users/";
+ private String CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/";
+ private boolean isWindows;
+
+ private Map<String, Object> PAIMON_SINK_PROPERTIES;
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ this.isWindows =
+
System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS");
+ CATALOG_ROOT_DIR_WIN = CATALOG_ROOT_DIR_WIN +
System.getProperty("user.name") + "/tmp/";
+ CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/";
+ Map<String, Object> map = new HashMap<>();
+ map.put("warehouse", "hdfs:///tmp/paimon");
+ map.put("database", "default");
+ map.put("table", "st_test5");
+ Map<String, Object> paimonHadoopConf = new HashMap<>();
+ paimonHadoopConf.put("fs.defaultFS", "hdfs://nameservice1");
+ paimonHadoopConf.put("dfs.nameservices", "nameservice1");
+ paimonHadoopConf.put("dfs.ha.namenodes.nameservice1", "nn1,nn2");
+ paimonHadoopConf.put("dfs.namenode.rpc-address.nameservice1.nn1",
"dp06:8020");
+ paimonHadoopConf.put("dfs.namenode.rpc-address.nameservice1.nn2",
"dp07:8020");
+ paimonHadoopConf.put(
+ "dfs.client.failover.proxy.provider.nameservice1",
+
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
+ paimonHadoopConf.put("dfs.client.use.datanode.hostname", "true");
+ map.put("paimon.hadoop.conf", paimonHadoopConf);
+ this.PAIMON_SINK_PROPERTIES = map;
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() throws Exception {}
+
+ @TestTemplate
+ public void testWriteAndReadPaimon(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult textWriteResult =
+
container.executeJob("/fake_to_dynamic_bucket_paimon_case1.conf");
+ Assertions.assertEquals(0, textWriteResult.getExitCode());
+ Container.ExecResult readResult =
container.executeJob("/paimon_to_assert.conf");
+ Assertions.assertEquals(0, readResult.getExitCode());
+ Container.ExecResult readProjectionResult =
+ container.executeJob("/paimon_projection_to_assert.conf");
+ Assertions.assertEquals(0, readProjectionResult.getExitCode());
+ }
+
+ @TestTemplate
+ public void testBucketCount(TestContainer container) throws IOException,
InterruptedException {
+ Container.ExecResult textWriteResult =
+
container.executeJob("/fake_to_dynamic_bucket_paimon_case2.conf");
+ Assertions.assertEquals(0, textWriteResult.getExitCode());
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100L, TimeUnit.MILLISECONDS)
+ .atMost(30L, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ // copy paimon to local
+
container.executeExtraCommands(containerExtendedFactory);
+ FileStoreTable table =
+ (FileStoreTable) getTable("default",
"st_test_2");
+ IndexBootstrap indexBootstrap = new
IndexBootstrap(table);
+ List<String> fieldNames =
+
IndexBootstrap.bootstrapType(table.schema()).getFieldNames();
+ int bucketIndexOf = fieldNames.indexOf("_BUCKET");
+ Set<Integer> bucketList = new HashSet<>();
+ try (RecordReader<InternalRow> recordReader =
+ indexBootstrap.bootstrap(1, 0)) {
+ recordReader.forEachRemaining(
+ row ->
bucketList.add(row.getInt(bucketIndexOf)));
+ }
+ Assertions.assertEquals(2, bucketList.size());
+ });
+ }
+
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SEATUNNEL})
+ @Disabled(
+ "Spark and Flink engine can not auto create paimon table on worker
node in local file, this e2e case work on hdfs environment, please set up your
own HDFS environment in the test case file and the below setup")
+ public void testPaimonBucketCountOnSparkAndFlink(TestContainer container)
+ throws IOException, InterruptedException,
Catalog.TableNotExistException {
+ PaimonSinkConfig paimonSinkConfig =
+ new
PaimonSinkConfig(ReadonlyConfig.fromMap(PAIMON_SINK_PROPERTIES));
+ PaimonCatalogLoader paimonCatalogLoader = new
PaimonCatalogLoader(paimonSinkConfig);
+ Catalog catalog = paimonCatalogLoader.loadCatalog();
+ Identifier identifier = Identifier.create("default", "st_test_5");
+ if (catalog.tableExists(identifier)) {
+ catalog.dropTable(identifier, true);
+ }
+ Container.ExecResult textWriteResult =
+
container.executeJob("/fake_to_dynamic_bucket_paimon_case5.conf");
+ Assertions.assertEquals(0, textWriteResult.getExitCode());
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100L, TimeUnit.MILLISECONDS)
+ .atMost(30L, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ FileStoreTable table = (FileStoreTable)
catalog.getTable(identifier);
+ IndexBootstrap indexBootstrap = new
IndexBootstrap(table);
+ List<String> fieldNames =
+
IndexBootstrap.bootstrapType(table.schema()).getFieldNames();
+ int bucketIndexOf = fieldNames.indexOf("_BUCKET");
+ Set<Integer> bucketList = new HashSet<>();
+ try (RecordReader<InternalRow> recordReader =
+ indexBootstrap.bootstrap(1, 0)) {
+ recordReader.forEachRemaining(
+ row ->
bucketList.add(row.getInt(bucketIndexOf)));
+ }
+ Assertions.assertEquals(4, bucketList.size());
+ });
+ }
+
+ @TestTemplate
+ public void testParallelismBucketCount(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult textWriteResult =
+
container.executeJob("/fake_to_dynamic_bucket_paimon_case3.conf");
+ Assertions.assertEquals(0, textWriteResult.getExitCode());
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100L, TimeUnit.MILLISECONDS)
+ .atMost(30L, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ // copy paimon to local
+
container.executeExtraCommands(containerExtendedFactory);
+ FileStoreTable table =
+ (FileStoreTable) getTable("default",
"st_test_3");
+ IndexBootstrap indexBootstrap = new
IndexBootstrap(table);
+ RowPartitionKeyExtractor keyExtractor =
+ new
RowPartitionKeyExtractor(table.schema());
+ SimpleBucketIndex simpleBucketIndex =
+ new SimpleBucketIndex(1, 0, 50000);
+ try (RecordReader<InternalRow> recordReader =
+ indexBootstrap.bootstrap(1, 0)) {
+ recordReader.forEachRemaining(
+ row ->
+ simpleBucketIndex.assign(
+ keyExtractor
+
.trimmedPrimaryKey(row)
+ .hashCode()));
+ }
+ Assertions.assertEquals(
+ 6,
simpleBucketIndex.getBucketInformation().size());
+ Assertions.assertEquals(
+ 50000,
simpleBucketIndex.getBucketInformation().get(0));
+ });
+ }
+
+ @TestTemplate
+ public void testCDCParallelismBucketCount(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult textWriteResult =
+
container.executeJob("/fake_to_dynamic_bucket_paimon_case4.conf");
+ Assertions.assertEquals(0, textWriteResult.getExitCode());
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100L, TimeUnit.MILLISECONDS)
+ .atMost(120L, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ // copy paimon to local
+
container.executeExtraCommands(containerExtendedFactory);
+ FileStoreTable table =
+ (FileStoreTable) getTable("default",
"st_test_4");
+ IndexBootstrap indexBootstrap = new
IndexBootstrap(table);
+ List<String> fieldNames =
+
IndexBootstrap.bootstrapType(table.schema()).getFieldNames();
+ int bucketIndexOf = fieldNames.indexOf("_BUCKET");
+ Map<String, Integer> hashBucketMap = new
HashMap<>();
+ try (RecordReader<InternalRow> recordReader =
+ indexBootstrap.bootstrap(1, 0)) {
+ recordReader.forEachRemaining(
+ row -> {
+ int bucket =
row.getInt(bucketIndexOf);
+ int pkHash = row.getInt(0);
+ hashBucketMap.put(bucket + "_" +
pkHash, bucket);
+ });
+ }
+ HashMap<Integer, Long> bucketCountMap =
+ hashBucketMap.entrySet().stream()
+ .collect(
+ Collectors.groupingBy(
+
Map.Entry::getValue,
+ HashMap::new,
+
Collectors.counting()));
+ Assertions.assertEquals(4, bucketCountMap.size());
+ Assertions.assertEquals(5, bucketCountMap.get(0));
+ });
+ }
+
+ @TestTemplate
+ public void testCDCWrite(TestContainer container) throws IOException,
InterruptedException {
+ Container.ExecResult textWriteResult =
+
container.executeJob("/fake_cdc_to_dynamic_bucket_paimon_case.conf");
+ Assertions.assertEquals(0, textWriteResult.getExitCode());
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100L, TimeUnit.MILLISECONDS)
+ .atMost(30L, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ // copy paimon to local
+
container.executeExtraCommands(containerExtendedFactory);
+ FileStoreTable table =
+ (FileStoreTable) getTable("default",
"st_test_3");
+ List<DataField> fields = table.schema().fields();
+ for (DataField field : fields) {
+ if (field.name().equalsIgnoreCase("one_time"))
{
+ Assertions.assertEquals(
+ 0, ((TimestampType)
field.type()).getPrecision());
+ }
+ if (field.name().equalsIgnoreCase("two_time"))
{
+ Assertions.assertEquals(
+ 3, ((TimestampType)
field.type()).getPrecision());
+ }
+ if
(field.name().equalsIgnoreCase("three_time")) {
+ Assertions.assertEquals(
+ 6, ((TimestampType)
field.type()).getPrecision());
+ }
+ if
(field.name().equalsIgnoreCase("four_time")) {
+ Assertions.assertEquals(
+ 9, ((TimestampType)
field.type()).getPrecision());
+ }
+ }
+ ReadBuilder readBuilder = table.newReadBuilder();
+ TableScan.Plan plan = readBuilder.newScan().plan();
+ TableRead tableRead = readBuilder.newRead();
+ List<PaimonRecord> result = new ArrayList<>();
+ try (RecordReader<InternalRow> reader =
tableRead.createReader(plan)) {
+ reader.forEachRemaining(
+ row ->
+ result.add(
+ new PaimonRecord(
+ row.getLong(0),
+
row.getString(1).toString(),
+
row.getTimestamp(2, 0),
+
row.getTimestamp(3, 3),
+
row.getTimestamp(4, 6),
+
row.getTimestamp(5, 9))));
+ }
+ Assertions.assertEquals(2, result.size());
+ for (PaimonRecord paimonRecord : result) {
+ Assertions.assertEquals(
+ paimonRecord.oneTime.toString(),
"2024-03-10T10:00:12");
+ Assertions.assertEquals(
+ paimonRecord.twoTime.toString(),
"2024-03-10T10:00:00.123");
+ Assertions.assertEquals(
+ paimonRecord.threeTime.toString(),
+ "2024-03-10T10:00:00.123456");
+ Assertions.assertEquals(
+ paimonRecord.fourTime.toString(),
+ "2024-03-10T10:00:00.123456789");
+ }
+ });
+ }
+
+ protected final ContainerExtendedFactory containerExtendedFactory =
+ container -> {
+ if (isWindows) {
+ FileUtils.deleteFile(CATALOG_ROOT_DIR_WIN + NAMESPACE_TAR);
+ FileUtils.deleteFile(CATALOG_ROOT_DIR_WIN + "paimon.tar");
+ FileUtils.createNewDir(CATALOG_ROOT_DIR_WIN);
+ } else {
+ FileUtils.deleteFile(CATALOG_ROOT_DIR + NAMESPACE_TAR);
+ FileUtils.createNewDir(CATALOG_DIR);
+ }
+
+ container.execInContainer(
+ "sh",
+ "-c",
+ "cd "
+ + CATALOG_ROOT_DIR
+ + " && tar -czvf "
+ + NAMESPACE_TAR
+ + " "
+ + NAMESPACE);
+ container.copyFileFromContainer(
+ CATALOG_ROOT_DIR + NAMESPACE_TAR,
+ (isWindows ? CATALOG_ROOT_DIR_WIN : CATALOG_ROOT_DIR)
+ NAMESPACE_TAR);
+ if (isWindows) {
+ extractFilesWin();
+ } else {
+ extractFiles();
+ }
+ };
+
+ private void extractFiles() {
+ ProcessBuilder processBuilder = new ProcessBuilder();
+ processBuilder.command(
+ "sh", "-c", "cd " + CATALOG_ROOT_DIR + " && tar -zxvf " +
NAMESPACE_TAR);
+ try {
+ Process process = processBuilder.start();
+ // wait command completed
+ int exitCode = process.waitFor();
+ if (exitCode == 0) {
+ log.info("Extract files successful.");
+ } else {
+ log.error("Extract files failed with exit code " + exitCode);
+ }
+ } catch (IOException | InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void extractFilesWin() {
+ try {
+ CompressionUtils.unGzip(
+ new File(CATALOG_ROOT_DIR_WIN + NAMESPACE_TAR), new
File(CATALOG_ROOT_DIR_WIN));
+ CompressionUtils.unTar(
+ new File(CATALOG_ROOT_DIR_WIN + "paimon.tar"), new
File(CATALOG_ROOT_DIR_WIN));
+ } catch (IOException | ArchiveException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected Table getTable(String dbName, String tbName) {
+ Options options = new Options();
+ if (isWindows) {
+ options.set("warehouse", CATALOG_DIR_WIN);
+ } else {
+ options.set("warehouse", "file://" + CATALOG_DIR);
+ }
+ try {
+ Catalog catalog =
CatalogFactory.createCatalog(CatalogContext.create(options));
+ return catalog.getTable(Identifier.create(dbName, tbName));
+ } catch (Catalog.TableNotExistException e) {
+ // do something
+ throw new RuntimeException("table not exist");
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/SimpleBucketIndex.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/SimpleBucketIndex.java
new file mode 100644
index 0000000000..73965947df
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/SimpleBucketIndex.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.e2e.connector.paimon;
+
+import org.apache.paimon.utils.Int2ShortHashMap;
+
+import lombok.Getter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class SimpleBucketIndex {
+ @Getter private final Int2ShortHashMap hash2Bucket;
+ @Getter private final Map<Integer, Long> bucketInformation;
+ private int currentBucket;
+ private int numAssigners;
+ private int assignId;
+ private int targetBucketRowNumber;
+
+ public SimpleBucketIndex(int numAssigners, int assignId, int
targetBucketRowNumber) {
+ this.numAssigners = numAssigners;
+ this.assignId = assignId;
+ this.targetBucketRowNumber = targetBucketRowNumber;
+ this.hash2Bucket = new Int2ShortHashMap();
+ this.bucketInformation = new HashMap();
+ this.loadNewBucket();
+ }
+
+ public int assign(int hash) {
+ if (this.hash2Bucket.containsKey(hash)) {
+ return this.hash2Bucket.get(hash);
+ } else {
+ Long num =
+ (Long)
+ this.bucketInformation.computeIfAbsent(
+ this.currentBucket,
+ (i) -> {
+ return 0L;
+ });
+ if (num >= this.targetBucketRowNumber) {
+ this.loadNewBucket();
+ }
+
+ this.bucketInformation.compute(
+ this.currentBucket,
+ (i, l) -> {
+ return l == null ? 1L : l + 1L;
+ });
+ this.hash2Bucket.put(hash, (short) this.currentBucket);
+ return this.currentBucket;
+ }
+ }
+
+ private void loadNewBucket() {
+ for (int i = 0; i < 32767; ++i) {
+ if (i % this.numAssigners == this.assignId &&
!this.bucketInformation.containsKey(i)) {
+ this.currentBucket = i;
+ return;
+ }
+ }
+
+ throw new RuntimeException(
+ "Can't find a suitable bucket to assign, all the bucket are
assigned?");
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_to_dynamic_bucket_paimon_case.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_to_dynamic_bucket_paimon_case.conf
new file mode 100644
index 0000000000..f9993fe33f
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_to_dynamic_bucket_paimon_case.conf
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ columns = [
+ {
+ name = pk_id
+ type = bigint
+ nullable = false
+ comment = "primary key id"
+ },
+ {
+ name = name
+ type = "string"
+ nullable = true
+ comment = "name"
+ },
+ {
+ name = one_time
+ type = timestamp
+ nullable = false
+ comment = "one time"
+ columnScale = 0
+ },
+ {
+ name = two_time
+ type = timestamp
+ nullable = false
+ comment = "two time"
+ columnScale = 3
+ },
+ {
+ name = three_time
+ type = timestamp
+ nullable = false
+ comment = "three time"
+ columnScale = 6
+ },
+ {
+ name = four_time
+ type = timestamp
+ nullable = false
+ comment = "four time"
+ columnScale = 9
+ }
+ ]
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123",
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123",
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123",
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123",
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123",
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123",
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, "A", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123",
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, "A_1", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123",
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+ },
+ {
+ kind = DELETE
+ fields = [2, "B", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123",
"2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"]
+ }
+ ]
+ }
+}
+
+transform {
+
+}
+
+sink {
+ Paimon {
+ warehouse = "file:///tmp/paimon"
+ database = "default"
+ table = "st_test_3"
+ paimon.table.write-props = {
+ bucket = -1
+ dynamic-bucket.target-row-num = 50000
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case1.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case1.conf
new file mode 100644
index 0000000000..e238f56044
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case1.conf
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 100000
+ schema = {
+ fields {
+ pk_id = int
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Paimon {
+ warehouse = "file:///tmp/paimon"
+ database = "default"
+ table = "st_test"
+ paimon.table.write-props = {
+ bucket = -1
+ dynamic-bucket.target-row-num = 50000
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
new file mode 100644
index 0000000000..338e624d04
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 100000
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ }
+}
+
+sink {
+ Paimon {
+ warehouse = "file:///tmp/paimon"
+ database = "default"
+ table = "st_test_2"
+ paimon.table.write-props = {
+ bucket = -1
+ dynamic-bucket.target-row-num = 50000
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case3.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case3.conf
new file mode 100644
index 0000000000..21cbc0ef83
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case3.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+ parallelism = 3
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 100000
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ }
+}
+
+sink {
+ Paimon {
+ warehouse = "file:///tmp/paimon"
+ database = "default"
+ table = "st_test_3"
+ paimon.table.write-props = {
+ bucket = -1
+ dynamic-bucket.target-row-num = 50000
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case4.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case4.conf
new file mode 100644
index 0000000000..9a71171330
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case4.conf
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+ parallelism = 2
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = INSERT
+ fields = [4, "D", 100]
+ },
+ {
+ kind = INSERT
+ fields = [5, "E", 100]
+ },
+ {
+ kind = INSERT
+ fields = [6, "F", 100]
+ },
+ {
+ kind = INSERT
+ fields = [7, "G", 100]
+ }
+ ]
+ }
+}
+
+sink {
+ Paimon {
+ warehouse = "file:///tmp/paimon"
+ database = "default"
+ table = "st_test_4"
+ paimon.table.write-props = {
+ bucket = -1
+ dynamic-bucket.target-row-num = 5
+ }
+ }
+}
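
As a rough illustration of what a small dynamic-bucket.target-row-num such as the 5 used above triggers, the sketch introduced earlier splits 7 distinct keys across two buckets when run with a single assigner (this only demonstrates the sketch, not the exact buckets the e2e test asserts):

    public class DynamicBucketSplitDemo {
        public static void main(String[] args) {
            // One assigner (assignId 0 of 1), at most 5 rows per bucket, as in the config above.
            DynamicBucketAssignerSketch assigner = new DynamicBucketAssignerSketch(1, 0, 5L);
            for (int key = 1; key <= 7; key++) {
                System.out.println("key " + key + " -> bucket " + assigner.assign(key));
            }
            // Prints bucket 0 for keys 1..5 and bucket 1 for keys 6..7.
        }
    }
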
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case5.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case5.conf
new file mode 100644
index 0000000000..b1e562c088
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case5.conf
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+ parallelism = 2
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 100000
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ }
+}
+
+sink {
+ Paimon {
+ schema_save_mode = "RECREATE_SCHEMA"
+ catalog_name="seatunnel_test"
+ warehouse="hdfs:///tmp/paimon"
+ database="default"
+ table="st_test_5"
+ paimon.table.write-props = {
+ bucket = -1
+ dynamic-bucket.target-row-num = 50000
+ }
+ paimon.hadoop.conf = {
+ fs.defaultFS = "hdfs://nameservice1"
+ dfs.nameservices = "nameservice1"
+ dfs.ha.namenodes.nameservice1 = "nn1,nn2"
+ dfs.namenode.rpc-address.nameservice1.nn1 = "dp06:8020"
+ dfs.namenode.rpc-address.nameservice1.nn2 = "dp07:8020"
+ dfs.client.failover.proxy.provider.nameservice1 =
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+ dfs.client.use.datanode.hostname = "true"
+ }
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java
index 747198d3eb..77329290f5 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java
@@ -21,23 +21,38 @@ import
org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.event.EventListener;
import org.apache.seatunnel.api.sink.SinkWriter;
+import com.google.common.base.Preconditions;
+
public class SinkWriterContext implements SinkWriter.Context {
private static final long serialVersionUID = -3082515319043725121L;
- private final int indexID;
+ private final int indexOfSubtask;
+ private final int numberOfParallelSubtasks;
private final MetricsContext metricsContext;
private final EventListener eventListener;
public SinkWriterContext(
- int indexID, MetricsContext metricsContext, EventListener
eventListener) {
- this.indexID = indexID;
+ int numberOfParallelSubtasks,
+ int indexOfSubtask,
+ MetricsContext metricsContext,
+ EventListener eventListener) {
+ Preconditions.checkArgument(
+ numberOfParallelSubtasks >= 1, "Parallelism must be a positive
number.");
+ Preconditions.checkArgument(
+ indexOfSubtask >= 0, "Task index must be a non-negative
number.");
+ this.numberOfParallelSubtasks = numberOfParallelSubtasks;
+ this.indexOfSubtask = indexOfSubtask;
this.metricsContext = metricsContext;
this.eventListener = eventListener;
}
@Override
public int getIndexOfSubtask() {
- return indexID;
+ return indexOfSubtask;
+ }
+
+ public int getNumberOfParallelSubtasks() {
+ return numberOfParallelSubtasks;
}
@Override
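
Exposing the parallelism on the writer context is what lets a writer carve a shared space, such as Paimon's bucket range, up by subtask. A stand-in illustration, where WriterContext is a hypothetical substitute for the real SinkWriter.Context rather than the actual interface:

    // Stand-in for SinkWriter.Context, only to show how the two accessors combine.
    interface WriterContext {
        int getIndexOfSubtask();
        int getNumberOfParallelSubtasks();
    }

    final class BucketOwnership {
        private BucketOwnership() {}

        // A bucket is handled by exactly one subtask: the one whose index equals bucket mod parallelism.
        static boolean ownsBucket(WriterContext context, int bucket) {
            return bucket % context.getNumberOfParallelSubtasks() == context.getIndexOfSubtask();
        }
    }
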
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 3234560fe4..bce6e9f637 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -286,7 +286,9 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
.deserialize(bytes)))
.collect(Collectors.toList());
}
- this.writerContext = new SinkWriterContext(indexID, metricsContext,
eventListener);
+ this.writerContext =
+ new SinkWriterContext(
+ sinkAction.getParallelism(), indexID, metricsContext,
eventListener);
if (states.isEmpty()) {
this.writer = sinkAction.getSink().createWriter(writerContext);
} else {
diff --git
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterContext.java
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterContext.java
index 4a81b43c9a..2d34d198f8 100644
---
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterContext.java
+++
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterContext.java
@@ -39,10 +39,12 @@ public class FlinkSinkWriterContext implements
SinkWriter.Context {
private final InitContext writerContext;
private final EventListener eventListener;
+ private final int parallelism;
- public FlinkSinkWriterContext(InitContext writerContext) {
+ public FlinkSinkWriterContext(InitContext writerContext, int parallelism) {
this.writerContext = writerContext;
this.eventListener = new
DefaultEventProcessor(getJobIdForV14(writerContext));
+ this.parallelism = parallelism;
}
@Override
@@ -50,6 +52,11 @@ public class FlinkSinkWriterContext implements
SinkWriter.Context {
return writerContext.getSubtaskId();
}
+ @Override
+ public int getNumberOfParallelSubtasks() {
+ return parallelism;
+ }
+
@Override
public MetricsContext getMetricsContext() {
try {
diff --git
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
index 2ebbcba4f9..cab4b6f225 100644
---
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
+++
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
@@ -50,11 +50,15 @@ public class FlinkSink<InputT, CommT, WriterStateT,
GlobalCommT>
private final CatalogTable catalogTable;
+ private final int parallelism;
+
public FlinkSink(
SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, GlobalCommT> sink,
- CatalogTable catalogTable) {
+ CatalogTable catalogTable,
+ int parallelism) {
this.sink = sink;
this.catalogTable = catalogTable;
+ this.parallelism = parallelism;
}
@Override
@@ -62,8 +66,7 @@ public class FlinkSink<InputT, CommT, WriterStateT,
GlobalCommT>
Sink.InitContext context, List<FlinkWriterState<WriterStateT>>
states)
throws IOException {
org.apache.seatunnel.api.sink.SinkWriter.Context stContext =
- new FlinkSinkWriterContext(context);
-
+ new FlinkSinkWriterContext(context, parallelism);
if (states == null || states.isEmpty()) {
return new FlinkSinkWriter<>(
sink.createWriter(stContext), 1,
catalogTable.getSeaTunnelRowType(), stContext);
diff --git
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterContext.java
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterContext.java
index 7969f44509..747511a821 100644
---
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterContext.java
+++
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterContext.java
@@ -36,10 +36,12 @@ public class FlinkSinkWriterContext implements
SinkWriter.Context {
private final Sink.InitContext writerContext;
private final EventListener eventListener;
+ private final int parallelism;
- public FlinkSinkWriterContext(InitContext writerContext) {
+ public FlinkSinkWriterContext(InitContext writerContext, int parallelism) {
this.writerContext = writerContext;
this.eventListener = new
DefaultEventProcessor(getFlinkJobId(writerContext));
+ this.parallelism = parallelism;
}
@Override
@@ -47,6 +49,11 @@ public class FlinkSinkWriterContext implements
SinkWriter.Context {
return writerContext.getSubtaskId();
}
+ @Override
+ public int getNumberOfParallelSubtasks() {
+ return writerContext.getNumberOfParallelSubtasks();
+ }
+
@Override
public MetricsContext getMetricsContext() {
return new
FlinkMetricContext(getStreamingRuntimeContextForV15(writerContext));
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
index 0315f275ff..101b783590 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
@@ -47,6 +47,8 @@ public class SparkSink<StateT, CommitInfoT,
AggregatedCommitInfoT>
private volatile String jobId;
+ private volatile Integer parallelism;
+
private void init(DataSourceOptions options) {
if (sink == null) {
this.sink =
@@ -71,6 +73,16 @@ public class SparkSink<StateT, CommitInfoT,
AggregatedCommitInfoT>
if (jobId == null) {
this.jobId = options.get(SparkSinkInjector.JOB_ID).orElse(null);
}
+ if (parallelism == null) {
+ this.parallelism =
+ options.get(SparkSinkInjector.PARALLELISM)
+ .map(Integer::parseInt)
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+
SparkSinkInjector.PARALLELISM
+ + " must be
specified"));
+ }
}
@Override
@@ -79,7 +91,7 @@ public class SparkSink<StateT, CommitInfoT,
AggregatedCommitInfoT>
init(options);
try {
- return new SparkStreamWriter<>(sink, catalogTables, jobId);
+ return new SparkStreamWriter<>(sink, catalogTables, jobId,
parallelism);
} catch (IOException e) {
throw new RuntimeException("find error when createStreamWriter",
e);
}
@@ -91,7 +103,8 @@ public class SparkSink<StateT, CommitInfoT,
AggregatedCommitInfoT>
init(options);
try {
- return Optional.of(new SparkDataSourceWriter<>(sink,
catalogTables, jobId));
+ return Optional.of(
+ new SparkDataSourceWriter<>(sink, catalogTables, jobId,
parallelism));
} catch (IOException e) {
throw new RuntimeException("find error when createStreamWriter",
e);
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
index 740188339c..ff889f21d4 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
@@ -34,27 +34,32 @@ public class SparkSinkInjector {
public static final String SINK_CATALOG_TABLE = "sink.catalog.table";
public static final String JOB_ID = "jobId";
+ public static final String PARALLELISM = "parallelism";
public static DataStreamWriter<Row> inject(
DataStreamWriter<Row> dataset,
SeaTunnelSink<?, ?, ?, ?> sink,
CatalogTable[] catalogTables,
- String applicationId) {
+ String applicationId,
+ int parallelism) {
return dataset.format(SPARK_SINK_CLASS_NAME)
.outputMode(OutputMode.Append())
.option(Constants.SINK_SERIALIZATION,
SerializationUtils.objectToString(sink))
.option(SINK_CATALOG_TABLE,
SerializationUtils.objectToString(catalogTables))
- .option(JOB_ID, applicationId);
+ .option(JOB_ID, applicationId)
+ .option(PARALLELISM, parallelism);
}
public static DataFrameWriter<Row> inject(
DataFrameWriter<Row> dataset,
SeaTunnelSink<?, ?, ?, ?> sink,
CatalogTable[] catalogTables,
- String applicationId) {
+ String applicationId,
+ int parallelism) {
return dataset.format(SPARK_SINK_CLASS_NAME)
.option(Constants.SINK_SERIALIZATION,
SerializationUtils.objectToString(sink))
.option(SINK_CATALOG_TABLE,
SerializationUtils.objectToString(catalogTables))
- .option(JOB_ID, applicationId);
+ .option(JOB_ID, applicationId)
+ .option(PARALLELISM, parallelism);
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataSourceWriter.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataSourceWriter.java
index 29755c6db4..04765de9b1 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataSourceWriter.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataSourceWriter.java
@@ -49,17 +49,20 @@ public class SparkDataSourceWriter<StateT, CommitInfoT,
AggregatedCommitInfoT>
protected final CatalogTable[] catalogTables;
protected final String jobId;
+ protected final int parallelism;
private MultiTableResourceManager resourceManager;
public SparkDataSourceWriter(
SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT,
AggregatedCommitInfoT> sink,
CatalogTable[] catalogTables,
- String jobId)
+ String jobId,
+ int parallelism)
throws IOException {
this.sink = sink;
this.catalogTables = catalogTables;
this.jobId = jobId;
+ this.parallelism = parallelism;
this.sinkAggregatedCommitter =
sink.createAggregatedCommitter().orElse(null);
if (sinkAggregatedCommitter != null) {
// TODO close it
@@ -78,7 +81,7 @@ public class SparkDataSourceWriter<StateT, CommitInfoT,
AggregatedCommitInfoT>
@Override
public DataWriterFactory<InternalRow> createWriterFactory() {
- return new SparkDataWriterFactory<>(sink, catalogTables, jobId);
+ return new SparkDataWriterFactory<>(sink, catalogTables, jobId,
parallelism);
}
@Override
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
index b684654103..f8ed25c044 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
@@ -36,20 +36,23 @@ public class SparkDataWriterFactory<CommitInfoT, StateT>
implements DataWriterFa
private final SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink;
private final CatalogTable[] catalogTables;
private final String jobId;
+ private final int parallelism;
SparkDataWriterFactory(
SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink,
CatalogTable[] catalogTables,
- String jobId) {
+ String jobId,
+ int parallelism) {
this.sink = sink;
this.catalogTables = catalogTables;
this.jobId = jobId;
+ this.parallelism = parallelism;
}
@Override
public DataWriter<InternalRow> createDataWriter(int partitionId, long
taskId, long epochId) {
org.apache.seatunnel.api.sink.SinkWriter.Context context =
- new DefaultSinkWriterContext(jobId, (int) taskId);
+ new DefaultSinkWriterContext(jobId, (int) taskId, parallelism);
SinkWriter<SeaTunnelRow, CommitInfoT, StateT> writer;
SinkCommitter<CommitInfoT> committer;
try {
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkStreamWriter.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkStreamWriter.java
index 1c8bbb4b87..32a56a97c9 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkStreamWriter.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkStreamWriter.java
@@ -35,9 +35,10 @@ public class SparkStreamWriter<StateT, CommitInfoT,
AggregatedCommitInfoT>
public SparkStreamWriter(
SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT,
AggregatedCommitInfoT> sink,
CatalogTable[] catalogTables,
- String jobId)
+ String jobId,
+ int parallelism)
throws IOException {
- super(sink, catalogTables, jobId);
+ super(sink, catalogTables, jobId, parallelism);
}
@Override
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelBatchWrite.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelBatchWrite.java
index f1c343ba56..59f4c9d0cb 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelBatchWrite.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelBatchWrite.java
@@ -53,14 +53,18 @@ public class SeaTunnelBatchWrite<StateT, CommitInfoT,
AggregatedCommitInfoT>
private final String jobId;
+ private final int parallelism;
+
public SeaTunnelBatchWrite(
SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT,
AggregatedCommitInfoT> sink,
CatalogTable[] catalogTables,
- String jobId)
+ String jobId,
+ int parallelism)
throws IOException {
this.sink = sink;
this.catalogTables = catalogTables;
this.jobId = jobId;
+ this.parallelism = parallelism;
this.aggregatedCommitter =
sink.createAggregatedCommitter().orElse(null);
if (aggregatedCommitter != null) {
if (this.aggregatedCommitter instanceof SupportResourceShare) {
@@ -78,7 +82,7 @@ public class SeaTunnelBatchWrite<StateT, CommitInfoT,
AggregatedCommitInfoT>
@Override
public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
- return new SeaTunnelSparkDataWriterFactory<>(sink, catalogTables,
jobId);
+ return new SeaTunnelSparkDataWriterFactory<>(sink, catalogTables,
jobId, parallelism);
}
@Override
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkTable.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkTable.java
index f3652be673..b35df0d4ec 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkTable.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkTable.java
@@ -36,6 +36,7 @@ import org.apache.spark.sql.types.StructType;
import com.google.common.collect.Sets;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
public class SeaTunnelSinkTable implements Table, SupportsWrite {
@@ -48,6 +49,7 @@ public class SeaTunnelSinkTable implements Table,
SupportsWrite {
private final CatalogTable[] catalogTables;
private final String jobId;
+ private final int parallelism;
public SeaTunnelSinkTable(Map<String, String> properties) {
this.properties = properties;
@@ -64,11 +66,19 @@ public class SeaTunnelSinkTable implements Table,
SupportsWrite {
}
this.catalogTables =
SerializationUtils.stringToObject(sinkCatalogTableSerialization);
this.jobId = properties.getOrDefault(SparkSinkInjector.JOB_ID, null);
+ this.parallelism =
+ Optional.ofNullable(properties.getOrDefault(SparkSinkInjector.PARALLELISM, null))
+ .map(Integer::parseInt)
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ SparkSinkInjector.PARALLELISM
+ + " must be
specified"));
}
@Override
public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
- return new SeaTunnelWriteBuilder<>(sink, catalogTables, jobId);
+ return new SeaTunnelWriteBuilder<>(sink, catalogTables, jobId,
parallelism);
}
@Override
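
The ofNullable form matters in the option parsing above: Optional.of over a null lookup throws NullPointerException before orElseThrow ever runs, while Optional.ofNullable surfaces the intended IllegalArgumentException for a missing option. A small standalone check of the pattern (the map and key name here are local stand-ins):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    public class ParallelismOptionCheck {
        public static void main(String[] args) {
            Map<String, String> properties = new HashMap<>(); // no "parallelism" key set
            try {
                int parallelism =
                        Optional.ofNullable(properties.get("parallelism"))
                                .map(Integer::parseInt)
                                .orElseThrow(() -> new IllegalArgumentException("parallelism must be specified"));
                System.out.println("parallelism = " + parallelism);
            } catch (IllegalArgumentException e) {
                // With ofNullable the missing option is reported with the intended, descriptive message.
                System.out.println("missing option reported as: " + e.getMessage());
            }
        }
    }
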
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
index b975c0913b..531d0d9511 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
@@ -35,28 +35,34 @@ public class SparkSinkInjector {
public static final String JOB_ID = "jobId";
+ public static final String PARALLELISM = "parallelism";
+
public static DataStreamWriter<Row> inject(
DataStreamWriter<Row> dataset,
SeaTunnelSink<?, ?, ?, ?> sink,
CatalogTable[] catalogTables,
- String applicationId) {
+ String applicationId,
+ int parallelism) {
return dataset.format(SINK_NAME)
.outputMode(OutputMode.Append())
.option(Constants.SINK_SERIALIZATION,
SerializationUtils.objectToString(sink))
// TODO this should require fetching the catalog table in sink
.option(SINK_CATALOG_TABLE,
SerializationUtils.objectToString(catalogTables))
- .option(JOB_ID, applicationId);
+ .option(JOB_ID, applicationId)
+ .option(PARALLELISM, parallelism);
}
public static DataFrameWriter<Row> inject(
DataFrameWriter<Row> dataset,
SeaTunnelSink<?, ?, ?, ?> sink,
CatalogTable[] catalogTables,
- String applicationId) {
+ String applicationId,
+ int parallelism) {
return dataset.format(SINK_NAME)
.option(Constants.SINK_SERIALIZATION,
SerializationUtils.objectToString(sink))
// TODO this should require fetching the catalog table in sink
.option(SINK_CATALOG_TABLE,
SerializationUtils.objectToString(catalogTables))
- .option(JOB_ID, applicationId);
+ .option(JOB_ID, applicationId)
+ .option(PARALLELISM, parallelism);
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
index 255a9cd339..7d7537222e 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
@@ -38,19 +38,22 @@ public class SeaTunnelSparkDataWriterFactory<CommitInfoT,
StateT>
private final SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink;
private final CatalogTable[] catalogTables;
private final String jobId;
+ private final int parallelism;
public SeaTunnelSparkDataWriterFactory(
SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink,
CatalogTable[] catalogTables,
- String jobId) {
+ String jobId,
+ int parallelism) {
this.sink = sink;
this.catalogTables = catalogTables;
this.jobId = jobId;
+ this.parallelism = parallelism;
}
@Override
public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
- SinkWriter.Context context = new DefaultSinkWriterContext(jobId, (int)
taskId);
+ SinkWriter.Context context = new DefaultSinkWriterContext(jobId, (int)
taskId, parallelism);
SinkWriter<SeaTunnelRow, CommitInfoT, StateT> writer;
SinkCommitter<CommitInfoT> committer;
try {
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelWrite.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelWrite.java
index 2bd4388cc2..6d0211fc8b 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelWrite.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelWrite.java
@@ -33,20 +33,23 @@ public class SeaTunnelWrite<AggregatedCommitInfoT,
CommitInfoT, StateT> implemen
private final SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT,
AggregatedCommitInfoT> sink;
private final CatalogTable[] catalogTables;
private final String jobId;
+ private final int parallelism;
public SeaTunnelWrite(
SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT,
AggregatedCommitInfoT> sink,
CatalogTable[] catalogTables,
- String jobId) {
+ String jobId,
+ int parallelism) {
this.sink = sink;
this.catalogTables = catalogTables;
this.jobId = jobId;
+ this.parallelism = parallelism;
}
@Override
public BatchWrite toBatch() {
try {
- return new SeaTunnelBatchWrite<>(sink, catalogTables, jobId);
+ return new SeaTunnelBatchWrite<>(sink, catalogTables, jobId,
parallelism);
} catch (IOException e) {
throw new RuntimeException("SeaTunnel Spark sink create batch
failed", e);
}
@@ -55,7 +58,7 @@ public class SeaTunnelWrite<AggregatedCommitInfoT,
CommitInfoT, StateT> implemen
@Override
public StreamingWrite toStreaming() {
try {
- return new SeaTunnelBatchWrite<>(sink, catalogTables, jobId);
+ return new SeaTunnelBatchWrite<>(sink, catalogTables, jobId,
parallelism);
} catch (IOException e) {
throw new RuntimeException("SeaTunnel Spark sink create batch
failed", e);
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelWriteBuilder.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelWriteBuilder.java
index 896982c351..af0751e219 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelWriteBuilder.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelWriteBuilder.java
@@ -30,18 +30,21 @@ public class SeaTunnelWriteBuilder<StateT, CommitInfoT,
AggregatedCommitInfoT>
private final SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT,
AggregatedCommitInfoT> sink;
private final CatalogTable[] catalogTables;
private final String jobId;
+ private final int parallelism;
public SeaTunnelWriteBuilder(
SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT,
AggregatedCommitInfoT> sink,
CatalogTable[] catalogTables,
- String jobId) {
+ String jobId,
+ int parallelism) {
this.sink = sink;
this.catalogTables = catalogTables;
this.jobId = jobId;
+ this.parallelism = parallelism;
}
@Override
public Write build() {
- return new SeaTunnelWrite<>(sink, catalogTables, jobId);
+ return new SeaTunnelWrite<>(sink, catalogTables, jobId, parallelism);
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SparkSinkTest.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SparkSinkTest.java
index 73084fb76b..2e0d0f3f0d 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SparkSinkTest.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SparkSinkTest.java
@@ -417,7 +417,8 @@ public class SparkSinkTest {
CatalogTableUtil.getCatalogTable(
"test", "test", "test", "test", rowType)
},
- spark.sparkContext().applicationId())
+ spark.sparkContext().applicationId(),
+ spark.sparkContext().defaultParallelism())
.option("checkpointLocation", "/tmp")
.mode(SaveMode.Append)
.save();