This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3561d3878 [Connector-V2][JDBC] Support database: greenplum (#2429)
3561d3878 is described below
commit 3561d3878f229555802abeaf4e7ed9ba8a49ba43
Author: hailin0 <[email protected]>
AuthorDate: Wed Aug 24 09:57:28 2022 +0800
[Connector-V2][JDBC] Support database: greenplum (#2429)
* [Connector-V2][JDBC] Support database: Greenplum
Support connect greenplum drivers:
* postgresql driver: org.postgresql.Driver
* greenplum driver: com.pivotal.jdbc.GreenplumDriver
Co-authored-by: wanghailin <[email protected]>
---
docs/en/connector-v2/sink/Greenplum.md | 27 ++++
docs/en/connector-v2/source/Greenplum.md | 17 +++
.../dialect/greenplum/GreenplumDialectFactory.java | 40 ++++++
.../e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java | 2 +-
.../e2e/flink/v2/jdbc/JdbcGreenplumIT.java | 159 +++++++++++++++++++++
.../jdbc/jdbc_greenplum_source_and_sink.conf | 60 ++++++++
.../e2e/spark/v2/jdbc/JdbcGreenplumIT.java | 159 +++++++++++++++++++++
.../jdbc/jdbc_greenplum_source_and_sink.conf | 62 ++++++++
8 files changed, 525 insertions(+), 1 deletion(-)
diff --git a/docs/en/connector-v2/sink/Greenplum.md
b/docs/en/connector-v2/sink/Greenplum.md
new file mode 100644
index 000000000..9317e5c62
--- /dev/null
+++ b/docs/en/connector-v2/sink/Greenplum.md
@@ -0,0 +1,27 @@
+# Greenplum
+
+> Greenplum sink connector
+
+## Description
+
+Write data to Greenplum using [Jdbc connector](Jdbc.md).
+
+:::tip
+
+Exactly-once semantics are not supported (XA transactions are not yet supported in
the Greenplum database).
+
+:::
+
+## Options
+
+### driver [string]
+
+Optional jdbc drivers:
+- `org.postgresql.Driver`
+- `com.pivotal.jdbc.GreenplumDriver`
+
+Warning: for license compliance, if you use `GreenplumDriver` you have to provide
the Greenplum JDBC driver yourself, e.g. copy greenplum-xxx.jar to
$SEATUNNEL_HOME/lib for Standalone.
+
+### url [string]
+
+The URL of the JDBC connection. If you use the postgresql driver the value is
`jdbc:postgresql://${your_host}:${your_port}/${your_database}`; if you use the
greenplum driver the value is
`jdbc:pivotal:greenplum://${your_host}:${your_port};DatabaseName=${your_database}`
\ No newline at end of file
diff --git a/docs/en/connector-v2/source/Greenplum.md
b/docs/en/connector-v2/source/Greenplum.md
new file mode 100644
index 000000000..cd140549b
--- /dev/null
+++ b/docs/en/connector-v2/source/Greenplum.md
@@ -0,0 +1,17 @@
+# Greenplum
+
+> Greenplum source connector
+
+## Description
+
+Read Greenplum data through [Jdbc connector](Jdbc.md).
+
+:::tip
+
+Optional jdbc drivers:
+- `org.postgresql.Driver`
+- `com.pivotal.jdbc.GreenplumDriver`
+
+Warning: for license compliance, if you use `GreenplumDriver` you have to provide
the Greenplum JDBC driver yourself, e.g. copy greenplum-xxx.jar to
$SEATUNNEL_HOME/lib for Standalone.
+
+:::
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/greenplum/GreenplumDialectFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/greenplum/GreenplumDialectFactory.java
new file mode 100644
index 000000000..fb4ca3865
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/greenplum/GreenplumDialectFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.greenplum;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialect;
+
+import com.google.auto.service.AutoService;
+import lombok.NonNull;
+
+/**
+ * {@link JdbcDialectFactory} for connections opened with the Greenplum native
+ * driver ({@code com.pivotal.jdbc.GreenplumDriver}).
+ */
+@AutoService(JdbcDialectFactory.class)
+public class GreenplumDialectFactory implements JdbcDialectFactory {
+
+    /** URL prefix used by the Greenplum native driver. */
+    private static final String NATIVE_DRIVER_URL_PREFIX = "jdbc:pivotal:greenplum:";
+
+    /**
+     * Reports whether this factory handles the given JDBC URL.
+     *
+     * @param url JDBC connection URL; must not be {@code null}
+     * @return {@code true} when the URL starts with the Greenplum native driver prefix
+     */
+    @Override
+    public boolean acceptsURL(@NonNull String url) {
+        return url.startsWith(NATIVE_DRIVER_URL_PREFIX);
+    }
+
+    /**
+     * Creates the dialect used for Greenplum; the PostgreSQL dialect is reused.
+     *
+     * @return a new {@link PostgresDialect}
+     */
+    @Override
+    public JdbcDialect create() {
+        return new PostgresDialect();
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
index 784bd9a06..9bc241bf4 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
@@ -89,7 +89,7 @@ public class FakeSourceToJdbcIT extends FlinkContainer {
}
@AfterEach
- public void closeClickHouseContainer() {
+ public void closePostgreSqlContainer() {
if (psl != null) {
psl.stop();
}
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java
new file mode 100644
index 000000000..715441032
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JdbcGreenplumIT extends FlinkContainer {
+
+ private static final String GREENPLUM_IMAGE = "datagrip/greenplum:6.8";
+ private static final String GREENPLUM_CONTAINER_HOST =
"flink_e2e_greenplum";
+ private static final int GREENPLUM_CONTAINER_PORT = 5432;
+ private static final String GREENPLUM_HOST = "localhost";
+ private static final int GREENPLUM_PORT = 5435;
+ private static final String GREENPLUM_USER = "tester";
+ private static final String GREENPLUM_PASSWORD = "pivotal";
+ private static final String GREENPLUM_DRIVER = "org.postgresql.Driver";
+ private static final String GREENPLUM_JDBC_URL = String.format(
+ "jdbc:postgresql://%s:%s/testdb", GREENPLUM_HOST, GREENPLUM_PORT);
+ private static final List<List> TEST_DATASET = generateTestDataset();
+
+ private GenericContainer<?> greenplumServer;
+ private Connection jdbcConnection;
+
+ @BeforeEach
+ public void startGreenplumContainer() throws ClassNotFoundException,
SQLException {
+ greenplumServer = new GenericContainer<>(GREENPLUM_IMAGE)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(GREENPLUM_CONTAINER_HOST)
+ .withLogConsumer(new Slf4jLogConsumer(log));
+ greenplumServer.setPortBindings(Lists.newArrayList(
+ String.format("%s:%s", GREENPLUM_PORT,
GREENPLUM_CONTAINER_PORT)));
+ Startables.deepStart(Stream.of(greenplumServer)).join();
+ log.info("Greenplum container started");
+ // wait for Greenplum fully start
+ Class.forName(GREENPLUM_DRIVER);
+ given().ignoreExceptions()
+ .await()
+ .atMost(180, TimeUnit.SECONDS)
+ .untilAsserted(() -> initializeJdbcConnection());
+ initializeJdbcTable();
+ batchInsertData();
+ }
+
+ @Test
+ public void testJdbcGreenplumSourceAndSink() throws IOException,
InterruptedException, SQLException {
+ Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/jdbc/jdbc_greenplum_source_and_sink.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ // query result
+ String sql = "select age, name from sink order by age asc";
+ List<List> result = new ArrayList<>();
+ try (Statement statement = jdbcConnection.createStatement()) {
+ ResultSet resultSet = statement.executeQuery(sql);
+ while (resultSet.next()) {
+ result.add(Arrays.asList(
+ resultSet.getInt(1),
+ resultSet.getString(2)));
+ }
+ }
+ Assertions.assertIterableEquals(TEST_DATASET, result);
+ }
+
+ private void initializeJdbcConnection() throws SQLException {
+ jdbcConnection = DriverManager.getConnection(GREENPLUM_JDBC_URL,
+ GREENPLUM_USER, GREENPLUM_PASSWORD);
+ }
+
+ private void initializeJdbcTable() throws SQLException {
+ try (Statement statement = jdbcConnection.createStatement()) {
+ String createSource = "CREATE TABLE source (\n" +
+ "age INT NOT NULL,\n" +
+ "name VARCHAR(255) NOT NULL\n" +
+ ")";
+ String createSink = "CREATE TABLE sink (\n" +
+ "age INT NOT NULL,\n" +
+ "name VARCHAR(255) NOT NULL\n" +
+ ")";
+ statement.execute(createSource);
+ statement.execute(createSink);
+ }
+ }
+
+ private static List<List> generateTestDataset() {
+ List<List> rows = new ArrayList<>();
+ for (int i = 1; i <= 100; i++) {
+ rows.add(Arrays.asList(i, String.format("test_%s", i)));
+ }
+ return rows;
+ }
+
+ private void batchInsertData() throws SQLException {
+ String sql = "insert into source(age, name) values(?, ?)";
+
+ try {
+ jdbcConnection.setAutoCommit(false);
+ try (PreparedStatement preparedStatement =
jdbcConnection.prepareStatement(sql)) {
+ for (List row : TEST_DATASET) {
+ preparedStatement.setInt(1, (Integer) row.get(0));
+ preparedStatement.setString(2, (String) row.get(1));
+ preparedStatement.addBatch();
+ }
+ preparedStatement.executeBatch();
+ }
+ jdbcConnection.commit();
+ } catch (SQLException e) {
+ jdbcConnection.rollback();
+ throw e;
+ }
+ }
+
+ @AfterEach
+ public void closeGreenplumContainer() throws SQLException {
+ if (jdbcConnection != null) {
+ jdbcConnection.close();
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbc_greenplum_source_and_sink.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbc_greenplum_source_and_sink.conf
new file mode 100644
index 000000000..e6ee34bf3
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbc_greenplum_source_and_sink.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in
seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the
source plugin feature**
+ Jdbc {
+ driver = org.postgresql.Driver
+ url =
"jdbc:postgresql://flink_e2e_greenplum:5432/testdb?loggerLevel=OFF"
+ user = tester
+ password = pivotal
+ query = "select age, name from source"
+ }
+
+ # If you would like to get more information about how to configure
seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
+}
+
+transform {
+
+ # If you would like to get more information about how to configure
seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+ Jdbc {
+ driver = org.postgresql.Driver
+ url =
"jdbc:postgresql://flink_e2e_greenplum:5432/testdb?loggerLevel=OFF"
+ user = tester
+ password = pivotal
+ query = "insert into sink(age, name) values(?, ?)"
+ }
+
+ # If you would like to get more information about how to configure
seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java
new file mode 100644
index 000000000..7910a0dde
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.jdbc;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JdbcGreenplumIT extends SparkContainer {
+
+ private static final String GREENPLUM_IMAGE = "datagrip/greenplum:6.8";
+ private static final String GREENPLUM_CONTAINER_HOST =
"spark_e2e_greenplum";
+ private static final int GREENPLUM_CONTAINER_PORT = 5432;
+ private static final String GREENPLUM_HOST = "localhost";
+ private static final int GREENPLUM_PORT = 5436;
+ private static final String GREENPLUM_USER = "tester";
+ private static final String GREENPLUM_PASSWORD = "pivotal";
+ private static final String GREENPLUM_DRIVER = "org.postgresql.Driver";
+ private static final String GREENPLUM_JDBC_URL = String.format(
+ "jdbc:postgresql://%s:%s/testdb", GREENPLUM_HOST, GREENPLUM_PORT);
+ private static final List<List> TEST_DATASET = generateTestDataset();
+
+ private GenericContainer<?> greenplumServer;
+ private Connection jdbcConnection;
+
+ @BeforeEach
+ public void startGreenplumContainer() throws ClassNotFoundException,
SQLException {
+ greenplumServer = new GenericContainer<>(GREENPLUM_IMAGE)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(GREENPLUM_CONTAINER_HOST)
+ .withLogConsumer(new Slf4jLogConsumer(log));
+ greenplumServer.setPortBindings(Lists.newArrayList(
+ String.format("%s:%s", GREENPLUM_PORT,
GREENPLUM_CONTAINER_PORT)));
+ Startables.deepStart(Stream.of(greenplumServer)).join();
+ log.info("Greenplum container started");
+ // wait for Greenplum fully start
+ Class.forName(GREENPLUM_DRIVER);
+ given().ignoreExceptions()
+ .await()
+ .atMost(180, TimeUnit.SECONDS)
+ .untilAsserted(() -> initializeJdbcConnection());
+ initializeJdbcTable();
+ batchInsertData();
+ }
+
+ @Test
+ public void testJdbcGreenplumSourceAndSink() throws IOException,
InterruptedException, SQLException {
+ Container.ExecResult execResult =
executeSeaTunnelSparkJob("/jdbc/jdbc_greenplum_source_and_sink.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ // query result
+ String sql = "select age, name from sink order by age asc";
+ List<Object> result = new ArrayList<>();
+ try (Statement statement = jdbcConnection.createStatement()) {
+ ResultSet resultSet = statement.executeQuery(sql);
+ while (resultSet.next()) {
+ result.add(Arrays.asList(
+ resultSet.getInt(1),
+ resultSet.getString(2)));
+ }
+ }
+ Assertions.assertIterableEquals(TEST_DATASET, result);
+ }
+
+ private void initializeJdbcConnection() throws SQLException {
+ jdbcConnection = DriverManager.getConnection(GREENPLUM_JDBC_URL,
+ GREENPLUM_USER, GREENPLUM_PASSWORD);
+ }
+
+ private void initializeJdbcTable() throws SQLException {
+ try (Statement statement = jdbcConnection.createStatement()) {
+ String createSource = "CREATE TABLE source (\n" +
+ "age INT NOT NULL,\n" +
+ "name VARCHAR(255) NOT NULL\n" +
+ ")";
+ String createSink = "CREATE TABLE sink (\n" +
+ "age INT NOT NULL,\n" +
+ "name VARCHAR(255) NOT NULL\n" +
+ ")";
+ statement.execute(createSource);
+ statement.execute(createSink);
+ }
+ }
+
+ private static List<List> generateTestDataset() {
+ List<List> rows = new ArrayList<>();
+ for (int i = 1; i <= 100; i++) {
+ rows.add(Arrays.asList(i, String.format("test_%s", i)));
+ }
+ return rows;
+ }
+
+ private void batchInsertData() throws SQLException {
+ String sql = "insert into source(age, name) values(?, ?)";
+
+ try {
+ jdbcConnection.setAutoCommit(false);
+ try (PreparedStatement preparedStatement =
jdbcConnection.prepareStatement(sql)) {
+ for (List row : TEST_DATASET) {
+ preparedStatement.setInt(1, (Integer) row.get(0));
+ preparedStatement.setString(2, (String) row.get(1));
+ preparedStatement.addBatch();
+ }
+ preparedStatement.executeBatch();
+ }
+ jdbcConnection.commit();
+ } catch (SQLException e) {
+ jdbcConnection.rollback();
+ throw e;
+ }
+ }
+
+ @AfterEach
+ public void closeGreenplumContainer() throws SQLException {
+ if (jdbcConnection != null) {
+ jdbcConnection.close();
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/jdbc/jdbc_greenplum_source_and_sink.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/jdbc/jdbc_greenplum_source_and_sink.conf
new file mode 100644
index 000000000..6c06feb23
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/jdbc/jdbc_greenplum_source_and_sink.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in
seatunnel config
+######
+
+env {
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the
source plugin feature**
+ Jdbc {
+ driver = org.postgresql.Driver
+ url =
"jdbc:postgresql://spark_e2e_greenplum:5432/testdb?loggerLevel=OFF"
+ user = tester
+ password = pivotal
+ query = "select age, name from source"
+ }
+
+ # If you would like to get more information about how to configure
seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
+}
+
+transform {
+
+ # If you would like to get more information about how to configure
seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+ Jdbc {
+ driver = org.postgresql.Driver
+ url =
"jdbc:postgresql://spark_e2e_greenplum:5432/testdb?loggerLevel=OFF"
+ user = tester
+ password = pivotal
+ query = "insert into sink(age, name) values(?, ?)"
+ }
+
+ # If you would like to get more information about how to configure
seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
+}
\ No newline at end of file