This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new fcb2938286 Revert "[Feature][Redis] Flush data when the time reaches
checkpoint interval" and "[Feature][CDC] Add 'schema-changes.enabled' options"
(#8278)
fcb2938286 is described below
commit fcb29382862cdbc0985dd23f088cf6ff26cc52fc
Author: Jast <[email protected]>
AuthorDate: Fri Dec 13 10:03:02 2024 +0800
Revert "[Feature][Redis] Flush data when the time reaches checkpoint
interval" and "[Feature][CDC] Add 'schema-changes.enabled' options" (#8278)
---
docs/en/concept/schema-evolution.md | 35 +++++-----
.../formats/cdc-compatible-debezium-json.md | 2 +
docs/en/connector-v2/sink/Paimon.md | 5 +-
docs/en/connector-v2/source/MySQL-CDC.md | 8 +--
docs/en/connector-v2/source/Oracle-CDC.md | 1 -
docs/zh/concept/schema-evolution.md | 36 +++++------
.../formats/cdc-compatible-debezium-json.md | 2 +
docs/zh/connector-v2/sink/Paimon.md | 5 +-
.../cdc/base/config/JdbcSourceConfigFactory.java | 10 +--
.../connectors/cdc/base/option/SourceOptions.java | 7 --
.../cdc/mysql/config/MySqlSourceConfigFactory.java | 6 +-
.../source/MySqlIncrementalSourceFactory.java | 31 +++------
.../oracle/config/OracleSourceConfigFactory.java | 24 ++-----
.../cdc/oracle/source/OracleIncrementalSource.java | 12 +++-
.../source/OracleIncrementalSourceFactory.java | 31 +++------
.../seatunnel/redis/sink/RedisSinkWriter.java | 14 +---
.../mysqlcdc_to_mysql_with_schema_change.conf | 5 +-
...c_to_mysql_with_schema_change_exactly_once.conf | 5 +-
.../oraclecdc_to_mysql_with_schema_change.conf | 5 +-
.../oraclecdc_to_oracle_with_schema_change.conf | 5 +-
..._to_oracle_with_schema_change_exactly_once.conf | 4 +-
.../e2e/connector/hive/HiveKerberosIT.java | 4 ++
.../mysql_cdc_to_iceberg_for_schema_change.conf | 10 ++-
.../mysql_cdc_to_paimon_with_schema_change.conf | 5 +-
.../connector/redis/RedisTestCaseTemplateIT.java | 26 +-------
.../resources/fake-to-redis-test-in-real-time.conf | 75 ----------------------
.../mysqlcdc_to_starrocks_with_schema_change.conf | 5 +-
.../container/seatunnel/SeaTunnelContainer.java | 6 +-
28 files changed, 127 insertions(+), 257 deletions(-)
diff --git a/docs/en/concept/schema-evolution.md
b/docs/en/concept/schema-evolution.md
index 5de26ea94e..b1db057387 100644
--- a/docs/en/concept/schema-evolution.md
+++ b/docs/en/concept/schema-evolution.md
@@ -1,16 +1,7 @@
# Schema evolution
Schema Evolution means that the schema of a data table can be changed and the
data synchronization task can automatically adapt to the changes of the new
table structure without any other operations.
+Currently, only the `add column`, `drop column`, `rename column` and `modify
column` operations on tables in a CDC source are supported. This feature is
currently supported only by the Zeta engine.
-## Supported engines
-
-- Zeta
-
-## Supported schema change event types
-
-- `ADD COLUMN`
-- `DROP COLUMN`
-- `RENAME COLUMN`
-- `MODIFY COLUMN`
## Supported connectors
@@ -30,7 +21,7 @@ When you use the Oracle-CDC,you can not use the username
named `SYS` or `SYSTE
Also, if your table name starts with `ORA_TEMP_`, it will have the same
problem.
## Enable schema evolution
-Schema evolution is disabled by default in CDC source. You need configure
`schema-changes.enabled = true` which is only supported in CDC to enable it.
+Schema evolution is disabled by default in CDC source. You need to configure
`debezium.include.schema.changes = true`, which is only supported in CDC, to
enable it. When you use Oracle-CDC with schema evolution enabled, you must
set `log.mining.strategy` to `redo_log_catalog` in the `debezium` attribute.
## Examples
@@ -52,8 +43,9 @@ source {
password = "mysqlpw"
table-names = ["shop.products"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
-
- schema-changes.enabled = true
+ debezium = {
+ include.schema.changes = true
+ }
}
}
@@ -94,8 +86,10 @@ source {
base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
source.reader.close.timeout = 120000
connection.pool.size = 1
-
- schema-changes.enabled = true
+ debezium {
+ include.schema.changes = true
+ log.mining.strategy = redo_log_catalog
+ }
}
}
@@ -137,8 +131,10 @@ source {
base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
source.reader.close.timeout = 120000
connection.pool.size = 1
-
- schema-changes.enabled = true
+ debezium {
+ include.schema.changes = true
+ log.mining.strategy = redo_log_catalog
+ }
}
}
@@ -173,8 +169,9 @@ source {
password = "mysqlpw"
table-names = ["shop.products"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
-
- schema-changes.enabled = true
+ debezium = {
+ include.schema.changes = true
+ }
}
}
diff --git a/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md
b/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md
index 59f9981d71..564eb2356c 100644
--- a/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md
+++ b/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md
@@ -33,6 +33,8 @@ source {
# include schema into kafka message
key.converter.schemas.enable = false
value.converter.schemas.enable = false
+ # include ddl
+ include.schema.changes = true
# topic prefix
database.server.name = "mysql_cdc_1"
}
diff --git a/docs/en/connector-v2/sink/Paimon.md
b/docs/en/connector-v2/sink/Paimon.md
index f2a68ae3b8..2959855120 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -107,8 +107,9 @@ source {
password = "mysqlpw"
table-names = ["shop.products"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
-
- schema-changes.enabled = true
+ debezium = {
+ include.schema.changes = true
+ }
}
}
diff --git a/docs/en/connector-v2/source/MySQL-CDC.md
b/docs/en/connector-v2/source/MySQL-CDC.md
index 0114d5c1d5..cc58ec4459 100644
--- a/docs/en/connector-v2/source/MySQL-CDC.md
+++ b/docs/en/connector-v2/source/MySQL-CDC.md
@@ -196,8 +196,7 @@ When an initial consistent snapshot is made for large
databases, your establishe
| inverse-sampling.rate | Integer | No | 1000
| The inverse of the sampling rate used in the sample sharding strategy. For
example, if this value is set to 1000, it means a 1/1000 sampling rate is
applied during the sampling process. This option provides flexibility in
controlling the granularity of the sampling, thus affecting the final number of
shards. It's especially useful when dealing with very large datasets where a
lower sampling rate is preferr [...]
| exactly_once | Boolean | No | false
| Enable exactly once semantic.
[...]
| format | Enum | No |
DEFAULT | Optional output format for MySQL CDC, valid enumerations are
`DEFAULT`、`COMPATIBLE_DEBEZIUM_JSON`.
[...]
-| schema-changes.enabled | Boolean | No | false
| Schema evolution is disabled by default. Now we only support `add
column`、`drop column`、`rename column` and `modify column`.
[...]
-| debezium | Config | No | -
| Pass-through [Debezium's
properties](https://github.com/debezium/debezium/blob/v1.9.8.Final/documentation/modules/ROOT/pages/connectors/mysql.adoc#connector-properties)
to Debezium Embedded Engine which is used to capture data changes from MySQL
server.
[...]
+| debezium | Config | No | -
| Pass-through [Debezium's
properties](https://github.com/debezium/debezium/blob/v1.9.8.Final/documentation/modules/ROOT/pages/connectors/mysql.adoc#connector-properties)
to Debezium Embedded Engine which is used to capture data changes from MySQL
server. Schema evolution is disabled by default. You need to configure
`debezium.include.schema.changes = true` to enable it. Now we only support `add
column`、`drop [...]
| common-options | | no | -
| Source plugin common parameters, please refer to [Source Common
Options](../source-common-options.md) for details
[...]
## Task Example
@@ -282,8 +281,9 @@ source {
password = "mysqlpw"
table-names = ["shop.products"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
-
- schema-changes.enabled = true
+ debezium = {
+ include.schema.changes = true
+ }
}
}
diff --git a/docs/en/connector-v2/source/Oracle-CDC.md
b/docs/en/connector-v2/source/Oracle-CDC.md
index 28aeef2a50..8e5c332bef 100644
--- a/docs/en/connector-v2/source/Oracle-CDC.md
+++ b/docs/en/connector-v2/source/Oracle-CDC.md
@@ -249,7 +249,6 @@ exit;
| use_select_count | Boolean | No | false
| Use select count for table count rather than other methods in the full stage. In
this scenario, select count directly is used when it is faster to update
statistics using sql from analysis table
[...]
| skip_analyze | Boolean | No | false
| Skip the analysis of table count in the full stage. In this scenario, you
schedule analysis table sql to update related table statistics periodically or
your table data does not change frequently
[...]
| format | Enum | No |
DEFAULT | Optional output format for Oracle CDC, valid enumerations are
`DEFAULT`、`COMPATIBLE_DEBEZIUM_JSON`.
[...]
-| schema-changes.enabled | Boolean | No | false
| Schema evolution is disabled by default. Now we only support `add
column`、`drop column`、`rename column` and `modify column`.
[...]
| debezium | Config | No | -
| Pass-through [Debezium's
properties](https://github.com/debezium/debezium/blob/v1.9.8.Final/documentation/modules/ROOT/pages/connectors/oracle.adoc#connector-properties)
to Debezium Embedded Engine which is used to capture data changes from Oracle
server.
[...]
| common-options | | no | -
| Source plugin common parameters, please refer to [Source Common
Options](../source-common-options.md) for details
[...]
diff --git a/docs/zh/concept/schema-evolution.md
b/docs/zh/concept/schema-evolution.md
index 200259f518..f8770abed5 100644
--- a/docs/zh/concept/schema-evolution.md
+++ b/docs/zh/concept/schema-evolution.md
@@ -1,16 +1,6 @@
# 模式演进
模式演进是指数据表的Schema可以改变,数据同步任务可以自动适应新的表结构的变化而无需其他操作。
-
-## 已支持的引擎
-
-- Zeta
-
-## 已支持的模式变更事件类型
-
-- `ADD COLUMN`
-- `DROP COLUMN`
-- `RENAME COLUMN`
-- `MODIFY COLUMN`
+现在我们只支持对CDC源中的表进行“添加列”、“删除列”、“重命名列”和“修改列”的操作。目前这个功能只支持zeta引擎。
## 已支持的连接器
@@ -30,7 +20,7 @@
另外,如果你的表名以`ORA_TEMP_`开头,也会有相同的问题。
## 启用Schema evolution功能
-在CDC源连接器中模式演进默认是关闭的。你需要在CDC连接器中配置`schema-changes.enabled = true`来启用它。
+在CDC源连接器中模式演进默认是关闭的。你需要在CDC连接器中配置`debezium.include.schema.changes =
true`来启用它。当你使用Oracle-CDC并且启用schema-evolution时,你必须将`debezium`属性中的`log.mining.strategy`指定为`redo_log_catalog`。
## 示例
@@ -52,8 +42,9 @@ source {
password = "mysqlpw"
table-names = ["shop.products"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
-
- schema-changes.enabled = true
+ debezium = {
+ include.schema.changes = true
+ }
}
}
@@ -94,8 +85,10 @@ source {
base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
source.reader.close.timeout = 120000
connection.pool.size = 1
-
- schema-changes.enabled = true
+ debezium {
+ include.schema.changes = true
+ log.mining.strategy = redo_log_catalog
+ }
}
}
@@ -137,8 +130,10 @@ source {
base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
source.reader.close.timeout = 120000
connection.pool.size = 1
-
- schema-changes.enabled = true
+ debezium {
+ include.schema.changes = true
+ log.mining.strategy = redo_log_catalog
+ }
}
}
@@ -173,8 +168,9 @@ source {
password = "mysqlpw"
table-names = ["shop.products"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
-
- schema-changes.enabled = true
+ debezium = {
+ include.schema.changes = true
+ }
}
}
diff --git a/docs/zh/connector-v2/formats/cdc-compatible-debezium-json.md
b/docs/zh/connector-v2/formats/cdc-compatible-debezium-json.md
index 6c5b57b278..8febab18fb 100644
--- a/docs/zh/connector-v2/formats/cdc-compatible-debezium-json.md
+++ b/docs/zh/connector-v2/formats/cdc-compatible-debezium-json.md
@@ -33,6 +33,8 @@ source {
# include schema into kafka message
key.converter.schemas.enable = false
value.converter.schemas.enable = false
+ # include ddl
+ include.schema.changes = true
# topic prefix
database.server.name = "mysql_cdc_1"
}
diff --git a/docs/zh/connector-v2/sink/Paimon.md
b/docs/zh/connector-v2/sink/Paimon.md
index 1faa5dc9b0..4d83dcb6c7 100644
--- a/docs/zh/connector-v2/sink/Paimon.md
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -105,8 +105,9 @@ source {
password = "mysqlpw"
table-names = ["shop.products"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
-
- schema-changes.enabled = true
+ debezium = {
+ include.schema.changes = true
+ }
}
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
index 87dd7d3a8f..99ddb3bd17 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
@@ -44,6 +44,7 @@ public abstract class JdbcSourceConfigFactory implements
SourceConfig.Factory<Jd
protected List<String> tableList;
protected StartupConfig startupConfig;
protected StopConfig stopConfig;
+ protected boolean includeSchemaChanges = false;
protected double distributionFactorUpper =
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue();
protected double distributionFactorLower =
@@ -59,10 +60,6 @@ public abstract class JdbcSourceConfigFactory implements
SourceConfig.Factory<Jd
protected int connectMaxRetries =
JdbcSourceOptions.CONNECT_MAX_RETRIES.defaultValue();
protected int connectionPoolSize =
JdbcSourceOptions.CONNECTION_POOL_SIZE.defaultValue();
@Setter protected boolean exactlyOnce =
JdbcSourceOptions.EXACTLY_ONCE.defaultValue();
-
- @Setter
- protected boolean schemaChangeEnabled =
JdbcSourceOptions.SCHEMA_CHANGES_ENABLED.defaultValue();
-
protected Properties dbzProperties;
/** String hostname of the database server. */
@@ -213,8 +210,8 @@ public abstract class JdbcSourceConfigFactory implements
SourceConfig.Factory<Jd
}
/** Whether the {@link SourceConfig} should output the schema changes or
not. */
- public JdbcSourceConfigFactory schemaChangeEnabled(boolean
schemaChangeEnabled) {
- this.schemaChangeEnabled = schemaChangeEnabled;
+ public JdbcSourceConfigFactory includeSchemaChanges(boolean
includeSchemaChanges) {
+ this.includeSchemaChanges = includeSchemaChanges;
return this;
}
@@ -267,7 +264,6 @@ public abstract class JdbcSourceConfigFactory implements
SourceConfig.Factory<Jd
this.connectMaxRetries =
config.get(JdbcSourceOptions.CONNECT_MAX_RETRIES);
this.connectionPoolSize =
config.get(JdbcSourceOptions.CONNECTION_POOL_SIZE);
this.exactlyOnce = config.get(JdbcSourceOptions.EXACTLY_ONCE);
- this.schemaChangeEnabled =
config.get(JdbcSourceOptions.SCHEMA_CHANGES_ENABLED);
this.dbzProperties = new Properties();
config.getOptional(SourceOptions.DEBEZIUM_PROPERTIES)
.ifPresent(map -> dbzProperties.putAll(map));
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
index 7fcd4d3448..6c83088ef2 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
@@ -107,13 +107,6 @@ public class SourceOptions {
.defaultValue(false)
.withDescription("Enable exactly once semantic.");
- public static final Option<Boolean> SCHEMA_CHANGES_ENABLED =
- Options.key("schema-changes.enabled")
- .booleanType()
- .defaultValue(false)
- .withDescription(
- "Enable send schema change events, by default is
false. If set to true, the schema changes will be sent to downstream.");
-
public static OptionRule.Builder getBaseRule() {
return OptionRule.builder()
.optional(FORMAT)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
index db63e4e4dc..fd5d7deadf 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
@@ -31,6 +31,7 @@ import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.ch
/** A factory to initialize {@link MySqlSourceConfig}. */
public class MySqlSourceConfigFactory extends JdbcSourceConfigFactory {
public static final String SCHEMA_CHANGE_KEY = "include.schema.changes";
+ public static final Boolean SCHEMA_CHANGE_DEFAULT = true;
private ServerIdRange serverIdRange;
@@ -77,8 +78,9 @@ public class MySqlSourceConfigFactory extends
JdbcSourceConfigFactory {
// Note: the includeSchemaChanges parameter is used to control
emitting the schema record,
// only DataStream API program need to emit the schema record, the
Table API need not
- // setting debezium capture mysql ddl
- props.setProperty(SCHEMA_CHANGE_KEY,
String.valueOf(schemaChangeEnabled));
+ // Some scenarios do not require automatic capture of table structure
changes, so the
+ // default setting is true.
+ props.setProperty(SCHEMA_CHANGE_KEY, SCHEMA_CHANGE_DEFAULT.toString());
// disable the offset flush totally
props.setProperty("offset.flush.interval.ms",
String.valueOf(Long.MAX_VALUE));
// disable tombstones
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index c11f9e72d4..8de399b587 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -69,8 +69,7 @@ public class MySqlIncrementalSourceFactory extends
BaseChangeStreamTableSourceFa
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND,
JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD,
JdbcSourceOptions.INVERSE_SAMPLING_RATE,
- JdbcSourceOptions.TABLE_NAMES_CONFIG,
- JdbcSourceOptions.SCHEMA_CHANGES_ENABLED)
+ JdbcSourceOptions.TABLE_NAMES_CONFIG)
.optional(MySqlSourceOptions.STARTUP_MODE,
MySqlSourceOptions.STOP_MODE)
.conditional(
MySqlSourceOptions.STARTUP_MODE,
@@ -104,25 +103,15 @@ public class MySqlIncrementalSourceFactory extends
BaseChangeStreamTableSourceFa
context.getOptions(), context.getClassLoader());
boolean enableSchemaChange =
context.getOptions()
- .getOptional(SourceOptions.SCHEMA_CHANGES_ENABLED)
- .orElse(
- // TODO remove this after all users used
the new schema change
- // option
- context.getOptions()
-
.getOptional(SourceOptions.DEBEZIUM_PROPERTIES)
- .map(
- e ->
- e.getOrDefault(
-
MySqlSourceConfigFactory
-
.SCHEMA_CHANGE_KEY,
-
SourceOptions
-
.SCHEMA_CHANGES_ENABLED
-
.defaultValue()
-
.toString()))
- .map(Boolean::parseBoolean)
- .orElse(
-
SourceOptions.SCHEMA_CHANGES_ENABLED
- .defaultValue()));
+ .getOptional(SourceOptions.DEBEZIUM_PROPERTIES)
+ .map(
+ e ->
+ e.getOrDefault(
+
MySqlSourceConfigFactory.SCHEMA_CHANGE_KEY,
+
MySqlSourceConfigFactory.SCHEMA_CHANGE_DEFAULT
+ .toString()))
+ .map(Boolean::parseBoolean)
+
.orElse(MySqlSourceConfigFactory.SCHEMA_CHANGE_DEFAULT);
if (!restoreTables.isEmpty() && enableSchemaChange) {
catalogTables = mergeTableStruct(catalogTables, restoreTables);
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
index 3786fb937c..b08d4e4dad 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
@@ -37,8 +37,7 @@ public class OracleSourceConfigFactory extends
JdbcSourceConfigFactory {
private static final String DRIVER_CLASS_NAME =
"oracle.jdbc.driver.OracleDriver";
public static final String SCHEMA_CHANGE_KEY = "include.schema.changes";
- public static final String LOG_MINING_STRATEGY_KEY = "log.mining.strategy";
- public static final String LOG_MINING_STRATEGY_DEFAULT = "online_catalog";
+ public static final Boolean SCHEMA_CHANGE_DEFAULT = true;
private List<String> schemaList;
@@ -95,16 +94,17 @@ public class OracleSourceConfigFactory extends
JdbcSourceConfigFactory {
props.setProperty("database.history.skip.unparseable.ddl",
String.valueOf(true));
props.setProperty("database.history.refer.ddl", String.valueOf(true));
- // setting debezium capture oracle ddl
- props.setProperty(SCHEMA_CHANGE_KEY,
String.valueOf(schemaChangeEnabled));
- props.setProperty(
- LOG_MINING_STRATEGY_KEY,
- schemaChangeEnabled ? "redo_log_catalog" :
LOG_MINING_STRATEGY_DEFAULT);
+ // Capture of table structure changes is enabled by default; scenarios that
do not need it
+ // can turn it off through the user-defined debezium properties.
+ props.setProperty(SCHEMA_CHANGE_KEY, SCHEMA_CHANGE_DEFAULT.toString());
props.setProperty("connect.timeout.ms",
String.valueOf(connectTimeoutMillis));
// disable tombstones
props.setProperty("tombstones.on.delete", String.valueOf(false));
+ // Optimize logminer latency
+ props.setProperty("log.mining.strategy", "online_catalog");
+
if (originUrl != null) {
props.setProperty("database.url", originUrl);
} else {
@@ -139,16 +139,6 @@ public class OracleSourceConfigFactory extends
JdbcSourceConfigFactory {
// override the user-defined debezium properties
if (dbzProperties != null) {
- String debeziumSchemaChanges =
- dbzProperties.getProperty(
- SCHEMA_CHANGE_KEY,
String.valueOf(schemaChangeEnabled));
- String debeziumLogMiningStrategy =
- dbzProperties.getProperty(LOG_MINING_STRATEGY_KEY,
LOG_MINING_STRATEGY_DEFAULT);
- if (Boolean.parseBoolean(debeziumSchemaChanges)
- &&
LOG_MINING_STRATEGY_DEFAULT.equals(debeziumLogMiningStrategy)) {
- throw new IllegalArgumentException(
- "Debezium log mining strategy must be set to
redo_log_catalog when schema changes are enabled");
- }
props.putAll(dbzProperties);
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
index eb602aa418..80b4a0b3c0 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
@@ -105,12 +105,22 @@ public class OracleIncrementalSource<T> extends
IncrementalSource<T, JdbcSourceC
}
String zoneId = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
+
+ boolean enableDDL =
+ Boolean.parseBoolean(
+ debeziumProperties.getOrDefault(
+ OracleSourceConfigFactory.SCHEMA_CHANGE_KEY,
+
OracleSourceConfigFactory.SCHEMA_CHANGE_DEFAULT.toString()));
+
return (DebeziumDeserializationSchema<T>)
SeaTunnelRowDebeziumDeserializeSchema.builder()
.setTables(catalogTables)
.setServerTimeZone(ZoneId.of(zoneId))
.setSchemaChangeResolver(
- new
OracleSchemaChangeResolver(createSourceConfigFactory(config)))
+ enableDDL
+ ? new OracleSchemaChangeResolver(
+
createSourceConfigFactory(config))
+ : null)
.setTableIdTableChangeMap(tableIdStructMap)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java
index 01690bc3dc..d790107cf1 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java
@@ -69,8 +69,7 @@ public class OracleIncrementalSourceFactory extends
BaseChangeStreamTableSourceF
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND,
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND,
JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD,
- JdbcSourceOptions.TABLE_NAMES_CONFIG,
- JdbcSourceOptions.SCHEMA_CHANGES_ENABLED)
+ JdbcSourceOptions.TABLE_NAMES_CONFIG)
.optional(OracleSourceOptions.STARTUP_MODE,
OracleSourceOptions.STOP_MODE)
.conditional(
OracleSourceOptions.STARTUP_MODE,
@@ -110,25 +109,15 @@ public class OracleIncrementalSourceFactory extends
BaseChangeStreamTableSourceF
context.getOptions(), context.getClassLoader());
boolean enableSchemaChange =
context.getOptions()
- .getOptional(SourceOptions.SCHEMA_CHANGES_ENABLED)
- .orElse(
- // TODO remove this after all users used
the new schema change
- // option
- context.getOptions()
-
.getOptional(SourceOptions.DEBEZIUM_PROPERTIES)
- .map(
- e ->
- e.getOrDefault(
-
OracleSourceConfigFactory
-
.SCHEMA_CHANGE_KEY,
-
SourceOptions
-
.SCHEMA_CHANGES_ENABLED
-
.defaultValue()
-
.toString()))
- .map(Boolean::parseBoolean)
- .orElse(
-
SourceOptions.SCHEMA_CHANGES_ENABLED
- .defaultValue()));
+ .getOptional(SourceOptions.DEBEZIUM_PROPERTIES)
+ .map(
+ e ->
+ e.getOrDefault(
+
OracleSourceConfigFactory.SCHEMA_CHANGE_KEY,
+
OracleSourceConfigFactory.SCHEMA_CHANGE_DEFAULT
+ .toString()))
+ .map(Boolean::parseBoolean)
+
.orElse(OracleSourceConfigFactory.SCHEMA_CHANGE_DEFAULT);
if (!restoreTables.isEmpty() && enableSchemaChange) {
catalogTables = mergeTableStruct(catalogTables, restoreTables);
}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
index b634462fbf..71739b5789 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
@@ -39,7 +39,6 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
implements SupportMultiTableSinkWriter<Void> {
@@ -79,7 +78,8 @@ public class RedisSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
String value = getValue(element, fields);
valueBuffer.add(value);
if (keyBuffer.size() >= batchSize) {
- flush();
+ doBatchWrite();
+ clearBuffer();
}
}
@@ -221,16 +221,6 @@ public class RedisSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
@Override
public void close() throws IOException {
- flush();
- }
-
- @Override
- public Optional<Void> prepareCommit() {
- flush();
- return Optional.empty();
- }
-
- private synchronized void flush() {
if (!keyBuffer.isEmpty()) {
doBatchWrite();
clearBuffer();
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf
index 7e93474d5e..632b643bb2 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf
@@ -34,8 +34,9 @@ source {
password = "mysqlpw"
table-names = ["shop.products"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
-
- schema-changes.enabled = true
+ debezium = {
+ include.schema.changes = true
+ }
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change_exactly_once.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change_exactly_once.conf
index 275ecf4464..8aa06c85bd 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change_exactly_once.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change_exactly_once.conf
@@ -34,8 +34,9 @@ source {
password = "mysqlpw"
table-names = ["shop.products"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
-
- schema-changes.enabled = true
+ debezium = {
+ include.schema.changes = true
+ }
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_mysql_with_schema_change.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_mysql_with_schema_change.conf
index 70c9aedb4f..58acb86f83 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_mysql_with_schema_change.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_mysql_with_schema_change.conf
@@ -37,11 +37,10 @@ source {
base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
source.reader.close.timeout = 120000
connection.pool.size = 1
-
- schema-changes.enabled = true
debezium {
database.oracle.jdbc.timezoneAsRegion = false
-
+ include.schema.changes = true
+ log.mining.strategy = redo_log_catalog
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_schema_change.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_schema_change.conf
index 76903a6e00..80fcc8c796 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_schema_change.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_schema_change.conf
@@ -37,11 +37,10 @@ source {
base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
source.reader.close.timeout = 120000
connection.pool.size = 1
-
- schema-changes.enabled = true
debezium {
database.oracle.jdbc.timezoneAsRegion = false
-
+ include.schema.changes = true
+ log.mining.strategy = redo_log_catalog
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_schema_change_exactly_once.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_schema_change_exactly_once.conf
index 9554a4fd49..949e62ef71 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_schema_change_exactly_once.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_schema_change_exactly_once.conf
@@ -37,11 +37,9 @@ source {
base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
source.reader.close.timeout = 120000
connection.pool.size = 1
-
- schema-changes.enabled = true
debezium {
database.oracle.jdbc.timezoneAsRegion = false
-
+ include.schema.changes = true
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveKerberosIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveKerberosIT.java
index 6d85903d89..dd666dd710 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveKerberosIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveKerberosIT.java
@@ -257,6 +257,10 @@ public class HiveKerberosIT extends SeaTunnelContainer {
log.info(hiveServerContainer.execInContainer("cat", "/tmp/hive/hive.log").getStdout());
hiveServerContainer.close();
}
+ if (kerberosContainer != null) {
+ kerberosContainer.close();
+ }
+ super.tearDown();
}
private void initializeConnection()
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_cdc_to_iceberg_for_schema_change.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_cdc_to_iceberg_for_schema_change.conf
index c8353c9e4c..68102192a0 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_cdc_to_iceberg_for_schema_change.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_cdc_to_iceberg_for_schema_change.conf
@@ -27,9 +27,13 @@ env {
source {
MySQL-CDC {
plugin_output="customer_result_table"
-
- schema-changes.enabled = true
-
+ catalog {
+ factory = Mysql
+ }
+ debezium = {
+ # include ddl
+ "include.schema.changes" = true
+ }
database-names=["mysql_cdc"]
table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
format=DEFAULT
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_to_paimon_with_schema_change.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_to_paimon_with_schema_change.conf
index a214430dd0..714c4be81c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_to_paimon_with_schema_change.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_to_paimon_with_schema_change.conf
@@ -34,8 +34,9 @@ source {
password = "mysqlpw"
table-names = ["shop.products"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
-
- schema-changes.enabled = true
+ debezium = {
+ include.schema.changes = true
+ }
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
index d21caa60f2..96ac20cbe6 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
@@ -57,12 +57,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
-import static org.awaitility.Awaitility.await;
-
@Slf4j
public abstract class RedisTestCaseTemplateIT extends TestSuiteBase implements TestResource {
@@ -496,7 +492,7 @@ public abstract class RedisTestCaseTemplateIT extends TestSuiteBase implements T
}
@TestTemplate
- public void testFakeToToRedisDeleteZSetTest(TestContainer container)
+ public void testMysqlCdcToRedisDeleteZSetTest(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
container.executeJob("/fake-to-redis-test-delete-zset.conf");
@@ -505,26 +501,6 @@ public abstract class RedisTestCaseTemplateIT extends TestSuiteBase implements T
jedis.del("zset_check");
}
- @TestTemplate
- public void testFakeToRedisInRealTimeTest(TestContainer container) {
- CompletableFuture.supplyAsync(
- () -> {
- try {
- container.executeJob("/fake-to-redis-test-in-real-time.conf");
- } catch (Exception e) {
- log.error("Commit task exception :" + e.getMessage());
- throw new RuntimeException(e);
- }
- return null;
- });
- await().atMost(60000, TimeUnit.MILLISECONDS)
- .untilAsserted(
- () -> {
- Assertions.assertEquals(3, jedis.llen("list_check"));
- });
- jedis.del("list_check");
- }
-
@TestTemplate
public void testFakeToRedisNormalKeyIsNullTest(TestContainer container)
throws IOException, InterruptedException {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-in-real-time.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-in-real-time.conf
deleted file mode 100644
index 923d53b5a0..0000000000
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-in-real-time.conf
+++ /dev/null
@@ -1,75 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
- parallelism = 1
- job.mode = "STREAMING"
- checkpoint.interval = 10000
- shade.identifier = "base64"
-
- #spark config
- spark.app.name = "SeaTunnel"
- spark.executor.instances = 2
- spark.executor.cores = 1
- spark.executor.memory = "1g"
- spark.master = local
-}
-
-source {
- FakeSource {
- schema = {
- fields {
- id = int
- val_bool = boolean
- val_int8 = tinyint
- val_int16 = smallint
- val_int32 = int
- val_int64 = bigint
- val_float = float
- val_double = double
- val_decimal = "decimal(16, 1)"
- val_string = string
- val_unixtime_micros = timestamp
- }
- }
- rows = [
- {
- kind = INSERT
- fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
- },
- {
- kind = INSERT
- fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
- },
- {
- kind = INSERT
- fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
- }
- ]
- }
-}
-
-sink {
- Redis {
- host = "redis-e2e"
- port = 6379
- auth = "U2VhVHVubmVs"
- key = "list_check"
- data_type = list
- batch_size = 33
- }
-}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf
index 76d86a4e8c..ba3c03db1e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf
@@ -32,8 +32,9 @@ source {
password = "mysqlpw"
table-names = ["shop.products"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
-
- schema-changes.enabled = true
+ debezium = {
+ include.schema.changes = true
+ }
}
}
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index ad586153f4..1b42994154 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -512,7 +512,11 @@ public class SeaTunnelContainer extends AbstractTestContainer {
@Override
public String getJobStatus(String jobId) {
- HttpGet get = new HttpGet("http://" + server.getHost() + ":8080/job-info/" + jobId);
+ HttpGet get =
+ new HttpGet(
+ String.format(
+ "http://%s:%d/job-info/%s",
+ server.getHost(), server.getMappedPort(8080), jobId));
try (CloseableHttpClient client = HttpClients.createDefault()) {
CloseableHttpResponse response = client.execute(get);
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {