This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ef3e61dbc4 [Improve][[Jdbc]sink sql support custom field.(#6515) (#6525)
ef3e61dbc4 is described below
commit ef3e61dbc41c3c41f5a93ef48dc282980dc7888c
Author: rtyuy <[email protected]>
AuthorDate: Sat Jun 15 22:49:48 2024 +0800
[Improve][[Jdbc]sink sql support custom field.(#6515) (#6525)
---
.../executor/FieldNamedPreparedStatement.java | 27 ++++-
.../seatunnel/jdbc/JdbcSinkNameParameterSQLIT.java | 122 +++++++++++++++++++++
.../resources/jdbc_sink_name_parameter_sql.conf | 64 +++++++++++
3 files changed, 208 insertions(+), 5 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
index 29c98c7938..c98f50ba92 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
@@ -42,6 +42,7 @@ import java.sql.SQLXML;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
@@ -636,13 +637,29 @@ public class FieldNamedPreparedStatement implements PreparedStatement {
HashMap<String, List<Integer>> parameterMap = new HashMap<>();
parsedSQL = parseNamedStatement(sql, parameterMap);
// currently, the statements must contain all the field parameters
- checkArgument(parameterMap.size() == fieldNames.length);
+ parameterMap
+ .keySet()
+ .forEach(
+ namedParameter -> {
+ boolean namedParameterExist =
+ Arrays.asList(fieldNames).stream()
+ .anyMatch(field ->
field.equals(namedParameter));
+ checkArgument(
+ namedParameterExist,
+ String.format(
+ "Named parameters [%s] not in
source columns, check SQL: %s",
+ namedParameter, sql));
+ });
+
for (int i = 0; i < fieldNames.length; i++) {
String fieldName = fieldNames[i];
- checkArgument(
- parameterMap.containsKey(fieldName),
- fieldName + " doesn't exist in the parameters of SQL
statement: " + sql);
- indexMapping[i] =
parameterMap.get(fieldName).stream().mapToInt(v -> v).toArray();
+ boolean parameterExist =
+ parameterMap.keySet().stream()
+ .anyMatch(parameter ->
parameter.equals(fieldName));
+ indexMapping[i] =
+ parameterExist
+ ?
parameterMap.get(fieldName).stream().mapToInt(v -> v).toArray()
+ : new int[0];
}
}
log.info("PrepareStatement sql is:\n{}\n", parsedSQL);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSinkNameParameterSQLIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSinkNameParameterSQLIT.java
new file mode 100644
index 0000000000..ca0b1b081c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSinkNameParameterSQLIT.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+@Slf4j
+public class JdbcSinkNameParameterSQLIT extends TestSuiteBase implements
TestResource {
+ private static final String PG_IMAGE = "postgres:14-alpine";
+ private static final String PG_DRIVER_JAR =
+
"https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
+ private PostgreSQLContainer<?> postgreSQLContainer;
+
+ @TestContainerExtension
+ private final ContainerExtendedFactory extendedFactory =
+ container -> {
+ Container.ExecResult extraCommands =
+ container.execInContainer(
+ "bash",
+ "-c",
+ "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib &&
cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
+ + PG_DRIVER_JAR);
+ Assertions.assertEquals(0, extraCommands.getExitCode());
+ };
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ postgreSQLContainer =
+ new PostgreSQLContainer<>(DockerImageName.parse(PG_IMAGE))
+ .withNetwork(TestSuiteBase.NETWORK)
+ .withNetworkAliases("postgresql")
+ .withLogConsumer(
+ new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE)));
+ Startables.deepStart(Stream.of(postgreSQLContainer)).join();
+ log.info("PostgreSQL container started");
+ Class.forName(postgreSQLContainer.getDriverClassName());
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(this::initializeJdbcTable);
+ }
+
+ @TestTemplate
+ public void testSinkNamedParameterSQL(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob("/jdbc_sink_name_parameter_sql.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ private void initializeJdbcTable() {
+ try (Connection connection =
+ DriverManager.getConnection(
+ postgreSQLContainer.getJdbcUrl(),
+ postgreSQLContainer.getUsername(),
+ postgreSQLContainer.getPassword())) {
+ Statement statement = connection.createStatement();
+ String sink =
+ "create table sink(\n"
+ + "user_id BIGINT NOT NULL PRIMARY KEY,\n"
+ + "name varchar(255),\n"
+ + "age INT\n"
+ + ")";
+ statement.execute(sink);
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing PostgreSql table
failed!", e);
+ }
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() {
+ if (postgreSQLContainer != null) {
+ postgreSQLContainer.stop();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_sink_name_parameter_sql.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_sink_name_parameter_sql.conf
new file mode 100644
index 0000000000..50c04b07da
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_sink_name_parameter_sql.conf
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 100
+ schema = {
+ fields {
+ user_id = bigint
+ name = string
+ age = int
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Assert {
+ source_table_name = "fake"
+ rules = {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 100
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 100
+ }
+ ]
+ }
+ }
+ Jdbc {
+ source_table_name = "fake"
+ driver = org.postgresql.Driver
+ url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+ user = test
+ password = test
+ generate_sink_sql = true
+ database = test
+ query = "insert into public.sink (user_id, name) values(:user_id, :name)"
+
+ }
+}
\ No newline at end of file