This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 819c685651 [Improve][Jdbc] Merge user config primary key when create table (#7313)
819c685651 is described below
commit 819c6856513c9171b60d63c324e793a6eecbb9f6
Author: hailin0 <[email protected]>
AuthorDate: Wed Aug 7 12:45:54 2024 +0800
[Improve][Jdbc] Merge user config primary key when create table (#7313)
---
.../seatunnel/jdbc/sink/JdbcSinkFactory.java | 20 ++++++++++++++++++++
.../seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java | 7 +++++++
.../test/resources/jdbc_mysql_source_and_sink.conf | 5 ++++-
3 files changed, 31 insertions(+), 1 deletion(-)
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index eff6bb67c6..35e9a986ab 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -200,6 +201,25 @@ public class JdbcSinkFactory implements TableSinkFactory {
.collect(Collectors.joining(",")));
}
}
+ } else {
+ // replace primary key to config
+ PrimaryKey configPk =
+ PrimaryKey.of(
+ catalogTable.getTablePath().getTableName() + "_config_pk",
+ config.get(PRIMARY_KEYS));
+ TableSchema tableSchema = catalogTable.getTableSchema();
+ catalogTable =
+ CatalogTable.of(
+ catalogTable.getTableId(),
+ TableSchema.builder()
+ .primaryKey(configPk)
+ .constraintKey(tableSchema.getConstraintKeys())
+ .columns(tableSchema.getColumns())
+ .build(),
+ catalogTable.getOptions(),
+ catalogTable.getPartitionKeys(),
+ catalogTable.getComment(),
+ catalogTable.getCatalogName());
}
config = ReadonlyConfig.fromMap(new HashMap<>(map));
// always execute
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java
index c8acc95010..bc1361aa26 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java
@@ -47,6 +47,7 @@ import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -74,6 +75,7 @@ public class JdbcMysqlSaveModeHandlerIT extends AbstractJdbcIT {
private static final String CREATE_SQL =
"CREATE TABLE IF NOT EXISTS %s\n"
+ "(\n"
+ + " `id` bigint(20) NOT NULL,\n"
+ " `c_bit_1` bit(1) DEFAULT NULL,\n"
+ " `c_bit_8` bit(8) DEFAULT NULL,\n"
+ " `c_bit_16` bit(16) DEFAULT NULL,\n"
@@ -164,6 +166,9 @@ public class JdbcMysqlSaveModeHandlerIT extends AbstractJdbcIT {
final List<Column> columns = table.getTableSchema().getColumns();
Assertions.assertEquals(columns.size(), columnsSource.size());
+ Assertions.assertIterableEquals(
+ Collections.singletonList("id"),
+ table.getTableSchema().getPrimaryKey().getColumnNames());
}
@Override
@@ -175,6 +180,7 @@ public class JdbcMysqlSaveModeHandlerIT extends AbstractJdbcIT {
Pair<String[], List<SeaTunnelRow>> initTestData() {
String[] fieldNames =
new String[] {
+ "id",
"c_bit_1",
"c_bit_8",
"c_bit_16",
@@ -229,6 +235,7 @@ public class JdbcMysqlSaveModeHandlerIT extends AbstractJdbcIT {
SeaTunnelRow row =
new SeaTunnelRow(
new Object[] {
+ (long) i,
i % 2 == 0 ? (byte) 1 : (byte) 0,
new byte[] {byteArr},
new byte[] {byteArr, byteArr},
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_mysql_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_mysql_source_and_sink.conf
index bc379f8ba8..6305f55c46 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_mysql_source_and_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_mysql_source_and_sink.conf
@@ -40,9 +40,12 @@ sink {
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "Abc!@#135_seatunnel"
+
generate_sink_sql = true
- table = "test_laowang"
database = "seatunnel"
+ table = "test_laowang"
+ primary_keys = ["id"]
+
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
data_save_mode="APPEND_DATA"
}