This is an automated email from the ASF dual-hosted git repository.
dailai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 45a7a715a2 [Improve][Connector-V2] Reduce the create times of iceberg
sink writer (#8155)
45a7a715a2 is described below
commit 45a7a715a2ae4e6577b049bd9fb21ccbb3aab68f
Author: Jia Fan <[email protected]>
AuthorDate: Thu Dec 5 09:34:54 2024 +0800
[Improve][Connector-V2] Reduce the create times of iceberg sink writer
(#8155)
---
.../seatunnel/iceberg/sink/IcebergSinkWriter.java | 14 +++++++++-----
.../iceberg/sink/commit/IcebergFilesCommitter.java | 8 ++++----
.../seatunnel/iceberg/sink/writer/IcebergRecordWriter.java | 7 ++++---
.../seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java | 5 +++++
4 files changed, 22 insertions(+), 12 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java
index 2175080ac7..1028ae21b4 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java
@@ -61,7 +61,7 @@ public class IcebergSinkWriter
private SeaTunnelRowType rowType;
private final SinkConfig config;
private final IcebergTableLoader icebergTableLoader;
- private RecordWriter writer;
+ private volatile RecordWriter writer;
private final IcebergFilesCommitter filesCommitter;
private final List<WriteResult> results = Lists.newArrayList();
private String commitUser = UUID.randomUUID().toString();
@@ -79,7 +79,6 @@ public class IcebergSinkWriter
this.rowType = tableSchema.toPhysicalRowDataType();
this.filesCommitter = IcebergFilesCommitter.of(config,
icebergTableLoader);
this.dataTypeChangeEventHandler = new DataTypeChangeEventDispatcher();
- tryCreateRecordWriter();
if (Objects.nonNull(states) && !states.isEmpty()) {
this.commitUser = states.get(0).getCommitUser();
preCommit(states);
@@ -107,8 +106,7 @@ public class IcebergSinkWriter
public static IcebergSinkWriter of(
SinkConfig config, CatalogTable catalogTable,
List<IcebergSinkState> states) {
- IcebergTableLoader icebergTableLoader =
- IcebergTableLoader.create(config, catalogTable).open();
+ IcebergTableLoader icebergTableLoader = IcebergTableLoader.create(config, catalogTable);
return new IcebergSinkWriter(
icebergTableLoader, config, catalogTable.getTableSchema(),
states);
}
@@ -121,7 +119,12 @@ public class IcebergSinkWriter
@Override
public Optional<IcebergCommitInfo> prepareCommit() throws IOException {
- List<WriteResult> writeResults = writer.complete();
+ List<WriteResult> writeResults;
+ if (writer != null) {
+ writeResults = writer.complete();
+ } else {
+ writeResults = Collections.emptyList();
+ }
IcebergCommitInfo icebergCommitInfo = new
IcebergCommitInfo(writeResults);
this.results.addAll(writeResults);
return Optional.of(icebergCommitInfo);
@@ -134,6 +137,7 @@ public class IcebergSinkWriter
log.info("changed rowType before: {}", fieldsInfo(rowType));
this.rowType =
dataTypeChangeEventHandler.reset(rowType).apply(event);
log.info("changed rowType after: {}", fieldsInfo(rowType));
+ tryCreateRecordWriter();
writer.applySchemaChange(this.rowType, event);
}
}
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergFilesCommitter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergFilesCommitter.java
index 0b5e473440..5e44e1d875 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergFilesCommitter.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergFilesCommitter.java
@@ -54,12 +54,10 @@ public class IcebergFilesCommitter implements Serializable {
public void doCommit(List<WriteResult> results) {
TableIdentifier tableIdentifier =
icebergTableLoader.getTableIdentifier();
- Table table = icebergTableLoader.loadTable();
- log.info("do commit table : " + table.toString());
- commit(tableIdentifier, table, results);
+ commit(tableIdentifier, results);
}
- private void commit(TableIdentifier tableIdentifier, Table table, List<WriteResult> results) {
+ private void commit(TableIdentifier tableIdentifier, List<WriteResult> results) {
List<DataFile> dataFiles =
results.stream()
.filter(payload -> payload.getDataFiles() != null)
@@ -77,6 +75,8 @@ public class IcebergFilesCommitter implements Serializable {
if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
log.info(String.format("Nothing to commit to table %s, skipping",
tableIdentifier));
} else {
+ Table table = icebergTableLoader.loadTable();
+ log.info("do commit table : {}", table.toString());
if (deleteFiles.isEmpty()) {
AppendFiles append = table.newAppend();
if (branch != null) {
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
index 9c8949ba1d..22d0480aa4 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
@@ -52,7 +52,7 @@ public class IcebergRecordWriter implements RecordWriter {
private final Table table;
private final SinkConfig config;
private final List<WriteResult> writerResults;
- private TaskWriter<Record> writer;
+ private volatile TaskWriter<Record> writer;
private RowConverter recordConverter;
private final IcebergWriterFactory writerFactory;
@@ -62,7 +62,6 @@ public class IcebergRecordWriter implements RecordWriter {
this.writerResults = Lists.newArrayList();
this.recordConverter = new RowConverter(table, config);
this.writerFactory = writerFactory;
- this.writer = createTaskWriter();
}
private TaskWriter<Record> createTaskWriter() {
@@ -71,6 +70,9 @@ public class IcebergRecordWriter implements RecordWriter {
@Override
public void write(SeaTunnelRow seaTunnelRow, SeaTunnelRowType rowType) {
+ if (writer == null) {
+ resetWriter();
+ }
SchemaChangeWrapper updates = new SchemaChangeWrapper();
Record record = recordConverter.convert(seaTunnelRow, rowType,
updates);
if (!updates.empty()) {
@@ -139,7 +141,6 @@ public class IcebergRecordWriter implements RecordWriter {
flush();
List<WriteResult> result = Lists.newArrayList(writerResults);
writerResults.clear();
- resetWriter();
return result;
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
index a7cbba8b89..fa271eb8f6 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
@@ -209,6 +209,11 @@ public class IcebergSinkCDCIT extends TestSuiteBase
implements TestResource {
}
@TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason =
+ "Currently SPARK do not support cdc. In addition, currently only the zeta engine supports schema evolution for pr https://github.com/apache/seatunnel/pull/5125.")
public void testMysqlCdcCheckSchemaChangeE2e(TestContainer container)
throws IOException, InterruptedException {
// Clear related content to ensure that multiple operations are not
affected