This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9af696a1d [Bug] [connector-v2] PostgreSQL versions below 9.5 are 
compatible use cdc sync problem (#5120)
9af696a1d is described below

commit 9af696a1dd1cff2e9b3773f38f7c81501abcfe16
Author: kun <[email protected]>
AuthorDate: Wed Aug 2 16:55:08 2023 +0800

    [Bug] [connector-v2] PostgreSQL versions below 9.5 are compatible use cdc 
sync problem (#5120)
---
 docs/en/connector-v2/sink/Jdbc.md                  | 22 ++++++++++++
 .../dialect/psql/PostgresDialectFactory.java       | 12 +++++++
 .../PostgresLowDialect.java}                       | 20 ++++-------
 .../dialect/PostgresDialectFactoryTest.java        | 40 ++++++++++++++++++++++
 4 files changed, 81 insertions(+), 13 deletions(-)

diff --git a/docs/en/connector-v2/sink/Jdbc.md 
b/docs/en/connector-v2/sink/Jdbc.md
index f128f6b4b..9d68278cf 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -74,6 +74,8 @@ Use this sql write upstream input datas to database. e.g 
`INSERT ...`
 
 The compatible mode of database, required when the database supports multiple 
compatible modes. For example, when using OceanBase database, you need to set 
it to 'mysql' or 'oracle'.
 
+For PostgreSQL versions below 9.5, set it to `postgresLow` to support CDC.
+
 ### database [string]
 
 Use this `database` and `table-name` auto-generate sql and receive upstream 
input datas write to database.
@@ -226,6 +228,26 @@ sink {
 }
 ```
 
+PostgreSQL versions below 9.5 can support CDC (Change Data Capture) events:
+
+```
+sink {
+    jdbc {
+        url = "jdbc:postgresql://localhost:5432"
+        driver = "org.postgresql.Driver"
+        user = "root"
+        password = "123456"
+        compatible_mode="postgresLow"
+        database = "sink_database"
+        table = "sink_table"
+        support_upsert_by_query_primary_key_exist = true
+        generate_sink_sql = true
+        primary_keys = ["key1", "key2", ...]
+    }
+}
+
+```
+
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java
index 963f7385e..857c85290 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java
@@ -19,9 +19,12 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql;
 
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psqllow.PostgresLowDialect;
 
 import com.google.auto.service.AutoService;
 
+import javax.annotation.Nonnull;
+
 @AutoService(JdbcDialectFactory.class)
 public class PostgresDialectFactory implements JdbcDialectFactory {
     @Override
@@ -31,6 +34,15 @@ public class PostgresDialectFactory implements 
JdbcDialectFactory {
 
     @Override
     public JdbcDialect create() {
+        throw new UnsupportedOperationException(
+                "Can't create JdbcDialect without compatible mode for 
Postgres");
+    }
+
+    @Override
+    public JdbcDialect create(@Nonnull String compatibleMode) {
+        if ("postgresLow".equalsIgnoreCase(compatibleMode)) {
+            return new PostgresLowDialect();
+        }
         return new PostgresDialect();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psqllow/PostgresLowDialect.java
similarity index 67%
copy from 
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java
copy to 
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psqllow/PostgresLowDialect.java
index 963f7385e..e367207ff 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psqllow/PostgresLowDialect.java
@@ -15,22 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql;
+package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psqllow;
 
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialect;
 
-import com.google.auto.service.AutoService;
+import java.util.Optional;
 
-@AutoService(JdbcDialectFactory.class)
-public class PostgresDialectFactory implements JdbcDialectFactory {
+public class PostgresLowDialect extends PostgresDialect {
     @Override
-    public boolean acceptsURL(String url) {
-        return url.startsWith("jdbc:postgresql:");
-    }
-
-    @Override
-    public JdbcDialect create() {
-        return new PostgresDialect();
+    public Optional<String> getUpsertStatement(
+            String database, String tableName, String[] fieldNames, String[] 
uniqueKeyFields) {
+        return Optional.empty();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/PostgresDialectFactoryTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/PostgresDialectFactoryTest.java
new file mode 100644
index 000000000..79b1f11ac
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/PostgresDialectFactoryTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
+
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialectFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+public class PostgresDialectFactoryTest {
+
+    @Test
+    public void testPostgresDialectCreate() {
+        PostgresDialectFactory postgresDialectFactory = new 
PostgresDialectFactory();
+        JdbcDialect postgresLow = postgresDialectFactory.create("postgresLow");
+        String[] fields = {"id", "name", "age"};
+        String[] uniqueKeyField = {"id"};
+        Optional<String> upsertStatement =
+                postgresLow.getUpsertStatement("test", "test_a", fields, 
uniqueKeyField);
+        Assertions.assertFalse(upsertStatement.isPresent());
+    }
+}

Reply via email to