This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9a5da78176 [Improve][Connector-V2] Add pre-check for table enable cdc (#8152)
9a5da78176 is described below

commit 9a5da781767358db6afcccc08c1087de8a2dd3b4
Author: Jia Fan <[email protected]>
AuthorDate: Fri Nov 29 14:13:39 2024 +0800

    [Improve][Connector-V2] Add pre-check for table enable cdc (#8152)
---
 .../cdc/base/dialect/JdbcDataSourceDialect.java    |  3 ++
 .../cdc/postgres/source/PostgresDialect.java       | 30 +++++++++++++++--
 .../cdc/sqlserver/source/SqlServerDialect.java     | 38 ++++++++++++++++++++--
 .../seatunnel/cdc/postgres/PostgresCDCIT.java      | 34 +++++++++++++++++++
 .../connector/cdc/sqlserver/SqlServerCDCIT.java    | 35 ++++++++++++++++++++
 5 files changed, 136 insertions(+), 4 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
index 05e9a89c04..f9193c9eb7 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
@@ -49,6 +49,9 @@ public interface JdbcDataSourceDialect extends 
DataSourceDialect<JdbcSourceConfi
     @Override
     List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig);
 
+    default void checkAllTablesEnabledCapture(JdbcConnection jdbcConnection, 
List<TableId> tableIds)
+            throws SQLException {}
+
     /**
      * Creates and opens a new {@link JdbcConnection} backing connection pool.
      *
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresDialect.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresDialect.java
index 94781a2e54..921cb52518 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresDialect.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresDialect.java
@@ -43,6 +43,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseI
 
 import io.debezium.connector.postgresql.PostgresConnectorConfig;
 import io.debezium.connector.postgresql.connection.PostgresConnection;
+import io.debezium.connector.postgresql.connection.ServerInfo;
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.RelationalDatabaseConnectorConfig;
 import io.debezium.relational.TableId;
@@ -103,13 +104,32 @@ public class PostgresDialect implements 
JdbcDataSourceDialect {
     public List<TableId> discoverDataCollections(JdbcSourceConfig 
sourceConfig) {
         PostgresSourceConfig postgresSourceConfig = (PostgresSourceConfig) 
sourceConfig;
         try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig)) 
{
-            return TableDiscoveryUtils.listTables(
-                    jdbcConnection, postgresSourceConfig.getTableFilters());
+            List<TableId> tables =
+                    TableDiscoveryUtils.listTables(
+                            jdbcConnection, 
postgresSourceConfig.getTableFilters());
+            this.checkAllTablesEnabledCapture(jdbcConnection, tables);
+            return tables;
         } catch (SQLException e) {
             throw new SeaTunnelException("Error to discover tables: " + 
e.getMessage(), e);
         }
     }
 
+    @Override
+    public void checkAllTablesEnabledCapture(JdbcConnection jdbcConnection, 
List<TableId> tableIds)
+            throws SQLException {
+        PostgresConnection postgresConnection = (PostgresConnection) 
jdbcConnection;
+        for (TableId tableId : tableIds) {
+            ServerInfo.ReplicaIdentity replicaIdentity =
+                    postgresConnection.readReplicaIdentityInfo(tableId);
+            if (!ServerInfo.ReplicaIdentity.FULL.equals(replicaIdentity)) {
+                throw new SeaTunnelException(
+                        String.format(
+                                "Table %s does not have a full replica 
identity, please execute: ALTER TABLE %s REPLICA IDENTITY FULL;",
+                                tableId, tableId));
+            }
+        }
+    }
+
     @Override
     public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, 
TableId tableId) {
         if (postgresSchema == null) {
@@ -155,6 +175,12 @@ public class PostgresDialect implements 
JdbcDataSourceDialect {
         if (sourceSplitBase.isSnapshotSplit()) {
             return new 
PostgresSnapshotFetchTask(sourceSplitBase.asSnapshotSplit());
         } else {
+            try (JdbcConnection jdbcConnection = 
openJdbcConnection(sourceConfig)) {
+                List<TableId> tables = 
sourceSplitBase.asIncrementalSplit().getTableIds();
+                this.checkAllTablesEnabledCapture(jdbcConnection, tables);
+            } catch (SQLException e) {
+                throw new SeaTunnelException("Error to check tables: " + 
e.getMessage(), e);
+            }
             postgresWalFetchTask = new 
PostgresWalFetchTask(sourceSplitBase.asIncrementalSplit());
             return postgresWalFetchTask;
         }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerDialect.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerDialect.java
index 816ba8eb4b..55838d16b1 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerDialect.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerDialect.java
@@ -38,6 +38,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils.SqlServerSc
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils.TableDiscoveryUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 
+import io.debezium.connector.sqlserver.SqlServerChangeTable;
+import io.debezium.connector.sqlserver.SqlServerConnection;
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges;
@@ -46,6 +48,8 @@ import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /** The {@link JdbcDataSourceDialect} implementation for MySQL datasource. */
 public class SqlServerDialect implements JdbcDataSourceDialect {
@@ -88,13 +92,37 @@ public class SqlServerDialect implements 
JdbcDataSourceDialect {
     public List<TableId> discoverDataCollections(JdbcSourceConfig 
sourceConfig) {
         SqlServerSourceConfig sqlServerSourceConfig = (SqlServerSourceConfig) 
sourceConfig;
         try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig)) 
{
-            return TableDiscoveryUtils.listTables(
-                    jdbcConnection, sqlServerSourceConfig.getTableFilters());
+            List<TableId> tables =
+                    TableDiscoveryUtils.listTables(
+                            jdbcConnection, 
sqlServerSourceConfig.getTableFilters());
+            this.checkAllTablesEnabledCapture(jdbcConnection, tables);
+            return tables;
         } catch (SQLException e) {
             throw new SeaTunnelException("Error to discover tables: " + 
e.getMessage(), e);
         }
     }
 
+    @Override
+    public void checkAllTablesEnabledCapture(JdbcConnection jdbcConnection, 
List<TableId> tableIds)
+            throws SQLException {
+        Map<String, List<TableId>> databases =
+                tableIds.stream()
+                        .collect(Collectors.groupingBy(TableId::catalog, 
Collectors.toList()));
+        for (String database : databases.keySet()) {
+            Set<TableId> tables =
+                    ((SqlServerConnection) jdbcConnection)
+                            .getChangeTables(database).stream()
+                                    
.map(SqlServerChangeTable::getSourceTableId)
+                                    .collect(Collectors.toSet());
+            for (TableId tableId : databases.get(database)) {
+                if (!tables.contains(tableId)) {
+                    throw new SeaTunnelException(
+                            "Table " + tableId + " is not enabled for 
capture");
+                }
+            }
+        }
+    }
+
     @Override
     public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, 
TableId tableId) {
         if (sqlServerSchema == null) {
@@ -115,6 +143,12 @@ public class SqlServerDialect implements 
JdbcDataSourceDialect {
         if (sourceSplitBase.isSnapshotSplit()) {
             return new 
SqlServerSnapshotFetchTask(sourceSplitBase.asSnapshotSplit());
         } else {
+            try (JdbcConnection jdbcConnection = 
openJdbcConnection(sourceConfig)) {
+                List<TableId> tables = 
sourceSplitBase.asIncrementalSplit().getTableIds();
+                this.checkAllTablesEnabledCapture(jdbcConnection, tables);
+            } catch (SQLException e) {
+                throw new SeaTunnelException("Error to check tables: " + 
e.getMessage(), e);
+            }
             return new 
SqlServerTransactionLogFetchTask(sourceSplitBase.asIncrementalSplit());
         }
     }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
index acb9a2a41c..80961b61a3 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
@@ -17,6 +17,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.cdc.postgres;
 
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfigFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfigFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.PostgresDialect;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
@@ -29,6 +33,7 @@ import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestTemplate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,6 +44,8 @@ import org.testcontainers.lifecycle.Startables;
 import org.testcontainers.utility.DockerImageName;
 
 import com.google.common.collect.Lists;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.TableId;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
@@ -53,6 +60,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -594,6 +602,32 @@ public class PostgresCDCIT extends TestSuiteBase 
implements TestResource {
         }
     }
 
+    @Test
+    public void testDialectCheckDisabledCDCTable() throws SQLException {
+        JdbcSourceConfigFactory factory =
+                new PostgresSourceConfigFactory()
+                        .hostname(POSTGRES_CONTAINER.getHost())
+                        .port(5432)
+                        .username("postgres")
+                        .password("postgres")
+                        .databaseList(POSTGRESQL_DATABASE);
+        PostgresDialect dialect =
+                new PostgresDialect((PostgresSourceConfigFactory) factory, 
Collections.emptyList());
+        try (JdbcConnection connection = 
dialect.openJdbcConnection(factory.create(0))) {
+            SeaTunnelException exception =
+                    Assertions.assertThrows(
+                            SeaTunnelException.class,
+                            () ->
+                                    dialect.checkAllTablesEnabledCapture(
+                                            connection,
+                                            Collections.singletonList(
+                                                    
TableId.parse(SINK_TABLE_1))));
+            Assertions.assertEquals(
+                    "Table sink_postgres_cdc_table_1 does not have a full 
replica identity, please execute: ALTER TABLE sink_postgres_cdc_table_1 REPLICA 
IDENTITY FULL;",
+                    exception.getMessage());
+        }
+    }
+
     private Connection getJdbcConnection() throws SQLException {
         return DriverManager.getConnection(
                 POSTGRES_CONTAINER.getJdbcUrl(),
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
index 1b699d5805..94867d2014 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
@@ -17,6 +17,10 @@
 
 package org.apache.seatunnel.e2e.connector.cdc.sqlserver;
 
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfigFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.config.SqlServerSourceConfigFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.SqlServerDialect;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
@@ -31,6 +35,7 @@ import org.awaitility.core.ConditionTimeoutException;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.MSSQLServerContainer;
@@ -39,6 +44,8 @@ import org.testcontainers.lifecycle.Startables;
 import org.testcontainers.utility.DockerLoggerFactory;
 
 import com.google.common.collect.Lists;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.TableId;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
@@ -52,6 +59,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
@@ -346,6 +354,33 @@ public class SqlServerCDCIT extends TestSuiteBase 
implements TestResource {
         }
     }
 
+    @Test
+    public void testDialectCheckDisabledCDCTable() throws SQLException {
+        initializeSqlServerTable("column_type_test");
+        JdbcSourceConfigFactory factory =
+                new SqlServerSourceConfigFactory()
+                        .hostname(MSSQL_SERVER_CONTAINER.getHost())
+                        .port(PORT)
+                        .username("sa")
+                        .password("Password!")
+                        .databaseList("column_type_test");
+        SqlServerDialect dialect =
+                new SqlServerDialect(
+                        (SqlServerSourceConfigFactory) factory, 
Collections.emptyList());
+        try (JdbcConnection connection = 
dialect.openJdbcConnection(factory.create(0))) {
+            SeaTunnelException exception =
+                    Assertions.assertThrows(
+                            SeaTunnelException.class,
+                            () ->
+                                    dialect.checkAllTablesEnabledCapture(
+                                            connection,
+                                            
Collections.singletonList(TableId.parse(SINK_TABLE))));
+            Assertions.assertEquals(
+                    "Table column_type_test.dbo.full_types_sink is not enabled 
for capture",
+                    exception.getMessage());
+        }
+    }
+
     /**
      * Executes a JDBC statement using the default jdbc config without 
autocommitting the
      * connection.

Reply via email to