This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 934434cc75 [Feature][Connector-V2] Support write cdc changelog event into hudi sink (#7845)
934434cc75 is described below
commit 934434cc75afc747339182df6468f89a5d0fb1b2
Author: happyboy1024 <[email protected]>
AuthorDate: Fri Oct 18 11:54:23 2024 +0800
[Feature][Connector-V2] Support write cdc changelog event into hudi sink (#7845)
Co-authored-by: happyboy1024 <[email protected]>
---
docs/en/connector-v2/sink/Hudi.md | 8 +-
docs/zh/connector-v2/sink/Hudi.md | 8 +-
seatunnel-connectors-v2/connector-hudi/pom.xml | 23 ++
.../seatunnel/hudi/catalog/HudiCatalog.java | 11 +-
.../seatunnel/hudi/config/HudiOptions.java | 6 -
.../seatunnel/hudi/config/HudiSinkConfig.java | 4 -
.../seatunnel/hudi/config/HudiTableConfig.java | 5 +
.../seatunnel/hudi/config/HudiTableOptions.java | 9 +-
.../connectors/seatunnel/hudi/sink/HudiSink.java | 21 +-
.../seatunnel/hudi/sink/HudiSinkFactory.java | 8 +-
.../sink/commiter/HudiSinkAggregatedCommitter.java | 102 -----
.../hudi/sink/convert/AvroSchemaConverter.java | 23 +-
.../hudi/sink/convert/HudiRecordConverter.java | 8 +-
.../hudi/sink/convert/RowDataToAvroConverters.java | 8 +-
.../hudi/sink/state/HudiAggregatedCommitInfo.java | 11 +-
.../seatunnel/hudi/sink/state/HudiCommitInfo.java | 18 +-
.../hudi/sink/writer/HudiRecordWriter.java | 178 ++++----
.../seatunnel/hudi/sink/writer/HudiSinkWriter.java | 32 +-
.../connectors/seatunnel/hudi/util/HudiUtil.java | 9 +-
.../connectors/seatunnel/hudi/HudiTest.java | 30 +-
.../seatunnel/hudi/catalog/HudiCatalogTest.java | 1 +
.../connector-hudi-e2e/pom.xml | 32 ++
.../seatunnel/e2e/connector/hudi/HudiIT.java | 4 +-
.../e2e/connector/hudi/HudiMultiTableIT.java | 3 +-
.../hudi/HudiSeatunnelS3MultiTableIT.java | 4 +-
.../e2e/connector/hudi/HudiSinkCDCIT.java | 453 +++++++++++++++++++++
.../connector/hudi/HudiSparkS3MultiTableIT.java | 4 +-
.../src/test/resources/ddl/mysql_cdc.sql | 75 ++++
.../src/test/resources/{ => hudi}/core-site.xml | 0
.../test/resources/{ => hudi}/fake_to_hudi.conf | 0
.../fake_to_hudi_with_omit_config_item.conf | 0
.../resources/{ => hudi}/multi_fake_to_hudi.conf | 0
.../mysql_cdc_to_hudi.conf} | 44 +-
.../test/resources/{ => hudi}/s3_fake_to_hudi.conf | 0
.../src/test/resources/mysql/server-gtids/my.cnf | 65 +++
.../src/test/resources/mysql/setup.sql | 27 ++
36 files changed, 893 insertions(+), 341 deletions(-)
diff --git a/docs/en/connector-v2/sink/Hudi.md b/docs/en/connector-v2/sink/Hudi.md
index 6c424fde15..ea4c066d2f 100644
--- a/docs/en/connector-v2/sink/Hudi.md
+++ b/docs/en/connector-v2/sink/Hudi.md
@@ -8,7 +8,7 @@ Used to write data to Hudi.
## Key features
-- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [x] [cdc](../../concept/connector-v2-features.md)
- [x] [support multiple table write](../../concept/connector-v2-features.md)
@@ -21,7 +21,6 @@ Base configuration:
| table_dfs_path | string | yes | - |
| conf_files_path | string | no | - |
| table_list | Array | no | - |
-| auto_commit | boolean | no | true |
| schema_save_mode | enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST |
| common-options | Config | no | - |
@@ -44,6 +43,7 @@ Table list configuration:
| index_type | enum | no | BLOOM |
| index_class_name | string | no | - |
| record_byte_size | Int | no | 1024 |
+| cdc_enabled | boolean | no | false |
Note: When this configuration corresponds to a single table, you can flatten the configuration items in table_list to the outer layer.
@@ -115,9 +115,9 @@ Note: When this configuration corresponds to a single table, you can flatten the
`max_commits_to_keep` The max commits to keep of hudi table.
-### auto_commit [boolean]
+### cdc_enabled [boolean]
-`auto_commit` Automatic transaction commit is enabled by default.
+`cdc_enabled` Whether to persist the CDC change log. When enabled, the change data is persisted if necessary, and the table can be queried in CDC query mode.
### schema_save_mode [Enum]
diff --git a/docs/zh/connector-v2/sink/Hudi.md b/docs/zh/connector-v2/sink/Hudi.md
index 2fbf027135..7d8007f6b0 100644
--- a/docs/zh/connector-v2/sink/Hudi.md
+++ b/docs/zh/connector-v2/sink/Hudi.md
@@ -8,7 +8,7 @@
## 主要特点
-- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [x] [cdc](../../concept/connector-v2-features.md)
- [x] [support multiple table write](../../concept/connector-v2-features.md)
@@ -21,7 +21,6 @@
| table_dfs_path | string | 是 | - |
| conf_files_path | string | 否 | - |
| table_list | string | 否 | - |
-| auto_commit | boolean| 否 | true |
| schema_save_mode | enum | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST |
| common-options | config | 否 | - |
@@ -44,6 +43,7 @@
| index_type | enum | no | BLOOM |
| index_class_name | string | no | - |
| record_byte_size | Int | no | 1024 |
+| cdc_enabled | boolean | no | false |
注意: 当此配置对应于单个表时,您可以将table_list中的配置项展平到外层。
@@ -115,9 +115,9 @@
`max_commits_to_keep` Hudi 表保留的最多提交数。
-### auto_commit [boolean]
+### cdc_enabled [boolean]
-`auto_commit` 是否自动提交.
+`cdc_enabled` 是否持久化Hudi表的CDC变更日志。启用后,在必要时持久化更改数据,表可以作为CDC模式进行查询。
### schema_save_mode [Enum]
diff --git a/seatunnel-connectors-v2/connector-hudi/pom.xml b/seatunnel-connectors-v2/connector-hudi/pom.xml
index 35fc0b0459..1a11d34f47 100644
--- a/seatunnel-connectors-v2/connector-hudi/pom.xml
+++ b/seatunnel-connectors-v2/connector-hudi/pom.xml
@@ -102,4 +102,27 @@
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.avro</pattern>
+ <shadedPattern>${seatunnel.shade.package}.${connector.name}.org.apache.avro</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalog.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalog.java
index e0a25bfd85..0d238c193d 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalog.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalog.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -53,6 +54,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.CDC_ENABLED;
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.RECORD_KEY_FIELDS;
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.TABLE_TYPE;
import static org.apache.seatunnel.connectors.seatunnel.hudi.sink.convert.AvroSchemaConverter.convertToSchema;
@@ -195,6 +197,7 @@ public class HudiCatalog implements Catalog {
String.join(",", tableConfig.getRecordKeyFields().get()));
}
options.put(TABLE_TYPE.key(), tableType.name());
+        options.put(CDC_ENABLED.key(), String.valueOf(tableConfig.isCDCEnabled()));
return CatalogTable.of(
TableIdentifier.of(
                        catalogName, tablePath.getDatabaseName(), tablePath.getTableName()),
@@ -218,10 +221,16 @@ public class HudiCatalog implements Catalog {
.setTableType(table.getOptions().get(TABLE_TYPE.key()))
.setRecordKeyFields(table.getOptions().get(RECORD_KEY_FIELDS.key()))
.setTableCreateSchema(
-                        convertToSchema(table.getSeaTunnelRowType()).toString())
+                        convertToSchema(
+                                        table.getSeaTunnelRowType(),
+                                        AvroSchemaUtils.getAvroRecordQualifiedName(
+                                                table.getTableId().getTableName()))
+                                .toString())
.setTableName(tablePath.getTableName())
.setPartitionFields(String.join(",",
table.getPartitionKeys()))
.setPayloadClassName(HoodieAvroPayload.class.getName())
+                .setCDCEnabled(
+                        Boolean.parseBoolean(table.getOptions().get(CDC_ENABLED.key())))
                .initTable(new HadoopStorageConfiguration(hadoopConf), tablePathStr);
}
} catch (IOException e) {
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiOptions.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiOptions.java
index 38450e2dfd..745e78eaf9 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiOptions.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiOptions.java
@@ -44,12 +44,6 @@ public interface HudiOptions {
.noDefaultValue()
.withDescription("table_list");
- Option<Boolean> AUTO_COMMIT =
- Options.key("auto_commit")
- .booleanType()
- .defaultValue(true)
- .withDescription("auto commit");
-
Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
Options.key("schema_save_mode")
.enumType(SchemaSaveMode.class)
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkConfig.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkConfig.java
index 06650e87c0..bcb4efe77b 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkConfig.java
@@ -40,15 +40,12 @@ public class HudiSinkConfig implements Serializable {
private String confFilesPath;
- private boolean autoCommit;
-
private SchemaSaveMode schemaSaveMode;
private DataSaveMode dataSaveMode;
public static HudiSinkConfig of(ReadonlyConfig config) {
Builder builder = HudiSinkConfig.builder();
-        Optional<Boolean> optionalAutoCommit = config.getOptional(HudiOptions.AUTO_COMMIT);
Optional<SchemaSaveMode> optionalSchemaSaveMode =
config.getOptional(HudiOptions.SCHEMA_SAVE_MODE);
Optional<DataSaveMode> optionalDataSaveMode =
@@ -58,7 +55,6 @@ public class HudiSinkConfig implements Serializable {
builder.confFilesPath(config.get(HudiOptions.CONF_FILES_PATH));
builder.tableList(HudiTableConfig.of(config));
-        builder.autoCommit(optionalAutoCommit.orElseGet(HudiOptions.AUTO_COMMIT::defaultValue));
builder.schemaSaveMode(
optionalSchemaSaveMode.orElseGet(HudiOptions.SCHEMA_SAVE_MODE::defaultValue));
builder.dataSaveMode(
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableConfig.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableConfig.java
index ba0ae33efd..1ae612c9cb 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableConfig.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableConfig.java
@@ -40,6 +40,7 @@ import java.util.stream.Collectors;
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.BATCH_INTERVAL_MS;
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.BATCH_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.CDC_ENABLED;
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.INDEX_CLASS_NAME;
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.INDEX_TYPE;
@@ -108,6 +109,9 @@ public class HudiTableConfig implements Serializable {
@JsonProperty("max_commits_to_keep")
private int maxCommitsToKeep;
+ @JsonProperty("cdc_enabled")
+ private boolean cdcEnabled;
+
public static List<HudiTableConfig> of(ReadonlyConfig connectorConfig) {
List<HudiTableConfig> tableList;
if (connectorConfig.getOptional(HudiOptions.TABLE_LIST).isPresent()) {
@@ -132,6 +136,7 @@ public class HudiTableConfig implements Serializable {
connectorConfig.get(UPSERT_SHUFFLE_PARALLELISM))
.minCommitsToKeep(connectorConfig.get(MIN_COMMITS_TO_KEEP))
.maxCommitsToKeep(connectorConfig.get(MAX_COMMITS_TO_KEEP))
+ .cdcEnabled(connectorConfig.get(CDC_ENABLED))
.build();
tableList = Collections.singletonList(hudiTableConfig);
}
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableOptions.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableOptions.java
index e48ef7be56..2a2c7e01b3 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableOptions.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableOptions.java
@@ -46,6 +46,13 @@ public interface HudiTableOptions {
.defaultValue(HoodieTableType.COPY_ON_WRITE)
.withDescription("hudi table type");
+ Option<Boolean> CDC_ENABLED =
+ Options.key("cdc_enabled")
+ .booleanType()
+ .defaultValue(false)
+                    .withDescription(
+                            "When enabled, persist the change data if necessary, and the table can be queried in CDC query mode.");
+
Option<String> RECORD_KEY_FIELDS =
Options.key("record_key_fields")
.stringType()
@@ -76,7 +83,7 @@ public interface HudiTableOptions {
Options.key("record_byte_size")
.intType()
.defaultValue(1024)
- .withDescription("auto commit");
+ .withDescription("The byte size of each record");
Option<WriteOperationType> OP_TYPE =
Options.key("op_type")
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java
index 13c245336a..11a402ab10 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java
@@ -24,7 +24,6 @@ import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
@@ -38,14 +37,12 @@ import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableConfig;
import org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.hudi.sink.commiter.HudiSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.hudi.sink.state.HudiAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.hudi.sink.state.HudiCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.hudi.sink.state.HudiSinkState;
import org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.HudiSinkWriter;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -82,15 +79,13 @@ public class HudiSink
@Override
    public HudiSinkWriter createWriter(SinkWriter.Context context) throws IOException {
-        return new HudiSinkWriter(
-                context, seaTunnelRowType, hudiSinkConfig, hudiTableConfig, new ArrayList<>());
+        return new HudiSinkWriter(context, seaTunnelRowType, hudiSinkConfig, hudiTableConfig);
}
@Override
    public SinkWriter<SeaTunnelRow, HudiCommitInfo, HudiSinkState> restoreWriter(
            SinkWriter.Context context, List<HudiSinkState> states) throws IOException {
-        return new HudiSinkWriter(
-                context, seaTunnelRowType, hudiSinkConfig, hudiTableConfig, states);
+ return SeaTunnelSink.super.restoreWriter(context, states);
}
@Override
@@ -103,18 +98,6 @@ public class HudiSink
return Optional.of(new DefaultSerializer<>());
}
-    @Override
-    public Optional<SinkAggregatedCommitter<HudiCommitInfo, HudiAggregatedCommitInfo>>
-            createAggregatedCommitter() throws IOException {
-        return Optional.of(
-                new HudiSinkAggregatedCommitter(hudiTableConfig, hudiSinkConfig, seaTunnelRowType));
-    }
-
-    @Override
-    public Optional<Serializer<HudiAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() {
-        return Optional.of(new DefaultSerializer<>());
-    }
-
@Override
public Optional<SaveModeHandler> getSaveModeHandler() {
TablePath tablePath =
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
index ed21b15166..7e6d9826d9 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
@@ -37,12 +37,12 @@ import java.util.Arrays;
import java.util.List;
import java.util.Optional;
-import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.AUTO_COMMIT;
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.CONF_FILES_PATH;
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.TABLE_DFS_PATH;
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.TABLE_LIST;
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.BATCH_INTERVAL_MS;
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.BATCH_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.CDC_ENABLED;
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.INDEX_CLASS_NAME;
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.INDEX_TYPE;
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.INSERT_SHUFFLE_PARALLELISM;
@@ -85,7 +85,7 @@ public class HudiSinkFactory implements TableSinkFactory {
UPSERT_SHUFFLE_PARALLELISM,
MIN_COMMITS_TO_KEEP,
MAX_COMMITS_TO_KEEP,
- AUTO_COMMIT,
+ CDC_ENABLED,
SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}
@@ -121,6 +121,10 @@ public class HudiSinkFactory implements TableSinkFactory {
}
// table type
        catalogTable.getOptions().put(TABLE_TYPE.key(), hudiTableConfig.getTableType().name());
+        // cdc enabled
+        catalogTable
+                .getOptions()
+                .put(CDC_ENABLED.key(), String.valueOf(hudiTableConfig.isCdcEnabled()));
catalogTable =
CatalogTable.of(
newTableId,
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/commiter/HudiSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/commiter/HudiSinkAggregatedCommitter.java
deleted file mode 100644
index beba719c76..0000000000
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/commiter/HudiSinkAggregatedCommitter.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hudi.sink.commiter;
-
-import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkConfig;
-import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableConfig;
-import org.apache.seatunnel.connectors.seatunnel.hudi.sink.client.HudiWriteClientProvider;
-import org.apache.seatunnel.connectors.seatunnel.hudi.sink.state.HudiAggregatedCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.hudi.sink.state.HudiCommitInfo;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Stack;
-import java.util.stream.Collectors;
-
-@Slf4j
-public class HudiSinkAggregatedCommitter
-        implements SinkAggregatedCommitter<HudiCommitInfo, HudiAggregatedCommitInfo> {
-
- private final HudiTableConfig tableConfig;
-
- private final HudiWriteClientProvider writeClientProvider;
-
- public HudiSinkAggregatedCommitter(
- HudiTableConfig tableConfig,
- HudiSinkConfig sinkConfig,
- SeaTunnelRowType seaTunnelRowType) {
- this.tableConfig = tableConfig;
- this.writeClientProvider =
- new HudiWriteClientProvider(
-                        sinkConfig, tableConfig.getTableName(), seaTunnelRowType);
- }
-
-    @Override
-    public List<HudiAggregatedCommitInfo> commit(
-            List<HudiAggregatedCommitInfo> aggregatedCommitInfo) throws IOException {
-        aggregatedCommitInfo =
-                aggregatedCommitInfo.stream()
-                        .filter(
-                                commit ->
-                                        commit.getHudiCommitInfoList().stream()
-                                                .anyMatch(
-                                                        aggregateCommit ->
-                                                                !aggregateCommit
-                                                                                .getWriteStatusList()
-                                                                                .isEmpty()
-                                                                        && !writeClientProvider
-                                                                                .getOrCreateClient()
-                                                                                .commit(
-                                                                                        aggregateCommit
-                                                                                                .getWriteInstantTime(),
-                                                                                        aggregateCommit
-                                                                                                .getWriteStatusList())))
-                        .collect(Collectors.toList());
-        log.debug(
-                "hudi records have been committed, error commit info are {}", aggregatedCommitInfo);
-        return aggregatedCommitInfo;
-    }
-
- @Override
- public HudiAggregatedCommitInfo combine(List<HudiCommitInfo> commitInfos) {
- return new HudiAggregatedCommitInfo(commitInfos);
- }
-
-    @Override
-    public void abort(List<HudiAggregatedCommitInfo> aggregatedCommitInfo) throws Exception {
-        writeClientProvider.getOrCreateClient().rollbackFailedWrites();
-        // rollback force commit
-        for (HudiAggregatedCommitInfo hudiAggregatedCommitInfo : aggregatedCommitInfo) {
-            for (HudiCommitInfo commitInfo : hudiAggregatedCommitInfo.getHudiCommitInfoList()) {
-                Stack<String> forceCommitTime = commitInfo.getForceCommitTime();
-                while (!forceCommitTime.isEmpty()) {
-                    writeClientProvider.getOrCreateClient().rollback(forceCommitTime.pop());
-                }
-            }
-        }
-    }
-
- @Override
- public void close() {
- writeClientProvider.close();
- }
-}
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/AvroSchemaConverter.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/AvroSchemaConverter.java
index addbf8491f..acb1021275 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/AvroSchemaConverter.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/AvroSchemaConverter.java
@@ -50,13 +50,13 @@ public class AvroSchemaConverter implements Serializable {
* @return Avro's {@link Schema} matching this logical type.
*/
public static Schema convertToSchema(SeaTunnelDataType<?> schema) {
-        return convertToSchema(schema, "org.apache.seatunnel.avro.generated.record");
+ return convertToSchema(schema, "record");
}
/**
 * Converts Seatunnel {@link SeaTunnelDataType} (can be nested) into an Avro schema.
 *
- * <p>The "{rowName}_" is used as the nested row type name prefix in order to generate the right
+ * <p>The "{rowName}." is used as the nested row type name prefix in order to generate the right
 * schema. Nested record type that only differs with type name is still compatible.
*
* @param dataType logical type
@@ -105,10 +105,15 @@ public class AvroSchemaConverter implements Serializable {
return nullableSchema(time);
case DECIMAL:
DecimalType decimalType = (DecimalType) dataType;
- // store BigDecimal as byte[]
+ // store BigDecimal as Fixed
+ // for spark compatibility.
Schema decimal =
                        LogicalTypes.decimal(decimalType.getPrecision(), decimalType.getScale())
-                                .addToSchema(SchemaBuilder.builder().bytesType());
+                                .addToSchema(
+                                        SchemaBuilder.fixed(String.format("%s.fixed", rowName))
+                                                .size(
+                                                        computeMinBytesForDecimalPrecision(
+                                                                decimalType.getPrecision())));
return nullableSchema(decimal);
case ROW:
SeaTunnelRowType rowType = (SeaTunnelRowType) dataType;
@@ -121,7 +126,7 @@ public class AvroSchemaConverter implements Serializable {
SeaTunnelDataType<?> fieldType = rowType.getFieldType(i);
SchemaBuilder.GenericDefault<Schema> fieldBuilder =
builder.name(fieldName)
-                                .type(convertToSchema(fieldType, rowName + "_" + fieldName));
+                                .type(convertToSchema(fieldType, rowName + "." + fieldName));
builder = fieldBuilder.withDefault(null);
}
@@ -166,4 +171,12 @@ public class AvroSchemaConverter implements Serializable {
private static Schema nullableSchema(Schema schema) {
return Schema.createUnion(SchemaBuilder.builder().nullType(), schema);
}
+
+ private static int computeMinBytesForDecimalPrecision(int precision) {
+ int numBytes = 1;
+ while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, precision)) {
+ numBytes += 1;
+ }
+ return numBytes;
+ }
}
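For readers following the decimal change above: the fixed width is the smallest byte count whose signed two's-complement range can hold any unscaled value of the given precision. A minimal standalone sketch (illustrative only, not part of this commit; the class name and the "record.decimal" rowName are invented):

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class DecimalSchemaSketch {
    // Same loop as computeMinBytesForDecimalPrecision above: grow the byte
    // count until 2^(8n-1) can represent 10^precision distinct magnitudes.
    static int minBytesForPrecision(int precision) {
        int numBytes = 1;
        while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, precision)) {
            numBytes += 1;
        }
        return numBytes;
    }

    public static void main(String[] args) {
        // precision 10 -> 5 bytes, since 2^31 < 10^10 <= 2^39.
        Schema fixed =
                LogicalTypes.decimal(10, 5)
                        .addToSchema(
                                SchemaBuilder.fixed("record.decimal.fixed")
                                        .size(minBytesForPrecision(10)));
        System.out.println(fixed); // a fixed schema of size 5 carrying the decimal logical type
    }
}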
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/HudiRecordConverter.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/HudiRecordConverter.java
index d6ed7e81eb..e8d23ab4ef 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/HudiRecordConverter.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/HudiRecordConverter.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableConfig;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
@@ -62,7 +63,12 @@ public class HudiRecordConverter implements Serializable {
seaTunnelRowType.getFieldNames()[i],
createConverter(seaTunnelRowType.getFieldType(i))
.convert(
-                                convertToSchema(seaTunnelRowType.getFieldType(i)),
+                                convertToSchema(
+                                        seaTunnelRowType.getFieldType(i),
+                                        AvroSchemaUtils.getAvroRecordQualifiedName(
+                                                        hudiTableConfig.getTableName())
+                                                + "."
+                                                + seaTunnelRowType.getFieldNames()[i]),
element.getField(i)));
}
return new HoodieAvroRecord<>(
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/RowDataToAvroConverters.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/RowDataToAvroConverters.java
index a48179fdb7..5c06362669 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/RowDataToAvroConverters.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/RowDataToAvroConverters.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -45,6 +46,8 @@ import static org.apache.seatunnel.connectors.seatunnel.hudi.sink.convert.AvroSc
/** Tool class used to convert from {@link SeaTunnelRow} to Avro {@link GenericRecord}. */
public class RowDataToAvroConverters implements Serializable {
+ private static final Conversions.DecimalConversion DECIMAL_CONVERSION =
+ new Conversions.DecimalConversion();
    // --------------------------------------------------------------------------------
    // Runtime Converters
    // --------------------------------------------------------------------------------
@@ -166,8 +169,9 @@ public class RowDataToAvroConverters implements Serializable {
                        @Override
                        public Object convert(Schema schema, Object object) {
-                            return ByteBuffer.wrap(
-                                    ((BigDecimal) object).unscaledValue().toByteArray());
+                            BigDecimal javaDecimal = (BigDecimal) object;
+                            return DECIMAL_CONVERSION.toFixed(
+                                    javaDecimal, schema, schema.getLogicalType());
}
};
break;
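On the value side, the converter now delegates to Avro's stock Conversions.DecimalConversion, which sign-extends the unscaled two's-complement bytes to the fixed size. A small sketch under the same invented schema names as above (not from this commit):

import java.math.BigDecimal;

import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericFixed;

public class DecimalValueSketch {
    public static void main(String[] args) {
        // A decimal(10, 5) fixed schema of 5 bytes, as AvroSchemaConverter now emits.
        Schema fixed =
                LogicalTypes.decimal(10, 5)
                        .addToSchema(SchemaBuilder.fixed("record.decimal.fixed").size(5));
        // The BigDecimal's scale must already match the schema's scale.
        GenericFixed datum =
                new Conversions.DecimalConversion()
                        .toFixed(new BigDecimal("10.12100"), fixed, fixed.getLogicalType());
        System.out.println(datum.bytes().length); // 5
    }
}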
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/state/HudiAggregatedCommitInfo.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/state/HudiAggregatedCommitInfo.java
index 348a040be6..065fed72ad 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/state/HudiAggregatedCommitInfo.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/state/HudiAggregatedCommitInfo.java
@@ -17,15 +17,6 @@
package org.apache.seatunnel.connectors.seatunnel.hudi.sink.state;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
import java.io.Serializable;
-import java.util.List;
-
-@Data
-@AllArgsConstructor
-public class HudiAggregatedCommitInfo implements Serializable {
- private final List<HudiCommitInfo> hudiCommitInfoList;
-}
+public class HudiAggregatedCommitInfo implements Serializable {}
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/state/HudiCommitInfo.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/state/HudiCommitInfo.java
index 0357931bb0..808cc4d942 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/state/HudiCommitInfo.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/state/HudiCommitInfo.java
@@ -17,22 +17,6 @@
package org.apache.seatunnel.connectors.seatunnel.hudi.sink.state;
-import org.apache.hudi.client.WriteStatus;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
import java.io.Serializable;
-import java.util.List;
-import java.util.Stack;
-
-@Data
-@AllArgsConstructor
-public class HudiCommitInfo implements Serializable {
-
- private final String writeInstantTime;
-
- private final List<WriteStatus> writeStatusList;
- private final Stack<String> forceCommitTime;
-}
+public class HudiCommitInfo implements Serializable {}
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/HudiRecordWriter.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/HudiRecordWriter.java
index 7eb3ab546b..b98e222870 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/HudiRecordWriter.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/HudiRecordWriter.java
@@ -17,23 +17,23 @@
package org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer;
+import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableConfig;
import org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiConnectorException;
import org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiErrorCode;
import org.apache.seatunnel.connectors.seatunnel.hudi.sink.client.WriteClientProvider;
import org.apache.seatunnel.connectors.seatunnel.hudi.sink.convert.HudiRecordConverter;
-import org.apache.seatunnel.connectors.seatunnel.hudi.sink.state.HudiCommitInfo;
import org.apache.avro.Schema;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.client.HoodieJavaWriteClient;
-import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,10 +42,10 @@ import lombok.extern.slf4j.Slf4j;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Stack;
+import java.util.Map;
+import java.util.Set;
import static org.apache.seatunnel.connectors.seatunnel.hudi.sink.convert.AvroSchemaConverter.convertToSchema;
@@ -64,60 +64,44 @@ public class HudiRecordWriter implements Serializable {
private final SeaTunnelRowType seaTunnelRowType;
- private final boolean autoCommit;
-
private Schema schema;
private transient int batchCount = 0;
private final List<HoodieRecord<HoodieAvroPayload>> writeRecords;
- private Stack<String> forceCommitTime;
-
- private String writeInstantTime;
+ private final List<HoodieKey> deleteRecordKeys;
- private List<WriteStatus> writeStatusList;
+    private final LinkedHashMap<HoodieKey, Pair<Boolean, HoodieRecord<HoodieAvroPayload>>> buffer =
+            new LinkedHashMap<>();
private transient volatile boolean closed = false;
private transient volatile Exception flushException;
public HudiRecordWriter(
- HudiSinkConfig hudiSinkConfig,
HudiTableConfig hudiTableConfig,
WriteClientProvider clientProvider,
SeaTunnelRowType seaTunnelRowType) {
this.hudiTableConfig = hudiTableConfig;
- this.autoCommit = hudiSinkConfig.isAutoCommit();
this.clientProvider = clientProvider;
this.seaTunnelRowType = seaTunnelRowType;
this.writeRecords = new ArrayList<>();
- this.writeStatusList = new ArrayList<>();
- this.forceCommitTime = new Stack<>();
+ this.deleteRecordKeys = new ArrayList<>();
this.recordConverter = new HudiRecordConverter();
}
- public HudiRecordWriter(
- HudiSinkConfig sinkConfig,
- HudiTableConfig tableConfig,
- WriteClientProvider writeClientProvider,
- SeaTunnelRowType seaTunnelRowType,
- HudiCommitInfo hudiCommitInfo) {
- this(sinkConfig, tableConfig, writeClientProvider, seaTunnelRowType);
- this.writeInstantTime = hudiCommitInfo.getWriteInstantTime();
- this.writeStatusList = hudiCommitInfo.getWriteStatusList();
- }
-
public void open() {
-        this.schema = new Schema.Parser().parse(convertToSchema(seaTunnelRowType).toString());
+        this.schema =
+                new Schema.Parser()
+                        .parse(
+                                convertToSchema(
+                                                seaTunnelRowType,
+                                                AvroSchemaUtils.getAvroRecordQualifiedName(
+                                                        hudiTableConfig.getTableName()))
+                                        .toString());
try {
- HoodieJavaWriteClient<HoodieAvroPayload> writeClient =
- clientProvider.getOrCreateClient();
-            if (StringUtils.nonEmpty(writeInstantTime) && Objects.nonNull(writeStatusList)) {
- if (!writeClient.commit(writeInstantTime, writeStatusList)) {
- LOG.warn("Failed to commit history data.");
- }
- }
+ clientProvider.getOrCreateClient();
} catch (Exception e) {
throw new HudiConnectorException(
CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED,
@@ -133,7 +117,7 @@ public class HudiRecordWriter implements Serializable {
batchCount++;
if (hudiTableConfig.getBatchSize() > 0
&& batchCount >= hudiTableConfig.getBatchSize()) {
- flush(true);
+ flush();
}
} catch (Exception e) {
throw new HudiConnectorException(
@@ -143,92 +127,89 @@ public class HudiRecordWriter implements Serializable {
}
}
- public synchronized void flush(boolean isNeedForceCommit) {
+ public synchronized void flush() {
if (batchCount == 0) {
log.debug("No data needs to be refreshed, waiting for incoming
data.");
return;
}
checkFlushException();
-        HoodieJavaWriteClient<HoodieAvroPayload> writeClient = clientProvider.getOrCreateClient();
-        if (autoCommit || writeInstantTime == null) {
-            writeInstantTime = writeClient.startCommit();
+        Boolean preChangeFlag = null;
+        Set<Map.Entry<HoodieKey, Pair<Boolean, HoodieRecord<HoodieAvroPayload>>>> entries =
+                buffer.entrySet();
+        for (Map.Entry<HoodieKey, Pair<Boolean, HoodieRecord<HoodieAvroPayload>>> entry : entries) {
+ boolean currentChangeFlag = entry.getValue().getKey();
+ if (currentChangeFlag) {
+ if (preChangeFlag != null && !preChangeFlag) {
+ executeDelete();
+ }
+ writeRecords.add(entry.getValue().getValue());
+ } else {
+ if (preChangeFlag != null && preChangeFlag) {
+ executeWrite();
+ }
+ deleteRecordKeys.add(entry.getKey());
+ }
+ preChangeFlag = currentChangeFlag;
}
- List<WriteStatus> currentWriteStatusList;
+
+ if (preChangeFlag != null) {
+ if (preChangeFlag) {
+ executeWrite();
+ } else {
+ executeDelete();
+ }
+ }
+ batchCount = 0;
+ buffer.clear();
+ }
+
+ private void executeWrite() {
+        HoodieJavaWriteClient<HoodieAvroPayload> writeClient = clientProvider.getOrCreateClient();
+ String writeInstantTime = writeClient.startCommit();
// write records
switch (hudiTableConfig.getOpType()) {
            case INSERT:
-                currentWriteStatusList = writeClient.insert(writeRecords, writeInstantTime);
+                writeClient.insert(writeRecords, writeInstantTime);
                break;
            case UPSERT:
-                currentWriteStatusList = writeClient.upsert(writeRecords, writeInstantTime);
+                writeClient.upsert(writeRecords, writeInstantTime);
                break;
            case BULK_INSERT:
-                currentWriteStatusList = writeClient.bulkInsert(writeRecords, writeInstantTime);
+                writeClient.bulkInsert(writeRecords, writeInstantTime);
                break;
            default:
                throw new HudiConnectorException(
                        HudiErrorCode.UNSUPPORTED_OPERATION,
                        "Unsupported operation type: " + hudiTableConfig.getOpType());
}
- if (!autoCommit) {
- this.writeStatusList.addAll(currentWriteStatusList);
- }
-        /**
-         * when the batch size of temporary records is reached, commit is forced here, even if
-         * configured not to be auto commit. because a timeline supports only one commit.
-         */
-        forceCommit(isNeedForceCommit, autoCommit);
writeRecords.clear();
- batchCount = 0;
- }
-
- public Optional<HudiCommitInfo> prepareCommit() {
- flush(false);
- if (!autoCommit) {
-            return Optional.of(
-                    new HudiCommitInfo(writeInstantTime, writeStatusList, forceCommitTime));
-        }
-        return Optional.empty();
-    }
-
-    private void commit() {
-        if (StringUtils.nonEmpty(writeInstantTime) && !writeStatusList.isEmpty()) {
-            log.debug(
-                    "Commit hudi records, the instant time is {} and write status are {}",
-                    writeInstantTime,
-                    writeStatusList);
-            clientProvider.getOrCreateClient().commit(writeInstantTime, writeStatusList);
-            resetUpsertCommitInfo();
-        }
-    }
-
-    private void forceCommit(boolean isNeedForceCommit, boolean isAutoCommit) {
-        if (isNeedForceCommit && !isAutoCommit) {
-            clientProvider.getOrCreateClient().commit(writeInstantTime, writeStatusList);
-            forceCommitTime.add(writeInstantTime);
-            resetUpsertCommitInfo();
-        }
}
- public HudiCommitInfo snapshotState() {
-        HudiCommitInfo hudiCommitInfo =
-                new HudiCommitInfo(writeInstantTime, writeStatusList, forceCommitTime);
-        // reset commit info in here, because the commit info will be committed in committer.
- resetUpsertCommitInfo();
- // reset the force commit stack.
- forceCommitTime = new Stack<>();
- return hudiCommitInfo;
- }
-
- protected void resetUpsertCommitInfo() {
- writeInstantTime = null;
- writeStatusList = new ArrayList<>();
+ private void executeDelete() {
+        HoodieJavaWriteClient<HoodieAvroPayload> writeClient = clientProvider.getOrCreateClient();
+ writeClient.delete(deleteRecordKeys, writeClient.startCommit());
+ deleteRecordKeys.clear();
}
protected void prepareRecords(SeaTunnelRow element) {
HoodieRecord<HoodieAvroPayload> hoodieAvroPayloadHoodieRecord =
                recordConverter.convertRow(schema, seaTunnelRowType, element, hudiTableConfig);
- writeRecords.add(hoodieAvroPayloadHoodieRecord);
+ HoodieKey recordKey = hoodieAvroPayloadHoodieRecord.getKey();
+ boolean changeFlag = changeFlag(element.getRowKind());
+        buffer.put(recordKey, Pair.of(changeFlag, hoodieAvroPayloadHoodieRecord));
+ }
+
+ private boolean changeFlag(RowKind rowKind) {
+ switch (rowKind) {
+ case DELETE:
+ case UPDATE_BEFORE:
+ return false;
+ case INSERT:
+ case UPDATE_AFTER:
+ return true;
+ default:
+ throw new UnsupportedOperationException("Unknown row kind: " +
rowKind);
+ }
}
protected void checkFlushException() {
@@ -245,10 +226,7 @@ public class HudiRecordWriter implements Serializable {
if (!closed) {
closed = true;
try {
- flush(false);
- if (!autoCommit) {
- commit();
- }
+ flush();
} catch (Exception e) {
LOG.warn("Flush records to Hudi failed.", e);
flushException =
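The rewritten writer replaces the commit/state machinery with an in-memory changelog buffer: prepareRecords keys each row by its HoodieKey with a flag derived from RowKind, and flush replays the surviving entries in arrival order, turning each consecutive run of same-flag entries into one upsert or delete commit. A self-contained sketch of that grouping (String stand-ins for HoodieKey/HoodieRecord and println stand-ins for the write client; illustrative, not from this commit):

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChangelogFlushSketch {
    // Value pair: true = upsert (INSERT/UPDATE_AFTER), false = delete (DELETE/UPDATE_BEFORE).
    static final LinkedHashMap<String, Map.Entry<Boolean, String>> buffer = new LinkedHashMap<>();

    static void put(String key, boolean upsert, String record) {
        // Re-putting a key overwrites its pending change but keeps its original position.
        buffer.put(key, new AbstractMap.SimpleEntry<>(upsert, record));
    }

    static void flush() {
        List<String> upserts = new ArrayList<>();
        List<String> deletes = new ArrayList<>();
        Boolean preFlag = null;
        for (Map.Entry<String, Map.Entry<Boolean, String>> e : buffer.entrySet()) {
            boolean cur = e.getValue().getKey();
            // A flip between upsert and delete flushes the previous run first,
            // so the two kinds of batches reach the table in arrival order.
            if (preFlag != null && preFlag != cur) {
                commitRun(preFlag, upserts, deletes);
            }
            if (cur) {
                upserts.add(e.getValue().getValue()); // buffered HoodieRecord
            } else {
                deletes.add(e.getKey()); // buffered HoodieKey
            }
            preFlag = cur;
        }
        if (preFlag != null) {
            commitRun(preFlag, upserts, deletes); // flush the trailing run
        }
        buffer.clear();
    }

    static void commitRun(boolean upsert, List<String> upserts, List<String> deletes) {
        if (upsert) {
            System.out.println("upsert " + upserts); // writeClient.upsert(records, startCommit())
            upserts.clear();
        } else {
            System.out.println("delete " + deletes); // writeClient.delete(keys, startCommit())
            deletes.clear();
        }
    }

    public static void main(String[] args) {
        put("k1", true, "row1");
        put("k2", false, null);
        put("k3", true, "row3");
        flush(); // prints: upsert [row1] / delete [k2] / upsert [row3]
    }
}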
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/HudiSinkWriter.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/HudiSinkWriter.java
index 317215861a..130a79adab 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/HudiSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/HudiSinkWriter.java
@@ -35,8 +35,6 @@ import org.apache.seatunnel.connectors.seatunnel.hudi.sink.state.HudiSinkState;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
import java.util.Optional;
@Slf4j
@@ -60,27 +58,15 @@ public class HudiSinkWriter
Context context,
SeaTunnelRowType seaTunnelRowType,
HudiSinkConfig sinkConfig,
- HudiTableConfig tableConfig,
- List<HudiSinkState> hudiSinkState) {
+ HudiTableConfig tableConfig) {
this.sinkConfig = sinkConfig;
this.tableConfig = tableConfig;
this.seaTunnelRowType = seaTunnelRowType;
this.writeClientProvider =
new HudiWriteClientProvider(
                        sinkConfig, tableConfig.getTableName(), seaTunnelRowType);
- if (!hudiSinkState.isEmpty()) {
- this.hudiRecordWriter =
- new HudiRecordWriter(
- sinkConfig,
- tableConfig,
- writeClientProvider,
- seaTunnelRowType,
- hudiSinkState.get(0).getHudiCommitInfo());
- } else {
- this.hudiRecordWriter =
- new HudiRecordWriter(
-                            sinkConfig, tableConfig, writeClientProvider, seaTunnelRowType);
- }
+ this.hudiRecordWriter =
+                new HudiRecordWriter(tableConfig, writeClientProvider, seaTunnelRowType);
}
@Override
@@ -89,16 +75,11 @@ public class HudiSinkWriter
hudiRecordWriter.writeRecord(element);
}
- @Override
-    public List<HudiSinkState> snapshotState(long checkpointId) throws IOException {
-        return Collections.singletonList(
-                new HudiSinkState(checkpointId, hudiRecordWriter.snapshotState()));
- }
-
@Override
public Optional<HudiCommitInfo> prepareCommit() throws IOException {
tryOpen();
- return hudiRecordWriter.prepareCommit();
+ hudiRecordWriter.flush();
+ return Optional.empty();
}
@Override
@@ -128,8 +109,7 @@ public class HudiSinkWriter
queueIndex,
tableConfig.getTableName());
this.hudiRecordWriter =
-                new HudiRecordWriter(
-                        sinkConfig, tableConfig, writeClientProvider, seaTunnelRowType);
+                new HudiRecordWriter(tableConfig, writeClientProvider, seaTunnelRowType);
}
private void tryOpen() {
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/util/HudiUtil.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/util/HudiUtil.java
index fe6cbe3e20..ef49c28a21 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/util/HudiUtil.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/util/HudiUtil.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.config.HoodieStorageConfig;
@@ -173,7 +174,12 @@ public class HudiUtil {
hudiSinkConfig.getTableDfsPath(),
hudiTable.getDatabase(),
hudiTable.getTableName()))
-                        .withSchema(convertToSchema(seaTunnelRowType).toString())
+                        .withSchema(
+                                convertToSchema(
+                                                seaTunnelRowType,
+                                                AvroSchemaUtils.getAvroRecordQualifiedName(
+                                                        tableName))
+                                        .toString())
.withParallelism(
hudiTable.getInsertShuffleParallelism(),
hudiTable.getUpsertShuffleParallelism())
@@ -184,7 +190,6 @@ public class HudiUtil {
hudiTable.getMinCommitsToKeep(),
hudiTable.getMaxCommitsToKeep())
.build())
- .withAutoCommit(hudiSinkConfig.isAutoCommit())
.withCleanConfig(
HoodieCleanConfig.newBuilder()
.withAutoClean(true)
diff --git a/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiTest.java b/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiTest.java
index 82e85fcf4e..7dbfc402b6 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiTest.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiTest.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.hudi;
+import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -27,6 +28,7 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
@@ -52,6 +54,7 @@ import org.junit.jupiter.api.condition.OS;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
+import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalTime;
@@ -95,7 +98,8 @@ public class HudiTest {
"date",
"time",
"timestamp3",
- "map"
+ "map",
+ "decimal"
},
new SeaTunnelDataType[] {
BOOLEAN_TYPE,
@@ -107,16 +111,19 @@ public class HudiTest {
LocalTimeType.LOCAL_TIME_TYPE,
LocalTimeType.LOCAL_DATE_TIME_TYPE,
new MapType(STRING_TYPE, LONG_TYPE),
+ new DecimalType(10, 5),
});
private String getSchema() {
- return convertToSchema(seaTunnelRowType).toString();
+        return convertToSchema(
+                        seaTunnelRowType, AvroSchemaUtils.getAvroRecordQualifiedName(tableName))
+                .toString();
}
@Test
void testSchema() {
Assertions.assertEquals(
-                "{\"type\":\"record\",\"name\":\"record\",\"namespace\":\"org.apache.seatunnel.avro.generated\",\"fields\":[{\"name\":\"bool\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"int\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"longValue\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"float\",\"type\":[\"null\",\"float\"],\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"date\",\"type\":
[...]
+                "{\"type\":\"record\",\"name\":\"hudi_record\",\"namespace\":\"hoodie.hudi\",\"fields\":[{\"name\":\"bool\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"int\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"longValue\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"float\",\"type\":[\"null\",\"float\"],\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"date\",\"type\":[\"null\",{\"type\"
[...]
getSchema());
}
@@ -165,7 +172,8 @@ public class HudiTest {
expected.setField(7, timestamp3.toLocalDateTime());
Map<String, Long> map = new HashMap<>();
map.put("element", 123L);
- expected.setField(9, map);
+ expected.setField(8, map);
+ expected.setField(9, BigDecimal.valueOf(10.121));
String instantTime = javaWriteClient.startCommit();
        List<HoodieRecord<HoodieAvroPayload>> hoodieRecords = new ArrayList<>();
hoodieRecords.add(convertRow(expected));
@@ -178,13 +186,23 @@ public class HudiTest {
private HoodieRecord<HoodieAvroPayload> convertRow(SeaTunnelRow element) {
GenericRecord rec =
new GenericData.Record(
-                        new Schema.Parser().parse(convertToSchema(seaTunnelRowType).toString()));
+                        new Schema.Parser()
+                                .parse(
+                                        convertToSchema(
+                                                        seaTunnelRowType,
+                                                        AvroSchemaUtils.getAvroRecordQualifiedName(
+                                                                tableName))
+                                                .toString()));
        for (int i = 0; i < seaTunnelRowType.getTotalFields(); i++) {
            rec.put(
                    seaTunnelRowType.getFieldNames()[i],
                    createConverter(seaTunnelRowType.getFieldType(i))
                            .convert(
-                                    convertToSchema(seaTunnelRowType.getFieldType(i)),
+                                    convertToSchema(
+                                            seaTunnelRowType.getFieldType(i),
+                                            AvroSchemaUtils.getAvroRecordQualifiedName(tableName)
+                                                    + "."
+                                                    + seaTunnelRowType.getFieldNames()[i]),
                                    element.getField(i)));
}
diff --git a/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalogTest.java b/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalogTest.java
index 7be81e89ba..d3524c85c4 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalogTest.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalogTest.java
@@ -168,6 +168,7 @@ class HudiCatalogTest {
TableSchema schema = builder.build();
HashMap<String, String> options = new HashMap<>();
options.put("record_key_fields", "id,boolean_col");
+ options.put("cdc_enabled", "false");
options.put("table_type", "MERGE_ON_READ");
return CatalogTable.of(
                tableIdentifier, schema, options, Collections.singletonList("dt_col"), "null");
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/pom.xml
index 583b16a162..7fe8cc8523 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/pom.xml
@@ -30,6 +30,18 @@
<minio.version>8.5.6</minio.version>
</properties>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-jdbc</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
<dependencies>
<!-- minio containers -->
<dependency>
@@ -60,5 +72,25 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-cdc-mysql</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>mysql</artifactId>
+ <version>${testcontainer.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiIT.java
index 28f2eb3f53..642b94471d 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiIT.java
@@ -108,7 +108,7 @@ public class HudiIT extends TestSuiteBase {
disabledReason = "FLINK do not support local file catalog in
hudi.")
public void testWriteHudi(TestContainer container)
throws IOException, InterruptedException, URISyntaxException {
-        Container.ExecResult textWriteResult = container.executeJob("/fake_to_hudi.conf");
+        Container.ExecResult textWriteResult = container.executeJob("/hudi/fake_to_hudi.conf");
Assertions.assertEquals(0, textWriteResult.getExitCode());
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", LocalFileSystem.DEFAULT_FS);
@@ -146,7 +146,7 @@ public class HudiIT extends TestSuiteBase {
public void testWriteHudiWithOmitConfigItem(TestContainer container)
throws IOException, InterruptedException, URISyntaxException {
        Container.ExecResult textWriteResult =
-                container.executeJob("/fake_to_hudi_with_omit_config_item.conf");
+                container.executeJob("/hudi/fake_to_hudi_with_omit_config_item.conf");
Assertions.assertEquals(0, textWriteResult.getExitCode());
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", LocalFileSystem.DEFAULT_FS);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiMultiTableIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiMultiTableIT.java
index 0a9c4555ad..c240b85da7 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiMultiTableIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiMultiTableIT.java
@@ -105,7 +105,8 @@ public class HudiMultiTableIT extends TestSuiteBase {
type = {EngineType.FLINK},
disabledReason = "FLINK do not support local file catalog in
hudi.")
public void testMultiWrite(TestContainer container) throws IOException,
InterruptedException {
- Container.ExecResult textWriteResult =
container.executeJob("/multi_fake_to_hudi.conf");
+ Container.ExecResult textWriteResult =
+ container.executeJob("/hudi/multi_fake_to_hudi.conf");
Assertions.assertEquals(0, textWriteResult.getExitCode());
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", LocalFileSystem.DEFAULT_FS);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSeatunnelS3MultiTableIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSeatunnelS3MultiTableIT.java
index 67f3e9e884..237fd100d2 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSeatunnelS3MultiTableIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSeatunnelS3MultiTableIT.java
@@ -134,8 +134,8 @@ public class HudiSeatunnelS3MultiTableIT extends SeaTunnelContainer {
@Test
public void testS3MultiWrite() throws IOException, InterruptedException {
- copyFileToContainer("/core-site.xml",
"/tmp/seatunnel/config/core-site.xml");
- Container.ExecResult textWriteResult =
executeSeaTunnelJob("/s3_fake_to_hudi.conf");
+ copyFileToContainer("/hudi/core-site.xml",
"/tmp/seatunnel/config/core-site.xml");
+ Container.ExecResult textWriteResult =
executeSeaTunnelJob("/hudi/s3_fake_to_hudi.conf");
Assertions.assertEquals(0, textWriteResult.getExitCode());
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", LocalFileSystem.DEFAULT_FS);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSinkCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSinkCDCIT.java
new file mode 100644
index 0000000000..1610618fef
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSinkCDCIT.java
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.hudi;
+
+import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static java.lang.Thread.sleep;
+import static org.awaitility.Awaitility.given;
+
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.FLINK, EngineType.SPARK},
+ disabledReason =
+ "Flink does not support local file catalog in Hudi, and Spark currently does not support CDC")
+@Slf4j
+public class HudiSinkCDCIT extends TestSuiteBase implements TestResource {
+
+ // mysql
+ private static final String MYSQL_HOST = "mysql_cdc_e2e";
+ private static final String MYSQL_USER_NAME = "st_user";
+ private static final String MYSQL_USER_PASSWORD = "seatunnel";
+ private static final String MYSQL_DATABASE = "mysql_cdc";
+ private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+ private static final String SOURCE_TABLE = "mysql_cdc_e2e_source_table";
+
+ private static final String MYSQL_DRIVER =
+ "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";
+
+ private static final String DATABASE = "st";
+ private static final String TABLE_NAME = "st_test";
+ private static final String TABLE_PATH = "/tmp/hudi/";
+ private static final String NAMESPACE = "hudi";
+ private static final String NAMESPACE_TAR = "hudi.tar.gz";
+
+ private final UniqueDatabase inventoryDatabase =
+ new UniqueDatabase(
+ MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw", MYSQL_DATABASE);
+
+ private final Map<Integer, Record> records = new HashMap<>();
+
+ private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+ return new MySqlContainer(version)
+ .withConfigurationOverride("mysql/server-gtids/my.cnf")
+ .withSetupSQL("mysql/setup.sql")
+ .withNetwork(NETWORK)
+ .withNetworkAliases(MYSQL_HOST)
+ .withDatabaseName(MYSQL_DATABASE)
+ .withUsername(MYSQL_USER_NAME)
+ .withPassword(MYSQL_USER_PASSWORD)
+ .withLogConsumer(
+ new Slf4jLogConsumer(DockerLoggerFactory.getLogger("mysql-mysql-image")));
+ }
+
+ protected final ContainerExtendedFactory containerExtendedFactory =
+ new ContainerExtendedFactory() {
+ @Override
+ public void extend(GenericContainer<?> container)
+ throws IOException, InterruptedException {
+ container.execInContainer(
+ "sh",
+ "-c",
+ "cd /tmp" + " && tar -czvf " + NAMESPACE_TAR + " " + NAMESPACE);
+ container.copyFileFromContainer(
+ "/tmp/" + NAMESPACE_TAR, "/tmp/" + NAMESPACE_TAR);
+
+ extractFiles();
+ }
+
+ private void extractFiles() {
+ ProcessBuilder processBuilder = new ProcessBuilder();
+ processBuilder.command(
+ "sh", "-c", "cd /tmp" + " && tar -zxvf " + NAMESPACE_TAR);
+ try {
+ Process process = processBuilder.start();
+ int exitCode = process.waitFor();
+ if (exitCode == 0) {
+ log.info("Extract files successful.");
+ } else {
+ log.error("Extract files failed with exit code " + exitCode);
+ }
+ } catch (IOException | InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ @TestContainerExtension
+ protected final ContainerExtendedFactory extendedFactory =
+ container -> {
+ container.execInContainer("sh", "-c", "mkdir -p " + TABLE_PATH);
+ container.execInContainer("sh", "-c", "chmod -R 777 " + TABLE_PATH);
+ Container.ExecResult extraCommands =
+ container.execInContainer(
+ "sh",
+ "-c",
+ "mkdir -p /tmp/seatunnel/plugins/MySQL-CDC/lib && cd /tmp/seatunnel/plugins/MySQL-CDC/lib && wget "
+ + MYSQL_DRIVER);
+ Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr());
+ };
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ log.info("Starting MySQL containers...");
+ Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+ log.info("MySQL containers are started");
+ inventoryDatabase.createAndInitialize();
+ log.info("MySQL DDL execution is complete");
+ }
+
+ private void insertRecord(Record record) {
+ Integer id = record.getId();
+ records.put(id, record);
+ }
+
+ private void deleteRecord(int id) {
+ records.remove(id);
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() throws Exception {
+ // close Container
+ if (MYSQL_CONTAINER != null) {
+ MYSQL_CONTAINER.close();
+ }
+ }
+
+ @TestTemplate
+ public void testMysqlCdc2Hudi(TestContainer container)
+ throws IOException, InterruptedException {
+ // Clear related content to ensure that multiple operations are not affected
+ clearTable(MYSQL_DATABASE, SOURCE_TABLE);
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.executeJob("/hudi/mysql_cdc_to_hudi.conf");
+ } catch (Exception e) {
+ log.error("Commit task exception: " + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+ // insert data and check
+ insertAndCheckData(container);
+ // upsert/delete data and check
+ upsertAndCheckData(container);
+ }
+
+ private void insertAndCheckData(TestContainer container) throws InterruptedException {
+ // Init table data
+ initSourceTableData(MYSQL_DATABASE, SOURCE_TABLE);
+ // Waiting 30s for source capture data
+ sleep(30000);
+ Configuration configuration = new Configuration();
+ configuration.set("fs.defaultFS", LocalFileSystem.DEFAULT_FS);
+
+ given().ignoreExceptions()
+ .await()
+ .atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ // copy hudi to local
+ container.executeExtraCommands(containerExtendedFactory);
+ Path newestCommitFilePath =
+ getNewestCommitFilePath(
+ new File(
+ TABLE_PATH
+ + File.separator
+ + DATABASE
+ + File.separator
+ + TABLE_NAME));
+ ParquetReader<Group> reader =
+ ParquetReader.builder(
+ new GroupReadSupport(), newestCommitFilePath)
+ .withConf(configuration)
+ .build();
+
+ // Read data and count rows
+ long rowCount = 0;
+ Group read = reader.read();
+ while (read != null) {
+ checkData(read);
+ read = reader.read();
+ rowCount++;
+ }
+ Assertions.assertEquals(3, rowCount);
+ });
+ FileUtils.deleteFile(TABLE_PATH);
+ }
+
+ private void upsertAndCheckData(TestContainer container)
+ throws InterruptedException, IOException {
+ upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE);
+ // Waiting 30s for source capture data
+ sleep(30000);
+ Configuration configuration = new Configuration();
+ configuration.set("fs.defaultFS", LocalFileSystem.DEFAULT_FS);
+
+ given().ignoreExceptions()
+ .await()
+ .atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ // copy hudi to local
+ container.executeExtraCommands(containerExtendedFactory);
+ Path newestCommitFilePath =
+ getNewestCommitFilePath(
+ new File(
+ TABLE_PATH
+ + File.separator
+ + DATABASE
+ + File.separator
+ + TABLE_NAME));
+ ParquetReader<Group> reader =
+ ParquetReader.builder(
+ new GroupReadSupport(), newestCommitFilePath)
+ .withConf(configuration)
+ .build();
+ // Read data and count rows
+ long rowCount = 0;
+ Group read = reader.read();
+ while (read != null) {
+ checkData(read);
+ read = reader.read();
+ rowCount++;
+ }
+ Assertions.assertEquals(4, rowCount);
+ });
+ FileUtils.deleteFile(TABLE_PATH);
+ }
+
+ public static Path getNewestCommitFilePath(File tablePathDir) throws IOException {
+ File[] files = FileUtil.listFiles(tablePathDir);
+ Long newestCommitTime =
+ Arrays.stream(files)
+ .filter(file -> file.getName().endsWith(".parquet"))
+ .map(
+ file ->
+ Long.parseLong(
+ file.getName()
+ .substring(
+ file.getName().lastIndexOf("_") + 1,
+ file.getName().lastIndexOf(".parquet"))))
+ .max(Long::compareTo)
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ "No parquet file found in " + tablePathDir));
+ for (File file : files) {
+ if (file.getName().endsWith(newestCommitTime + ".parquet")) {
+ return new Path(file.toURI());
+ }
+ }
+ throw new IllegalArgumentException("No parquet file found in " + tablePathDir);
+ }
+
+ private void checkData(Group readRecord) {
+ Integer id = readRecord.getInteger("id", 0);
+ Record record = records.get(id);
+ Assertions.assertNotNull(record);
+ String f_json = readRecord.getString("f_json", 0);
+ Long f_bigint = readRecord.getLong("f_bigint", 0);
+ Assertions.assertEquals(
+ JsonUtils.parseObject(record.getJson()), JsonUtils.parseObject(f_json));
+ Assertions.assertEquals(record.getBigInt(), f_bigint);
+ }
+
+ private void clearTable(String database, String tableName) {
+ executeSql("truncate table " + database + "." + tableName);
+ }
+
+ // Execute SQL
+ private void executeSql(String sql) {
+ try (Connection connection = getJdbcConnection()) {
+ connection.createStatement().execute(sql);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Connection getJdbcConnection() throws SQLException {
+ return DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(),
+ MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword());
+ }
+
+ private void initSourceTableData(String database, String tableName) {
+ executeSql(
+ "INSERT INTO "
+ + database
+ + "."
+ + tableName
+ + " ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,\n"
+ + " f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,\n"
+ + " f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double,\n"
+ + " f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime,\n"
+ + " f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time,\n"
+ + " f_tinyint, f_tinyint_unsigned, f_json, f_year )\n"
+ + "VALUES ( 1, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
+ + " 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n"
+ + " 0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321,\n"
+ + " 123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n"
+ + " 'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',\n"
+ + " '2023-04-27 11:08:40', 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',\n"
+ + " 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',\n"
+ + " 12.345, '14:30:00', -128, 255, '{ \"key\": \"value\" }', 2022 ),\n"
+ + " ( 2, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
+ + " 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, 0x74696E79626C6F62,\n"
+ + " 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321, 123456789, 987654321,\n"
+ + " 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field', 'This is a text field',\n"
+ + " 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00', '2023-04-27 11:08:40',\n"
+ + " 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',\n"
+ + " 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',\n"
+ + " 112.345, '14:30:00', -128, 22, '{ \"key\": \"value\" }', 2013 ),\n"
+ + " ( 3, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
+ + " 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, 0x74696E79626C6F62,\n"
+ + " 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321, 123456789, 987654321, 123,\n"
+ + " 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field', 'This is a text field',\n"
+ + " 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00', '2023-04-27 11:08:40',\n"
+ + " 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',\n"
+ + " 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field', 112.345,\n"
+ + " '14:30:00', -128, 22, '{ \"key\": \"value\" }', 2021 )");
+ insertRecord(new Record(1, 123456789L, "{ \"key\": \"value\" }"));
+ insertRecord(new Record(2, 123456789L, "{ \"key\": \"value\" }"));
+ insertRecord(new Record(3, 123456789L, "{ \"key\": \"value\" }"));
+ }
+
+ private void upsertDeleteSourceTable(String database, String tableName) {
+ executeSql(
+ "INSERT INTO "
+ + database
+ + "."
+ + tableName
+ + " ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,\n"
+ + " f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,\n"
+ + " f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double,\n"
+ + " f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime,\n"
+ + " f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time,\n"
+ + " f_tinyint, f_tinyint_unsigned, f_json, f_year )\n"
+ + "VALUES ( 4, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
+ + " 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n"
+ + " 0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321,\n"
+ + " 1234567890, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n"
+ + " 'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',\n"
+ + " '2023-04-27 11:08:40', 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',\n"
+ + " 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',\n"
+ + " 12.345, '14:30:00', -128, 255, '{ \"key\": \"value\" }', 1992 )");
+ insertRecord(new Record(4, 1234567890L, "{ \"key\": \"value\" }"));
+
+ executeSql(
+ "INSERT INTO "
+ + database
+ + "."
+ + tableName
+ + " ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,\n"
+ + " f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,\n"
+ + " f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double,\n"
+ + " f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime,\n"
+ + " f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time,\n"
+ + " f_tinyint, f_tinyint_unsigned, f_json, f_year )\n"
+ + "VALUES ( 5, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
+ + " 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n"
+ + " 0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321,\n"
+ + " 123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n"
+ + " 'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',\n"
+ + " '2023-04-27 11:08:40', 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',\n"
+ + " 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',\n"
+ + " 12.345, '14:30:00', -128, 255, '{ \"key\": \"value\" }', 1999 )");
+ insertRecord(new Record(5, 123456789L, "{ \"key\": \"value\" }"));
+
+ executeSql("DELETE FROM " + database + "." + tableName + " where id = 2");
+ deleteRecord(2);
+
+ executeSql(
+ "UPDATE "
+ + database
+ + "."
+ + tableName
+ " SET f_bigint = 10000, f_json = '{ \"key\": \"value1\" }' where id = 3");
+ insertRecord(new Record(3, 10000L, "{ \"key\": \"value1\" }"));
+ }
+
+ @Data
+ @AllArgsConstructor
+ static class Record {
+ private Integer id;
+ private Long bigInt;
+ private String json;
+ }
+}
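
A note on the helper above: getNewestCommitFilePath relies on Hudi's default base-file naming, <fileId>_<writeToken>_<instantTime>.parquet, and picks the file with the largest instant time, which is why the assertions read only the newest parquet file. A minimal standalone sketch of that parsing step (the class name and sample file name are illustrative, not part of this commit):

    // Sketch only: mirrors the substring/parse logic of getNewestCommitFilePath,
    // assuming Hudi's default base-file name <fileId>_<writeToken>_<instantTime>.parquet.
    public class InstantTimeSketch {
        static long parseInstantTime(String fileName) {
            // The digits between the last '_' and the ".parquet" suffix are the commit instant.
            return Long.parseLong(
                    fileName.substring(
                            fileName.lastIndexOf("_") + 1, fileName.lastIndexOf(".parquet")));
        }

        public static void main(String[] args) {
            // Illustrative name shaped like a Hudi COPY_ON_WRITE base file.
            String name = "f1a2b3c4-0000-0000-0000-000000000000-0_0-1-0_20241018115423012.parquet";
            System.out.println(parseInstantTime(name)); // prints 20241018115423012
        }
    }
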
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSparkS3MultiTableIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSparkS3MultiTableIT.java
index db43348aef..f91f340f3c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSparkS3MultiTableIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSparkS3MultiTableIT.java
@@ -115,8 +115,8 @@ public class HudiSparkS3MultiTableIT extends TestSuiteBase implements TestResour
disabledReason =
"The hadoop version in current flink image is not compatible with the aws version and default container of seatunnel not support s3.")
public void testS3MultiWrite(TestContainer container) throws IOException, InterruptedException {
- container.copyFileToContainer("/core-site.xml", "/tmp/seatunnel/config/core-site.xml");
- Container.ExecResult textWriteResult = container.executeJob("/s3_fake_to_hudi.conf");
+ container.copyFileToContainer("/hudi/core-site.xml", "/tmp/seatunnel/config/core-site.xml");
+ Container.ExecResult textWriteResult = container.executeJob("/hudi/s3_fake_to_hudi.conf");
Assertions.assertEquals(0, textWriteResult.getExitCode());
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", LocalFileSystem.DEFAULT_FS);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/ddl/mysql_cdc.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/ddl/mysql_cdc.sql
new file mode 100644
index 0000000000..5f42375263
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/ddl/mysql_cdc.sql
@@ -0,0 +1,75 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE: mysql_cdc
+-- ----------------------------------------------------------------------------------------------------------------
+CREATE DATABASE IF NOT EXISTS `mysql_cdc`;
+
+use mysql_cdc;
+-- Create a mysql data source table
+CREATE TABLE mysql_cdc_e2e_source_table
+(
+ `id` int NOT NULL AUTO_INCREMENT,
+ `f_binary` binary(64) DEFAULT NULL,
+ `f_blob` blob,
+ `f_long_varbinary` mediumblob,
+ `f_longblob` longblob,
+ `f_tinyblob` tinyblob,
+ `f_varbinary` varbinary(100) DEFAULT NULL,
+ `f_smallint` smallint DEFAULT NULL,
+ `f_smallint_unsigned` smallint unsigned DEFAULT NULL,
+ `f_mediumint` mediumint DEFAULT NULL,
+ `f_mediumint_unsigned` mediumint unsigned DEFAULT NULL,
+ `f_int` int DEFAULT NULL,
+ `f_int_unsigned` int unsigned DEFAULT NULL,
+ `f_integer` int DEFAULT NULL,
+ `f_integer_unsigned` int unsigned DEFAULT NULL,
+ `f_bigint` bigint DEFAULT NULL,
+ `f_bigint_unsigned` bigint unsigned DEFAULT NULL,
+ `f_numeric` decimal(10, 0) DEFAULT NULL,
+ `f_decimal` decimal(10, 0) DEFAULT NULL,
+ `f_float` float DEFAULT NULL,
+ `f_double` double DEFAULT NULL,
+ `f_double_precision` double DEFAULT NULL,
+ `f_longtext` longtext,
+ `f_mediumtext` mediumtext,
+ `f_text` text,
+ `f_tinytext` tinytext,
+ `f_varchar` varchar(100) DEFAULT NULL,
+ `f_date` date DEFAULT NULL,
+ `f_datetime` datetime DEFAULT NULL,
+ `f_timestamp` timestamp NULL DEFAULT NULL,
+ `f_bit1` bit(1) DEFAULT NULL,
+ `f_bit64` bit(64) DEFAULT NULL,
+ `f_char` char(1) DEFAULT NULL,
+ `f_enum` enum ('enum1','enum2','enum3') DEFAULT NULL,
+ `f_mediumblob` mediumblob,
+ `f_long_varchar` mediumtext,
+ `f_real` double DEFAULT NULL,
+ `f_time` time DEFAULT NULL,
+ `f_tinyint` tinyint DEFAULT NULL,
+ `f_tinyint_unsigned` tinyint unsigned DEFAULT NULL,
+ `f_json` json DEFAULT NULL,
+ `f_year` year DEFAULT NULL,
+ PRIMARY KEY (`id`)
+) ENGINE = InnoDB
+ AUTO_INCREMENT = 2
+ DEFAULT CHARSET = utf8mb4
+ COLLATE = utf8mb4_0900_ai_ci;
+
+truncate table mysql_cdc_e2e_source_table;
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/core-site.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/core-site.xml
similarity index 100%
rename from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/core-site.xml
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/core-site.xml
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/fake_to_hudi.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/fake_to_hudi.conf
similarity index 100%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/fake_to_hudi.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/fake_to_hudi.conf
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/fake_to_hudi_with_omit_config_item.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/fake_to_hudi_with_omit_config_item.conf
similarity index 100%
rename from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/fake_to_hudi_with_omit_config_item.conf
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/fake_to_hudi_with_omit_config_item.conf
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/multi_fake_to_hudi.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/multi_fake_to_hudi.conf
similarity index 100%
rename from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/multi_fake_to_hudi.conf
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/multi_fake_to_hudi.conf
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/fake_to_hudi.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/mysql_cdc_to_hudi.conf
similarity index 63%
rename from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/fake_to_hudi.conf
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/mysql_cdc_to_hudi.conf
index 894f119114..310ee72ba8 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/fake_to_hudi.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/mysql_cdc_to_hudi.conf
@@ -14,36 +14,34 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+######
+###### This config file is a demonstration of streaming processing in SeaTunnel
+######
env {
- execution.parallelism = 1
- job.mode = "BATCH"
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
}
source {
- FakeSource {
- schema = {
- fields {
- c_map = "map<string, string>"
- c_array = "array<int>"
- c_string = string
- c_boolean = boolean
- c_tinyint = tinyint
- c_smallint = smallint
- c_int = int
- c_bigint = bigint
- c_float = float
- c_double = double
- c_decimal = "decimal(30, 8)"
- c_bytes = bytes
- c_date = date
- c_timestamp = timestamp
- }
+ MySQL-CDC {
+ result_table_name="customer_result_table"
+ catalog {
+ factory = Mysql
}
- result_table_name = "fake"
+ database-names=["mysql_cdc"]
+ table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
+ format=DEFAULT
+ username = "st_user"
+ password = "seatunnel"
+ base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
}
}
+transform {
+}
+
sink {
Hudi {
op_type="UPSERT"
@@ -51,6 +49,8 @@ sink {
database = "st"
table_name = "st_test"
table_type="COPY_ON_WRITE"
- record_key_fields="c_bigint"
+ record_key_fields="id"
+ cdc_enabled = true
}
}
+
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/s3_fake_to_hudi.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/s3_fake_to_hudi.conf
similarity index 100%
rename from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/s3_fake_to_hudi.conf
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/s3_fake_to_hudi.conf
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/mysql/server-gtids/my.cnf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/mysql/server-gtids/my.cnf
new file mode 100644
index 0000000000..a390897885
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/mysql/server-gtids/my.cnf
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# For advice on how to change settings please see
+# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
+
+[mysqld]
+#
+# Remove leading # and set to the amount of RAM for the most important data
+# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
+# innodb_buffer_pool_size = 128M
+#
+# Remove leading # to turn on a very important data integrity option: logging
+# changes to the binary log between backups.
+# log_bin
+#
+# Remove leading # to set options mainly useful for reporting servers.
+# The server defaults are faster for transactions and fast SELECTs.
+# Adjust sizes as needed, experiment to find the optimal values.
+# join_buffer_size = 128M
+# sort_buffer_size = 2M
+# read_rnd_buffer_size = 2M
+skip-host-cache
+skip-name-resolve
+#datadir=/var/lib/mysql
+#socket=/var/lib/mysql/mysql.sock
+secure-file-priv=/var/lib/mysql
+user=mysql
+
+# Disabling symbolic-links is recommended to prevent assorted security risks
+symbolic-links=0
+
+#log-error=/var/log/mysqld.log
+#pid-file=/var/run/mysqld/mysqld.pid
+
+# ----------------------------------------------
+# Enable the binlog for replication & CDC
+# ----------------------------------------------
+
+# Enable binary replication log and set the prefix, expiration, and log format.
+# The prefix is arbitrary, expiration can be short for integration tests but would
+# be longer on a production system. Row-level info is required for ingest to work.
+# Server ID is required, but this will vary on production systems
+server-id = 223344
+log_bin = mysql-bin
+expire_logs_days = 1
+binlog_format = row
+
+# enable gtid mode
+gtid_mode = on
+enforce_gtid_consistency = on
\ No newline at end of file
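
The [mysqld] block above enables exactly what the MySQL-CDC source needs: an active binary log in row format with GTID mode on. A minimal sketch of a JDBC sanity check that these settings took effect (the URL is illustrative; the credentials match the test setup in this commit):

    // Sketch only: confirm the my.cnf settings are active on the running server.
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class BinlogConfigCheck {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection(
                            "jdbc:mysql://localhost:3306/mysql_cdc", "st_user", "seatunnel");
                    Statement stmt = conn.createStatement();
                    ResultSet rs = stmt.executeQuery(
                            "SHOW VARIABLES WHERE Variable_name IN"
                                    + " ('log_bin', 'binlog_format', 'gtid_mode')")) {
                while (rs.next()) {
                    // Expect log_bin=ON, binlog_format=ROW, gtid_mode=ON.
                    System.out.println(rs.getString(1) + " = " + rs.getString(2));
                }
            }
        }
    }
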
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/mysql/setup.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/mysql/setup.sql
new file mode 100644
index 0000000000..aa4534e0ad
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/mysql/setup.sql
@@ -0,0 +1,27 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- In production you would almost certainly restrict the replication user to the follower (replica) machine,
+-- to prevent other clients from accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
+-- However, in this database we'll grant 2 users different privileges:
+--
+-- 1) 'st_user' - all privileges required by the snapshot reader AND binlog
reader (used for testing)
+-- 2) 'mysqluser' - all privileges
+--
+GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, DROP, LOCK TABLES ON *.* TO 'st_user'@'%';
+CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
+GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
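
The comment block above describes the two test users; the grants themselves can be confirmed the same way as the binlog settings. A minimal sketch (illustrative URL; the users are the ones created above):

    // Sketch only: list the privileges granted to the snapshot/binlog reader user.
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class GrantCheck {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection(
                            "jdbc:mysql://localhost:3306/", "mysqluser", "mysqlpw");
                    Statement stmt = conn.createStatement();
                    ResultSet rs = stmt.executeQuery("SHOW GRANTS FOR 'st_user'@'%'")) {
                while (rs.next()) {
                    // Expect the SELECT/RELOAD/REPLICATION SLAVE/... grant from setup.sql.
                    System.out.println(rs.getString(1));
                }
            }
        }
    }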