This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a06800e90b [Feature][Connector] Add Apache Cloudberry Support (#8985)
a06800e90b is described below
commit a06800e90b5685632ef788fd85aebd5c8fad2412
Author: chenhongyu <[email protected]>
AuthorDate: Mon Mar 24 10:54:09 2025 +0800
[Feature][Connector] Add Apache Cloudberry Support (#8985)
---
.../connector-v2/changelog/connector-cloudberry.md | 7 +
docs/en/connector-v2/sink/Cloudberry.md | 176 ++++++++++++++++++
docs/en/connector-v2/source/Cloudberry.md | 152 +++++++++++++++
.../seatunnel/jdbc/JdbcCloudberryIT.java | 206 +++++++++++++++++++++
.../resources/jdbc_cloudberry_source_and_sink.conf | 57 ++++++
5 files changed, 598 insertions(+)
diff --git a/docs/en/connector-v2/changelog/connector-cloudberry.md
b/docs/en/connector-v2/changelog/connector-cloudberry.md
new file mode 100644
index 0000000000..749ae587cd
--- /dev/null
+++ b/docs/en/connector-v2/changelog/connector-cloudberry.md
@@ -0,0 +1,7 @@
+<details><summary> Change Log </summary>
+
+| Change | Commit | Version |
+| --- | --- | --- |
+|[Feature][Connector] Add Apache Cloudberry Support (#8985)|https://github.com/apache/seatunnel/commit/a06800e90b|dev|
+
+</details>
diff --git a/docs/en/connector-v2/sink/Cloudberry.md
b/docs/en/connector-v2/sink/Cloudberry.md
new file mode 100644
index 0000000000..c7e5b6c99a
--- /dev/null
+++ b/docs/en/connector-v2/sink/Cloudberry.md
@@ -0,0 +1,176 @@
+import ChangeLog from '../changelog/connector-cloudberry.md';
+
+# Cloudberry
+
+> JDBC Cloudberry Sink Connector
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Description
+
+Writes data to Cloudberry through JDBC. Cloudberry does not currently have its own native JDBC driver; it uses the PostgreSQL driver for connectivity and follows PostgreSQL's implementation.
+
+Supports batch and streaming modes, concurrent writing, and exactly-once semantics (guaranteed by XA transactions).
+
+## Using Dependency
+
+### For Spark/Flink Engine
+
+> 1. You need to ensure that the [jdbc driver jar
package](https://mvnrepository.com/artifact/org.postgresql/postgresql) has been
placed in directory `${SEATUNNEL_HOME}/plugins/`.
+
+### For SeaTunnel Zeta Engine
+
+> 1. You need to ensure that the [jdbc driver jar
package](https://mvnrepository.com/artifact/org.postgresql/postgresql) has been
placed in directory `${SEATUNNEL_HOME}/lib/`.
+
+## Key Features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
+
+> Exactly-once is guaranteed with XA transactions, so it is only supported for databases that
+> support XA transactions. Set `is_exactly_once=true` to enable it.
+
+## Supported DataSource Info
+
+| Datasource | Supported Versions | Driver
| Url |
Maven |
+|------------|------------------------------------------|------------------------|---------------------------------------|--------------------------------------------------------------------------|
+| Cloudberry | Uses PostgreSQL driver implementation | org.postgresql.Driver |
jdbc:postgresql://localhost:5432/test |
[Download](https://mvnrepository.com/artifact/org.postgresql/postgresql) |
+
+## Database Dependency
+
+> Please download the PostgreSQL driver jar and copy it to the
'$SEATUNNEL_HOME/plugins/jdbc/lib/' working directory<br/>
+> For example: cp postgresql-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/
+
+## Data Type Mapping
+
+Cloudberry uses PostgreSQL's data type implementation. Please refer to the PostgreSQL connector documentation for data type compatibility and mappings.
+
+## Options
+
+The Cloudberry connector uses the same options as the PostgreSQL connector. For detailed configuration options, please refer to the PostgreSQL connector documentation.
+
+Key options include:
+
+- url (required): The JDBC connection URL
+- driver (required): The driver class name (org.postgresql.Driver)
+- user/password: Authentication credentials
+- query, or generate_sink_sql together with database/table: The SQL used to write data
+- is_exactly_once: Enable exactly-once semantics with XA transactions (also set xa_data_source_class_name, e.g. org.postgresql.xa.PGXADataSource)
+- batch_size: Control batch writing behavior
+
+## Task Example
+
+### Simple:
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ parallelism = 1
+ plugin_output = "fake"
+ row.num = 16
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
+
+sink {
+ jdbc {
+ url = "jdbc:postgresql://localhost:5432/cloudberrydb"
+ driver = "org.postgresql.Driver"
+ user = "dbadmin"
+ password = "password"
+ query = "insert into test_table(name,age) values(?,?)"
+ }
+}
+```
+
+### Generate Sink SQL
+
+```hocon
+sink {
+ Jdbc {
+ url = "jdbc:postgresql://localhost:5432/cloudberrydb"
+ driver = "org.postgresql.Driver"
+ user = "dbadmin"
+ password = "password"
+
+ generate_sink_sql = true
+ database = "cloudberrydb"
+ table = "public.test_table"
+ }
+}
+```
+
+### Exactly-once:
+
+```hocon
+sink {
+ jdbc {
+ url = "jdbc:postgresql://localhost:5432/cloudberrydb"
+ driver = "org.postgresql.Driver"
+ user = "dbadmin"
+ password = "password"
+ query = "insert into test_table(name,age) values(?,?)"
+
+ is_exactly_once = "true"
+ xa_data_source_class_name = "org.postgresql.xa.PGXADataSource"
+ }
+}
+```
+
+### CDC(Change Data Capture) Event
+
+```hocon
+sink {
+ jdbc {
+ url = "jdbc:postgresql://localhost:5432/cloudberrydb"
+ driver = "org.postgresql.Driver"
+ user = "dbadmin"
+ password = "password"
+
+ generate_sink_sql = true
+ database = "cloudberrydb"
+ table = "sink_table"
+ primary_keys = ["id","name"]
+ field_ide = UPPERCASE
+ }
+}
+```
+
+### Save mode function
+
+```hocon
+sink {
+ Jdbc {
+ url = "jdbc:postgresql://localhost:5432/cloudberrydb"
+ driver = "org.postgresql.Driver"
+ user = "dbadmin"
+ password = "password"
+
+ generate_sink_sql = true
+ database = "cloudberrydb"
+ table = "public.test_table"
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode = "APPEND_DATA"
+ }
+}
+```
+
+For more detailed examples and options, please refer to the PostgreSQL
connector documentation.
+
+## Changelog
+
+<ChangeLog />
\ No newline at end of file
diff --git a/docs/en/connector-v2/source/Cloudberry.md
b/docs/en/connector-v2/source/Cloudberry.md
new file mode 100644
index 0000000000..80880d6f98
--- /dev/null
+++ b/docs/en/connector-v2/source/Cloudberry.md
@@ -0,0 +1,152 @@
+import ChangeLog from '../changelog/connector-cloudberry.md';
+
+# Cloudberry
+
+> JDBC Cloudberry Source Connector
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Using Dependency
+
+### For Spark/Flink Engine
+
+> 1. You need to ensure that the [jdbc driver jar
package](https://mvnrepository.com/artifact/org.postgresql/postgresql) has been
placed in directory `${SEATUNNEL_HOME}/plugins/`.
+
+### For SeaTunnel Zeta Engine
+
+> 1. You need to ensure that the [jdbc driver jar
package](https://mvnrepository.com/artifact/org.postgresql/postgresql) has been
placed in directory `${SEATUNNEL_HOME}/lib/`.
+
+## Key Features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [x] [support user-defined split](../../concept/connector-v2-features.md)
+
+> Supports query SQL, which can be used to achieve the projection effect.
+
+## Description
+
+Reads data from Cloudberry through JDBC. Cloudberry does not currently have its own native JDBC driver; it uses the PostgreSQL driver and follows PostgreSQL's implementation.
+
+## Supported DataSource Info
+
+| Datasource | Supported Versions | Driver
| Url |
Maven |
+|------------|------------------------------------------|------------------------|---------------------------------------|--------------------------------------------------------------------------|
+| Cloudberry | Uses PostgreSQL driver implementation | org.postgresql.Driver |
jdbc:postgresql://localhost:5432/test |
[Download](https://mvnrepository.com/artifact/org.postgresql/postgresql) |
+
+## Database Dependency
+
+> Please download the PostgreSQL driver jar and copy it to the
'$SEATUNNEL_HOME/plugins/jdbc/lib/' working directory<br/>
+> For example: cp postgresql-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/
+
+## Data Type Mapping
+
+Cloudberry uses PostgreSQL's data type implementation. Please refer to the PostgreSQL connector documentation for data type compatibility and mappings.
+
+## Options
+
+The Cloudberry connector uses the same options as the PostgreSQL connector. For detailed configuration options, please refer to the PostgreSQL connector documentation.
+
+Key options include:
+
+- url (required): The JDBC connection URL
+- driver (required): The driver class name (org.postgresql.Driver)
+- user/password: Authentication credentials
+- query or table_path: What data to read (table_path is the full path `database.schema.table`, e.g. `cloudberrydb.public.mytable`)
+- partition options for parallel reading
+
+## Parallel Reader
+
+Cloudberry supports parallel reading following the same rules as PostgreSQL
connector. For detailed information on split strategies and parallel reading
options, please refer to the PostgreSQL connector documentation.
+
+## Task Example
+
+### Simple:
+
+```hocon
+env {
+ parallelism = 4
+ job.mode = "BATCH"
+}
+
+source {
+ Jdbc {
+ url = "jdbc:postgresql://localhost:5432/cloudberrydb"
+ driver = "org.postgresql.Driver"
+ user = "dbadmin"
+ password = "password"
+ query = "select * from mytable limit 100"
+ }
+}
+
+sink {
+ Console {}
+}
+```
+
+### Parallel reading with table_path:
+
+```hocon
+env {
+ parallelism = 4
+ job.mode = "BATCH"
+}
+
+source {
+ Jdbc {
+ url = "jdbc:postgresql://localhost:5432/cloudberrydb"
+ driver = "org.postgresql.Driver"
+ user = "dbadmin"
+ password = "password"
+ table_path = "cloudberrydb.public.mytable"
+ split.size = 10000
+ }
+}
+
+sink {
+ Console {}
+}
+```
+
+### Multiple table read:
+
+```hocon
+env {
+ job.mode = "BATCH"
+ parallelism = 4
+}
+
+source {
+ Jdbc {
+ url = "jdbc:postgresql://localhost:5432/cloudberrydb"
+ driver = "org.postgresql.Driver"
+ user = "dbadmin"
+ password = "password"
+ "table_list" = [
+ {
+ "table_path" = "cloudberrydb.public.table1"
+ },
+ {
+ "table_path" = "cloudberrydb.public.table2"
+ }
+ ]
+ split.size = 10000
+ }
+}
+
+sink {
+ Console {}
+}
+```
+
+For more detailed examples and configurations, please refer to the PostgreSQL
connector documentation.
+
+## Changelog
+
+<ChangeLog />
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCloudberryIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCloudberryIT.java
new file mode 100644
index 0000000000..bb898d6761
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCloudberryIT.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.images.PullPolicy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+/**
+ * End-to-end test of the SeaTunnel JDBC connector against Apache Cloudberry.
+ *
+ * <p>Cloudberry has no native JDBC driver, so the test connects through the PostgreSQL driver
+ * ({@code org.postgresql.Driver}). The database inside the container is started explicitly with
+ * {@code gpstart} in {@link #beforeStartUP()}.
+ */
+public class JdbcCloudberryIT extends AbstractJdbcIT {
+    private static final String CLOUDBERRY_IMAGE = "lhrbest/cbdb:1.5.4";
+    /** Network alias of the container; the job config reaches the database as {@code cbdb}. */
+    private static final String CLOUDBERRY_CONTAINER_HOST = "cbdb";
+    private static final String CLOUDBERRY_DATABASE = "postgres";
+
+    private static final String CLOUDBERRY_SCHEMA = "public";
+    private static final String CLOUDBERRY_SOURCE = "source";
+    private static final String CLOUDBERRY_SINK = "sink";
+
+    private static final String CLOUDBERRY_USERNAME = "gpadmin";
+    private static final String CLOUDBERRY_PASSWORD = "gpadmin";
+    private static final int CLOUDBERRY_CONTAINER_PORT = 5432;
+
+    /** JDBC URL template; the two placeholders are the port and the database name. */
+    private static final String CLOUDBERRY_URL = "jdbc:postgresql://" + HOST + ":%s/%s";
+
+    private static final String DRIVER_CLASS = "org.postgresql.Driver";
+
+    private static final List<String> CONFIG_FILE =
+            Lists.newArrayList("/jdbc_cloudberry_source_and_sink.conf");
+
+    /** DDL template for the test tables; {@code %s} is filled with a table name. */
+    private static final String CREATE_SQL =
+            "CREATE TABLE %s (\n" + "age INT NOT NULL,\n" + "name VARCHAR(255) NOT NULL\n" + ")";
+
+    /** Describes the Cloudberry test case (image, connection settings, tables and test data). */
+    @Override
+    JdbcCase getJdbcCase() {
+        Map<String, String> containerEnv = new HashMap<>();
+        String jdbcUrl =
+                String.format(CLOUDBERRY_URL, CLOUDBERRY_CONTAINER_PORT, CLOUDBERRY_DATABASE);
+        Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
+        String[] fieldNames = testDataSet.getKey();
+
+        String insertSql = insertTable(CLOUDBERRY_SCHEMA, CLOUDBERRY_SOURCE, fieldNames);
+
+        return JdbcCase.builder()
+                .dockerImage(CLOUDBERRY_IMAGE)
+                .networkAliases(CLOUDBERRY_CONTAINER_HOST)
+                .containerEnv(containerEnv)
+                .driverClass(DRIVER_CLASS)
+                .host(HOST)
+                .port(CLOUDBERRY_CONTAINER_PORT)
+                .localPort(CLOUDBERRY_CONTAINER_PORT)
+                .jdbcTemplate(CLOUDBERRY_URL)
+                .jdbcUrl(jdbcUrl)
+                .userName(CLOUDBERRY_USERNAME)
+                .password(CLOUDBERRY_PASSWORD)
+                // NOTE(review): the schema name is passed as `database`; confirm this is what
+                // AbstractJdbcIT expects for PostgreSQL-style databases.
+                .database(CLOUDBERRY_SCHEMA)
+                .sourceTable(CLOUDBERRY_SOURCE)
+                .sinkTable(CLOUDBERRY_SINK)
+                .createSql(CREATE_SQL)
+                .configFile(CONFIG_FILE)
+                .insertSql(insertSql)
+                .testData(testDataSet)
+                .tablePathFullName(CLOUDBERRY_SOURCE)
+                .useSaveModeCreateTable(false)
+                .build();
+    }
+
+    /** Location of the PostgreSQL JDBC driver jar used to talk to Cloudberry. */
+    @Override
+    String driverUrl() {
+        return "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
+    }
+
+    /**
+     * Builds the test data: 100 rows where row {@code i} is {@code (age = i, name = "f_" + i)}.
+     *
+     * @return the field names paired with the rows
+     */
+    @Override
+    Pair<String[], List<SeaTunnelRow>> initTestData() {
+        String[] fieldNames =
+                new String[] {
+                    "age", "name",
+                };
+
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            SeaTunnelRow row =
+                    new SeaTunnelRow(
+                            new Object[] {
+                                i, "f_" + i,
+                            });
+            rows.add(row);
+        }
+
+        return Pair.of(fieldNames, rows);
+    }
+
+    /** Creates the Cloudberry container on the shared test network with a fixed port binding. */
+    @Override
+    GenericContainer<?> initContainer() {
+        DockerImageName imageName = DockerImageName.parse(CLOUDBERRY_IMAGE);
+        GenericContainer<?> container =
+                new GenericContainer<>(imageName)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(CLOUDBERRY_CONTAINER_HOST)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        DockerLoggerFactory.getLogger(CLOUDBERRY_IMAGE)))
+                        // Run the image's init process as the container command; presumably
+                        // the reason privileged mode and the cgroup mount are needed (confirm).
+                        .withCommand("/usr/sbin/init")
+                        .withPrivilegedMode(true);
+        // Mount the host cgroup hierarchy read-only.
+        container.addFileSystemBind("/sys/fs/cgroup", "/sys/fs/cgroup", BindMode.READ_ONLY);
+        // Publish the database port on the same port number on the host.
+        container.setPortBindings(
+                Lists.newArrayList(
+                        String.format(
+                                "%s:%s", CLOUDBERRY_CONTAINER_PORT, CLOUDBERRY_CONTAINER_PORT)));
+        return container;
+    }
+
+    /** Quotes an identifier with double quotes, PostgreSQL style. */
+    @Override
+    public String quoteIdentifier(String field) {
+        return "\"" + field + "\"";
+    }
+
+    /** Intentionally a no-op for Cloudberry: tables are not cleared by this test. */
+    @Override
+    public void clearTable(String schema, String table) {
+        // do nothing.
+    }
+
+    /**
+     * Starts the Cloudberry cluster inside the container, sets the {@code gpadmin} password and
+     * logs the server version.
+     *
+     * @throws RuntimeException if a command cannot be executed or the thread is interrupted
+     */
+    @Override
+    protected void beforeStartUP() {
+        log.info("Setting up Apache Cloudberry...");
+        try {
+            // Give the container's init process a short grace period before issuing commands.
+            Thread.sleep(5000);
+            // Switch to the gpadmin OS user and start the database.
+            Container.ExecResult execResult = execInDbServer("su - gpadmin -c 'gpstart -a'");
+            log.info("gpstart result: {}", execResult.getStdout());
+            // Set the gpadmin password so that JDBC clients can authenticate with it.
+            execResult =
+                    execInDbServer(
+                            "su - gpadmin -c \"psql -c \\\"ALTER USER gpadmin WITH PASSWORD 'gpadmin';\\\"\"");
+            log.info("Set password result: {}", execResult.getStdout());
+            // Confirm the database answers queries.
+            execResult = execInDbServer("su - gpadmin -c 'psql -c \"SELECT version();\"'");
+            log.info("Apache Cloudberry version: {}", execResult.getStdout());
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so code further up the stack can still observe it.
+            Thread.currentThread().interrupt();
+            log.error("Failed to initialize Apache Cloudberry", e);
+            throw new RuntimeException("Failed to initialize Apache Cloudberry", e);
+        } catch (IOException e) {
+            log.error("Failed to initialize Apache Cloudberry", e);
+            throw new RuntimeException("Failed to initialize Apache Cloudberry", e);
+        }
+    }
+
+    /**
+     * Runs {@code command} via {@code bash -c} inside the database container. A non-zero exit
+     * code is logged as a warning together with stderr, but does not fail setup; the connection
+     * wait in {@link #startUp()} remains the real readiness check.
+     *
+     * @param command shell command to run
+     * @return the execution result
+     * @throws IOException if the command cannot be executed
+     * @throws InterruptedException if interrupted while waiting for the command
+     */
+    private Container.ExecResult execInDbServer(String command)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult = dbServer.execInContainer("bash", "-c", command);
+        if (execResult.getExitCode() != 0) {
+            log.warn(
+                    "Command [{}] exited with code {}: {}",
+                    command,
+                    execResult.getExitCode(),
+                    execResult.getStderr());
+        }
+        return execResult;
+    }
+
+    /**
+     * Starts the container and initializes the database. It then waits (up to 600 seconds, polling
+     * every 10 seconds) until a JDBC connection succeeds. Finally it runs the schema, table, data
+     * and catalog setup steps.
+     */
+    @BeforeAll
+    @Override
+    public void startUp() {
+        dbServer = initContainer().withImagePullPolicy(PullPolicy.alwaysPull());
+        Startables.deepStart(Stream.of(dbServer)).join();
+        jdbcCase = getJdbcCase();
+        beforeStartUP();
+        // Cloudberry can need a long time before it accepts connections, so poll generously.
+        given().ignoreExceptions()
+                .await()
+                .atMost(600, TimeUnit.SECONDS)
+                .pollInterval(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> this.initializeJdbcConnection(jdbcCase.getJdbcUrl()));
+        createSchemaIfNeeded();
+        createNeededTables();
+        insertTestData();
+        initCatalog();
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/jdbc_cloudberry_source_and_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/jdbc_cloudberry_source_and_sink.conf
new file mode 100644
index 0000000000..af574a7655
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/jdbc_cloudberry_source_and_sink.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the feature source plugin**
+ Jdbc {
+ driver = org.postgresql.Driver
+ url = "jdbc:postgresql://cbdb:5432/postgres"
+ user = gpadmin
+ password = gpadmin
+ query = "select age, name from source"
+ }
+
+ # If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
+}
+
+transform {
+
+ # If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/transform-v2/sql
+}
+
+sink {
+ Jdbc {
+ driver = org.postgresql.Driver
+ url = "jdbc:postgresql://cbdb:5432/postgres"
+ user = gpadmin
+ password = gpadmin
+ query = "insert into sink(age, name) values(?, ?)"
+ }
+
+ # If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
+}
\ No newline at end of file