This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 846d876dc2 [Improve][Connector-v2] Remove useless code and add changelog doc for paimon sink (#7748)
846d876dc2 is described below
commit 846d876dc2b0dea4bd68938e90244ccad4177d03
Author: dailai <[email protected]>
AuthorDate: Thu Sep 26 14:28:21 2024 +0800
[Improve][Connector-v2] Remove useless code and add changelog doc for paimon sink (#7748)
---
docs/en/connector-v2/sink/Doris.md | 1 -
docs/en/connector-v2/sink/Paimon.md | 9 +++++++++
docs/zh/connector-v2/sink/Doris.md | 8 +++++---
docs/zh/connector-v2/sink/Paimon.md | 9 +++++++++
.../connectors/seatunnel/paimon/sink/PaimonSinkWriter.java | 4 ----
.../seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java | 3 ---
.../connectors/seatunnel/paimon/source/PaimonSource.java | 3 ---
.../paimon/source/enumerator/AbstractSplitEnumerator.java | 3 ---
8 files changed, 23 insertions(+), 17 deletions(-)
diff --git a/docs/en/connector-v2/sink/Doris.md b/docs/en/connector-v2/sink/Doris.md
index 2518ce7ad0..ae9bcdfa4e 100644
--- a/docs/en/connector-v2/sink/Doris.md
+++ b/docs/en/connector-v2/sink/Doris.md
@@ -151,7 +151,6 @@ You can use the following placeholders
The supported formats include CSV and JSON
## Tuning Guide
-
Appropriately increasing the values of `sink.buffer-size` and `doris.batch.size` can improve write performance.
In stream mode, if `doris.batch.size` and `checkpoint.interval` are both configured with large values, the last data to arrive may have a large delay (the delay time is the checkpoint interval).
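The tuning guidance above can be sketched as a minimal SeaTunnel job config. This is an illustrative HOCON fragment, not the project's reference example: the `fenodes` address, credentials, and database/table names are assumed placeholders, and the numeric values are arbitrary starting points.

```hocon
env {
  # In stream mode the final partial batch may wait up to one
  # checkpoint interval before it is committed.
  checkpoint.interval = 10000
}

sink {
  Doris {
    fenodes = "doris-fe:8030"    # hypothetical FE address
    username = "root"            # placeholder credentials
    password = ""
    database = "test_db"
    table = "test_table"
    sink.buffer-size = 1048576   # a larger buffer can improve write throughput
    doris.batch.size = 1024      # larger batches mean fewer stream loads
    sink.enable-2pc = "false"    # with 2pc, only checkpoints trigger commits
  }
}
```

With `sink.enable-2pc = "true"`, `sink.buffer-size` would have no effect, so the checkpoint interval alone would bound the commit latency.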
diff --git a/docs/en/connector-v2/sink/Paimon.md b/docs/en/connector-v2/sink/Paimon.md
index bb4bb9d5d7..8133b6e836 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -47,6 +47,15 @@ libfb303-xxx.jar
| paimon.hadoop.conf | Map | No | - | Properties in hadoop conf |
| paimon.hadoop.conf-path | String | No | - | The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files |
+## Changelog
+You must configure the `changelog-producer=input` option to enable the changelog producer mode of the Paimon table. If you use the auto-create table function of the Paimon sink, you can configure this property in `paimon.table.write-props`.
+
+The changelog producer of a Paimon table has [four modes](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/): `none`, `input`, `lookup`, and `full-compaction`.
+
+Currently, only the `none` and `input` modes are supported. The default is `none`, which does not produce changelog files. The `input` mode produces changelog files in the Paimon table.
+
+When you read a Paimon table in streaming mode, these two modes produce [different results](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Paimon.md#changelog).
+
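The changelog behavior described above can be illustrated with a minimal sink config sketch. This is a hedged HOCON fragment: the warehouse path and table names are assumed placeholders; only `paimon.table.write-props` and `changelog-producer = "input"` come from the text above.

```hocon
sink {
  Paimon {
    warehouse = "hdfs:///tmp/paimon"   # hypothetical warehouse path
    database = "default"
    table = "orders"
    # Passed through to the (auto-created) Paimon table; `input` persists
    # changelog files, while the default `none` does not.
    paimon.table.write-props = {
      changelog-producer = "input"
    }
  }
}
```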
## Examples
### Single table
diff --git a/docs/zh/connector-v2/sink/Doris.md b/docs/zh/connector-v2/sink/Doris.md
index 9262af987c..d2176237be 100644
--- a/docs/zh/connector-v2/sink/Doris.md
+++ b/docs/zh/connector-v2/sink/Doris.md
@@ -148,10 +148,12 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
The supported formats include CSV and JSON.
## Tuning Guide
+Appropriately increasing the values of `sink.buffer-size` and `doris.batch.size` can improve write performance.
+
+In stream mode, if `doris.batch.size` and `checkpoint.interval` are both configured with large values, the last data to arrive may have a large delay (the delay time is the checkpoint interval).
+
+This is because the total amount of the last data to arrive may not exceed the threshold specified by `doris.batch.size`, so before that threshold is reached only a checkpoint can trigger the commit. Therefore, you need to choose an appropriate checkpoint interval.
-Appropriately increasing the values of `sink.buffer-size` and `doris.batch.size` can improve write performance.<br>
-In stream mode, if `doris.batch.size` and `checkpoint.interval` are both configured with large values, the last data to arrive may have a large delay (the delay time is the checkpoint interval).<br>
-This is because the total amount of the last data to arrive may not exceed the threshold specified by `doris.batch.size`, so before that threshold is reached only a checkpoint can trigger the commit. Therefore, you need to choose an appropriate checkpoint interval.<br>
In addition, if you enable 2PC via the `sink.enable-2pc=true` property, `sink.buffer-size` no longer takes effect and only checkpoints can trigger the commit.
## Task Example
diff --git a/docs/zh/connector-v2/sink/Paimon.md b/docs/zh/connector-v2/sink/Paimon.md
index 66f6738efc..32d35a5e95 100644
--- a/docs/zh/connector-v2/sink/Paimon.md
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -46,6 +46,15 @@ libfb303-xxx.jar
| paimon.hadoop.conf | Map | No | - | Properties in Hadoop conf |
| paimon.hadoop.conf-path | String | No | - | The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files |
+## Changelog
+You must configure `changelog-producer=input` to enable the changelog producer mode of the Paimon table. If you use the auto-create table function of the Paimon sink, you can specify this property in `paimon.table.write-props`.
+
+The changelog producer of a Paimon table has [four modes](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/): `none`, `input`, `lookup`, and `full-compaction`.
+
+Currently, only the `none` and `input` modes are supported. The default is `none`, which does not produce changelog files. The `input` mode produces changelog files under the Paimon table.
+
+When you read a Paimon table in streaming mode, these two modes produce [different results](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Paimon.md#changelog).
+
## Examples
### Single table
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index 2b234bdd76..18442d05b5 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -69,8 +69,6 @@ public class PaimonSinkWriter
private final TableWrite tableWrite;
- private long checkpointId = 0;
-
private List<CommitMessage> committables = new ArrayList<>();
private final Table table;
@@ -128,7 +126,6 @@ public class PaimonSinkWriter
return;
}
this.commitUser = states.get(0).getCommitUser();
- this.checkpointId = states.get(0).getCheckpointId();
try (TableCommit tableCommit = tableWriteBuilder.newCommit()) {
List<CommitMessage> commitables =
states.stream()
@@ -193,7 +190,6 @@ public class PaimonSinkWriter
@Override
public List<PaimonSinkState> snapshotState(long checkpointId) throws IOException {
- this.checkpointId = checkpointId;
PaimonSinkState paimonSinkState =
new PaimonSinkState(new ArrayList<>(committables), commitUser, checkpointId);
committables.clear();
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
index 2135a328b0..5c3f68f336 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
@@ -26,7 +26,6 @@ import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnecto
import org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.JobContextUtil;
-import org.apache.paimon.operation.Lock;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.CommitMessage;
@@ -51,8 +50,6 @@ public class PaimonAggregatedCommitter
private static final long serialVersionUID = 1L;
- private final Lock.Factory localFactory = Lock.emptyFactory();
-
private final WriteBuilder tableWriteBuilder;
private final JobContext jobContext;
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
index d5c31ff235..185520b30e 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
@@ -58,8 +58,6 @@ public class PaimonSource
public static final String PLUGIN_NAME = "Paimon";
- private ReadonlyConfig readonlyConfig;
-
private SeaTunnelRowType seaTunnelRowType;
private Table paimonTable;
@@ -71,7 +69,6 @@ public class PaimonSource
protected final ReadBuilder readBuilder;
public PaimonSource(ReadonlyConfig readonlyConfig, PaimonCatalog paimonCatalog) {
- this.readonlyConfig = readonlyConfig;
PaimonSourceConfig paimonSourceConfig = new PaimonSourceConfig(readonlyConfig);
TablePath tablePath =
TablePath.of(paimonSourceConfig.getNamespace(), paimonSourceConfig.getTable());
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java
index 278381a24a..7789b0ca88 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java
@@ -67,8 +67,6 @@ public abstract class AbstractSplitEnumerator
@Nullable protected Long nextSnapshotId;
- protected boolean finished = false;
-
private ExecutorService executorService;
public AbstractSplitEnumerator(
@@ -212,7 +210,6 @@ public abstract class AbstractSplitEnumerator
if (error != null) {
if (error instanceof EndOfScanException) {
log.debug("Catching EndOfStreamException, the stream is finished.");
- finished = true;
assignSplits();
} else {
log.error("Failed to enumerate files", error);