Copilot commented on code in PR #60560:
URL: https://github.com/apache/doris/pull/60560#discussion_r2771936633


##########
fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java:
##########
@@ -0,0 +1,932 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at 
http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.connector.postgresql.connection;
+
+import io.debezium.DebeziumException;
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.connector.postgresql.PostgresSchema;
+import io.debezium.connector.postgresql.TypeRegistry;
+import io.debezium.connector.postgresql.spi.SlotCreationResult;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.jdbc.JdbcConnectionException;
+import io.debezium.relational.RelationalTableFilters;
+import io.debezium.relational.TableId;
+import io.debezium.util.Clock;
+import io.debezium.util.Metronome;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.postgresql.core.BaseConnection;
+import org.postgresql.core.ServerVersion;
+import org.postgresql.replication.PGReplicationStream;
+import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
+import org.postgresql.util.PSQLException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.lang.Math.toIntExact;
+
+/**
+ * Copied from Flink Cdc 3.5.0
+ *
+ * <p>Line 192~199: add publish_via_partition_root for partition table.
+ */
+public class PostgresReplicationConnection extends JdbcConnection implements 
ReplicationConnection {
+
+    private static Logger LOGGER = 
LoggerFactory.getLogger(PostgresReplicationConnection.class);
+
+    private final String slotName;
+    private final String publicationName;
+    private final RelationalTableFilters tableFilter;
+    private final PostgresConnectorConfig.AutoCreateMode 
publicationAutocreateMode;
+    private final PostgresConnectorConfig.LogicalDecoder plugin;
+    private final boolean dropSlotOnClose;
+    private final PostgresConnectorConfig connectorConfig;
+    private final Duration statusUpdateInterval;
+    private final MessageDecoder messageDecoder;
+    private final PostgresConnection jdbcConnection;
+    private final TypeRegistry typeRegistry;
+    private final Properties streamParams;
+
+    private Lsn defaultStartingPos;
+    private SlotCreationResult slotCreationInfo;
+    private boolean hasInitedSlot;
+
+    private Lsn endingPos;
+
+    /**
+     * Creates a new replication connection with the given params.
+     *
+     * @param config the JDBC configuration for the connection; may not be null
+     * @param slotName the name of the DB slot for logical replication; may 
not be null
+     * @param publicationName the name of the DB publication for logical 
replication; may not be
+     *     null
+     * @param tableFilter the tables to watch of the DB publication for 
logical replication; may not
+     *     be null
+     * @param publicationAutocreateMode the mode for publication autocreation; 
may not be null
+     * @param plugin decoder matching the server side plug-in used for 
streaming changes; may not be
+     *     null
+     * @param dropSlotOnClose whether the replication slot should be dropped 
once the connection is
+     *     closed
+     * @param statusUpdateInterval the interval at which the replication 
connection should
+     *     periodically send status
+     * @param doSnapshot whether the connector is doing snapshot
+     * @param jdbcConnection general PostgreSQL JDBC connection
+     * @param typeRegistry registry with PostgreSQL types
+     * @param streamParams additional parameters to pass to the replication 
stream
+     * @param schema the schema; must not be null
+     *     <p>updates to the server
+     */
+    private PostgresReplicationConnection(
+            PostgresConnectorConfig config,
+            String slotName,
+            String publicationName,
+            RelationalTableFilters tableFilter,
+            PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode,
+            PostgresConnectorConfig.LogicalDecoder plugin,
+            boolean dropSlotOnClose,
+            boolean doSnapshot,
+            Duration statusUpdateInterval,
+            PostgresConnection jdbcConnection,
+            TypeRegistry typeRegistry,
+            Properties streamParams,
+            PostgresSchema schema) {
+        super(
+                addDefaultSettings(config.getJdbcConfig()),
+                PostgresConnection.FACTORY,
+                null,
+                null,
+                "\"",
+                "\"");
+
+        this.connectorConfig = config;
+        this.slotName = slotName;
+        this.publicationName = publicationName;
+        this.tableFilter = tableFilter;
+        this.publicationAutocreateMode = publicationAutocreateMode;
+        this.plugin = plugin;
+        this.dropSlotOnClose = dropSlotOnClose;
+        this.statusUpdateInterval = statusUpdateInterval;
+        this.messageDecoder =
+                plugin.messageDecoder(new MessageDecoderContext(config, 
schema), jdbcConnection);
+        this.jdbcConnection = jdbcConnection;
+        this.typeRegistry = typeRegistry;
+        this.streamParams = streamParams;
+        this.slotCreationInfo = null;
+        this.hasInitedSlot = false;
+    }
+
+    private static JdbcConfiguration addDefaultSettings(JdbcConfiguration 
configuration) {
+        // first copy the parent's default settings...
+        // then set some additional replication specific settings
+        return JdbcConfiguration.adapt(
+                PostgresConnection.addDefaultSettings(
+                                configuration, 
PostgresConnection.CONNECTION_STREAMING)
+                        .edit()
+                        .with("replication", "database")
+                        .with(
+                                "preferQueryMode",
+                                "simple") // replication protocol only 
supports simple query mode
+                        .build());
+    }
+
+    private ServerInfo.ReplicationSlot getSlotInfo() throws SQLException, 
InterruptedException {
+        try (PostgresConnection connection =
+                new PostgresConnection(
+                        connectorConfig.getJdbcConfig(), 
PostgresConnection.CONNECTION_SLOT_INFO)) {
+            return connection.readReplicationSlotInfo(slotName, 
plugin.getPostgresPluginName());
+        }
+    }
+
+    protected void initPublication() {
+        String createPublicationStmt;
+        String tableFilterString = null;
+        if (PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.equals(plugin)) {
+            LOGGER.info("Initializing PgOutput logical decoder publication");
+            try {
+                // Unless the autocommit is disabled the SELECT publication 
query will stay running
+                Connection conn = pgConnection();
+                conn.setAutoCommit(false);
+
+                String selectPublication =
+                        String.format(
+                                "SELECT COUNT(1) FROM pg_publication WHERE 
pubname = '%s'",
+                                publicationName);
+                try (Statement stmt = conn.createStatement();
+                        ResultSet rs = stmt.executeQuery(selectPublication)) {
+                    if (rs.next()) {
+                        Long count = rs.getLong(1);
+                        // Close eagerly as the transaction might stay running
+                        if (count == 0L) {
+                            LOGGER.info(
+                                    "Creating new publication '{}' for plugin 
'{}'",
+                                    publicationName,
+                                    plugin);
+                            switch (publicationAutocreateMode) {
+                                case DISABLED:
+                                    throw new ConnectException(
+                                            "Publication autocreation is 
disabled, please create one and restart the connector.");
+                                case ALL_TABLES:
+                                    boolean supportPartitionRoot = 
((BaseConnection) conn).haveMinimumServerVersion(ServerVersion.v13);
+                                    createPublicationStmt = 
supportPartitionRoot
+                                            ? String.format(
+                                                    "CREATE PUBLICATION %s FOR 
ALL TABLES WITH (publish_via_partition_root = true);",
+                                                    publicationName)
+                                            : String.format(
+                                                    "CREATE PUBLICATION %s FOR 
ALL TABLES;",
+                                                    publicationName);
+                                    LOGGER.info(
+                                            "Creating Publication with 
statement '{}'",
+                                            createPublicationStmt);
+                                    // Publication doesn't exist, create it.
+                                    stmt.execute(createPublicationStmt);
+                                    break;
+                                case FILTERED:
+                                    createOrUpdatePublicationModeFilterted(
+                                            tableFilterString, stmt, false);
+                                    break;
+                            }
+                        } else {
+                            switch (publicationAutocreateMode) {
+                                case FILTERED:
+                                    createOrUpdatePublicationModeFilterted(

Review Comment:
   The method name contains a spelling error: "Filterted" should be "Filtered". 
This typo appears in the declaration and the calls of 
createOrUpdatePublicationModeFilterted, which should be renamed to 
createOrUpdatePublicationModeFiltered for better code readability and 
maintainability.
   ```suggestion
                                       createOrUpdatePublicationModeFiltered(
                                               tableFilterString, stmt, false);
                                       break;
                               }
                           } else {
                               switch (publicationAutocreateMode) {
                                   case FILTERED:
                                       createOrUpdatePublicationModeFiltered(
   ```



##########
fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java:
##########
@@ -0,0 +1,932 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at 
http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.connector.postgresql.connection;
+
+import io.debezium.DebeziumException;
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.connector.postgresql.PostgresSchema;
+import io.debezium.connector.postgresql.TypeRegistry;
+import io.debezium.connector.postgresql.spi.SlotCreationResult;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.jdbc.JdbcConnectionException;
+import io.debezium.relational.RelationalTableFilters;
+import io.debezium.relational.TableId;
+import io.debezium.util.Clock;
+import io.debezium.util.Metronome;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.postgresql.core.BaseConnection;
+import org.postgresql.core.ServerVersion;
+import org.postgresql.replication.PGReplicationStream;
+import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
+import org.postgresql.util.PSQLException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.lang.Math.toIntExact;
+
+/**
+ * Copied from Flink Cdc 3.5.0
+ *
+ * <p>Line 192~199: add publish_via_partition_root for partition table.
+ */
+public class PostgresReplicationConnection extends JdbcConnection implements 
ReplicationConnection {
+
+    private static Logger LOGGER = 
LoggerFactory.getLogger(PostgresReplicationConnection.class);
+
+    private final String slotName;
+    private final String publicationName;
+    private final RelationalTableFilters tableFilter;
+    private final PostgresConnectorConfig.AutoCreateMode 
publicationAutocreateMode;
+    private final PostgresConnectorConfig.LogicalDecoder plugin;
+    private final boolean dropSlotOnClose;
+    private final PostgresConnectorConfig connectorConfig;
+    private final Duration statusUpdateInterval;
+    private final MessageDecoder messageDecoder;
+    private final PostgresConnection jdbcConnection;
+    private final TypeRegistry typeRegistry;
+    private final Properties streamParams;
+
+    private Lsn defaultStartingPos;
+    private SlotCreationResult slotCreationInfo;
+    private boolean hasInitedSlot;
+
+    private Lsn endingPos;
+
+    /**
+     * Creates a new replication connection with the given params.
+     *
+     * @param config the JDBC configuration for the connection; may not be null
+     * @param slotName the name of the DB slot for logical replication; may 
not be null
+     * @param publicationName the name of the DB publication for logical 
replication; may not be
+     *     null
+     * @param tableFilter the tables to watch of the DB publication for 
logical replication; may not
+     *     be null
+     * @param publicationAutocreateMode the mode for publication autocreation; 
may not be null
+     * @param plugin decoder matching the server side plug-in used for 
streaming changes; may not be
+     *     null
+     * @param dropSlotOnClose whether the replication slot should be dropped 
once the connection is
+     *     closed
+     * @param statusUpdateInterval the interval at which the replication 
connection should
+     *     periodically send status
+     * @param doSnapshot whether the connector is doing snapshot
+     * @param jdbcConnection general PostgreSQL JDBC connection
+     * @param typeRegistry registry with PostgreSQL types
+     * @param streamParams additional parameters to pass to the replication 
stream
+     * @param schema the schema; must not be null
+     *     <p>updates to the server
+     */
+    private PostgresReplicationConnection(
+            PostgresConnectorConfig config,
+            String slotName,
+            String publicationName,
+            RelationalTableFilters tableFilter,
+            PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode,
+            PostgresConnectorConfig.LogicalDecoder plugin,
+            boolean dropSlotOnClose,
+            boolean doSnapshot,
+            Duration statusUpdateInterval,
+            PostgresConnection jdbcConnection,
+            TypeRegistry typeRegistry,
+            Properties streamParams,
+            PostgresSchema schema) {
+        super(
+                addDefaultSettings(config.getJdbcConfig()),
+                PostgresConnection.FACTORY,
+                null,
+                null,
+                "\"",
+                "\"");
+
+        this.connectorConfig = config;
+        this.slotName = slotName;
+        this.publicationName = publicationName;
+        this.tableFilter = tableFilter;
+        this.publicationAutocreateMode = publicationAutocreateMode;
+        this.plugin = plugin;
+        this.dropSlotOnClose = dropSlotOnClose;
+        this.statusUpdateInterval = statusUpdateInterval;
+        this.messageDecoder =
+                plugin.messageDecoder(new MessageDecoderContext(config, 
schema), jdbcConnection);
+        this.jdbcConnection = jdbcConnection;
+        this.typeRegistry = typeRegistry;
+        this.streamParams = streamParams;
+        this.slotCreationInfo = null;
+        this.hasInitedSlot = false;
+    }
+
+    private static JdbcConfiguration addDefaultSettings(JdbcConfiguration 
configuration) {
+        // first copy the parent's default settings...
+        // then set some additional replication specific settings
+        return JdbcConfiguration.adapt(
+                PostgresConnection.addDefaultSettings(
+                                configuration, 
PostgresConnection.CONNECTION_STREAMING)
+                        .edit()
+                        .with("replication", "database")
+                        .with(
+                                "preferQueryMode",
+                                "simple") // replication protocol only 
supports simple query mode
+                        .build());
+    }
+
+    private ServerInfo.ReplicationSlot getSlotInfo() throws SQLException, 
InterruptedException {
+        try (PostgresConnection connection =
+                new PostgresConnection(
+                        connectorConfig.getJdbcConfig(), 
PostgresConnection.CONNECTION_SLOT_INFO)) {
+            return connection.readReplicationSlotInfo(slotName, 
plugin.getPostgresPluginName());
+        }
+    }
+
+    protected void initPublication() {
+        String createPublicationStmt;
+        String tableFilterString = null;
+        if (PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.equals(plugin)) {
+            LOGGER.info("Initializing PgOutput logical decoder publication");
+            try {
+                // Unless the autocommit is disabled the SELECT publication 
query will stay running
+                Connection conn = pgConnection();
+                conn.setAutoCommit(false);
+
+                String selectPublication =
+                        String.format(
+                                "SELECT COUNT(1) FROM pg_publication WHERE 
pubname = '%s'",
+                                publicationName);
+                try (Statement stmt = conn.createStatement();
+                        ResultSet rs = stmt.executeQuery(selectPublication)) {
+                    if (rs.next()) {
+                        Long count = rs.getLong(1);
+                        // Close eagerly as the transaction might stay running
+                        if (count == 0L) {
+                            LOGGER.info(
+                                    "Creating new publication '{}' for plugin 
'{}'",
+                                    publicationName,
+                                    plugin);
+                            switch (publicationAutocreateMode) {
+                                case DISABLED:
+                                    throw new ConnectException(
+                                            "Publication autocreation is 
disabled, please create one and restart the connector.");
+                                case ALL_TABLES:
+                                    boolean supportPartitionRoot = 
((BaseConnection) conn).haveMinimumServerVersion(ServerVersion.v13);
+                                    createPublicationStmt = 
supportPartitionRoot
+                                            ? String.format(
+                                                    "CREATE PUBLICATION %s FOR 
ALL TABLES WITH (publish_via_partition_root = true);",
+                                                    publicationName)
+                                            : String.format(
+                                                    "CREATE PUBLICATION %s FOR 
ALL TABLES;",
+                                                    publicationName);
+                                    LOGGER.info(
+                                            "Creating Publication with 
statement '{}'",
+                                            createPublicationStmt);
+                                    // Publication doesn't exist, create it.
+                                    stmt.execute(createPublicationStmt);
+                                    break;
+                                case FILTERED:
+                                    createOrUpdatePublicationModeFilterted(
+                                            tableFilterString, stmt, false);
+                                    break;
+                            }
+                        } else {
+                            switch (publicationAutocreateMode) {
+                                case FILTERED:
+                                    createOrUpdatePublicationModeFilterted(
+                                            tableFilterString, stmt, true);
+                                    break;
+                                default:
+                                    LOGGER.trace(
+                                            "A logical publication named '{}' 
for plugin '{}' and database '{}' is already active on the server "
+                                                    + "and will be used by the 
plugin",
+                                            publicationName,
+                                            plugin,
+                                            database());
+                            }
+                        }
+                    }
+                }
+                conn.commit();
+                conn.setAutoCommit(true);
+            } catch (SQLException e) {
+                throw new JdbcConnectionException(e);
+            }
+        }
+    }
+
+    private void createOrUpdatePublicationModeFilterted(
+            String tableFilterString, Statement stmt, boolean isUpdate) {
+        String createOrUpdatePublicationStmt;
+        try {
+            Set<TableId> tablesToCapture = determineCapturedTables();
+            tableFilterString =
+                    tablesToCapture.stream()
+                            .map(TableId::toDoubleQuotedString)
+                            .collect(Collectors.joining(", "));
+            if (tableFilterString.isEmpty()) {
+                throw new DebeziumException(
+                        String.format(
+                                "No table filters found for filtered 
publication %s",
+                                publicationName));
+            }
+            createOrUpdatePublicationStmt =
+                    isUpdate
+                            ? String.format(
+                            "ALTER PUBLICATION %s SET TABLE %s;",
+                            publicationName, tableFilterString)
+                            : String.format(
+                                    "CREATE PUBLICATION %s FOR TABLE %s;",
+                                    publicationName, tableFilterString);

Review Comment:
   When creating publications in FILTERED mode, the publish_via_partition_root 
option is not being applied. This means partitioned tables won't be properly 
synchronized when using table filters. The FILTERED mode should also check for 
PostgreSQL 13+ and include the publish_via_partition_root option in the CREATE 
PUBLICATION statement, similar to how it's done for ALL_TABLES mode on lines 
192-199. Omitting it could cause data synchronization issues with partitioned 
tables when using filtered publications.
   ```suggestion
               BaseConnection baseConnection = 
connection().unwrap(BaseConnection.class);
               boolean isPg13OrLater =
                       baseConnection != null
                               && 
baseConnection.haveMinimumServerVersion(ServerVersion.v13);
               String publicationOptions =
                       !isUpdate && isPg13OrLater
                               ? " WITH (publish_via_partition_root = true)"
                               : "";
               createOrUpdatePublicationStmt =
                       isUpdate
                               ? String.format(
                                       "ALTER PUBLICATION %s SET TABLE %s;",
                                       publicationName, tableFilterString)
                               : String.format(
                                       "CREATE PUBLICATION %s FOR TABLE %s%s;",
                                       publicationName, tableFilterString, 
publicationOptions);
   ```



##########
fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java:
##########
@@ -0,0 +1,932 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at 
http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.connector.postgresql.connection;
+
+import io.debezium.DebeziumException;
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.connector.postgresql.PostgresSchema;
+import io.debezium.connector.postgresql.TypeRegistry;
+import io.debezium.connector.postgresql.spi.SlotCreationResult;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.jdbc.JdbcConnectionException;
+import io.debezium.relational.RelationalTableFilters;
+import io.debezium.relational.TableId;
+import io.debezium.util.Clock;
+import io.debezium.util.Metronome;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.postgresql.core.BaseConnection;
+import org.postgresql.core.ServerVersion;
+import org.postgresql.replication.PGReplicationStream;
+import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
+import org.postgresql.util.PSQLException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.lang.Math.toIntExact;
+
+/**
+ * Copied from Flink Cdc 3.5.0
+ *
+ * <p>Line 192~199: add publish_via_partition_root for partition table.
+ */
+public class PostgresReplicationConnection extends JdbcConnection implements 
ReplicationConnection {
+
+    private static Logger LOGGER = 
LoggerFactory.getLogger(PostgresReplicationConnection.class);
+
+    private final String slotName;
+    private final String publicationName;
+    private final RelationalTableFilters tableFilter;
+    private final PostgresConnectorConfig.AutoCreateMode 
publicationAutocreateMode;
+    private final PostgresConnectorConfig.LogicalDecoder plugin;
+    private final boolean dropSlotOnClose;
+    private final PostgresConnectorConfig connectorConfig;
+    private final Duration statusUpdateInterval;
+    private final MessageDecoder messageDecoder;
+    private final PostgresConnection jdbcConnection;
+    private final TypeRegistry typeRegistry;
+    private final Properties streamParams;
+
+    private Lsn defaultStartingPos;
+    private SlotCreationResult slotCreationInfo;
+    private boolean hasInitedSlot;
+
+    private Lsn endingPos;
+
+    /**
+     * Creates a new replication connection with the given params.
+     *
+     * @param config the JDBC configuration for the connection; may not be null
+     * @param slotName the name of the DB slot for logical replication; may 
not be null
+     * @param publicationName the name of the DB publication for logical 
replication; may not be
+     *     null
+     * @param tableFilter the tables to watch of the DB publication for 
logical replication; may not
+     *     be null
+     * @param publicationAutocreateMode the mode for publication autocreation; 
may not be null
+     * @param plugin decoder matching the server side plug-in used for 
streaming changes; may not be
+     *     null
+     * @param dropSlotOnClose whether the replication slot should be dropped 
once the connection is
+     *     closed
+     * @param statusUpdateInterval the interval at which the replication 
connection should
+     *     periodically send status
+     * @param doSnapshot whether the connector is doing snapshot
+     * @param jdbcConnection general PostgreSQL JDBC connection
+     * @param typeRegistry registry with PostgreSQL types
+     * @param streamParams additional parameters to pass to the replication 
stream
+     * @param schema the schema; must not be null
+     *     <p>updates to the server
+     */
+    private PostgresReplicationConnection(
+            PostgresConnectorConfig config,
+            String slotName,
+            String publicationName,
+            RelationalTableFilters tableFilter,
+            PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode,
+            PostgresConnectorConfig.LogicalDecoder plugin,
+            boolean dropSlotOnClose,
+            boolean doSnapshot,
+            Duration statusUpdateInterval,
+            PostgresConnection jdbcConnection,
+            TypeRegistry typeRegistry,
+            Properties streamParams,
+            PostgresSchema schema) {
+        super(
+                addDefaultSettings(config.getJdbcConfig()),
+                PostgresConnection.FACTORY,
+                null,
+                null,
+                "\"",
+                "\"");
+
+        this.connectorConfig = config;
+        this.slotName = slotName;
+        this.publicationName = publicationName;
+        this.tableFilter = tableFilter;
+        this.publicationAutocreateMode = publicationAutocreateMode;
+        this.plugin = plugin;
+        this.dropSlotOnClose = dropSlotOnClose;
+        this.statusUpdateInterval = statusUpdateInterval;
+        this.messageDecoder =
+                plugin.messageDecoder(new MessageDecoderContext(config, 
schema), jdbcConnection);
+        this.jdbcConnection = jdbcConnection;
+        this.typeRegistry = typeRegistry;
+        this.streamParams = streamParams;
+        this.slotCreationInfo = null;
+        this.hasInitedSlot = false;
+    }
+
+    private static JdbcConfiguration addDefaultSettings(JdbcConfiguration 
configuration) {
+        // first copy the parent's default settings...
+        // then set some additional replication specific settings
+        return JdbcConfiguration.adapt(
+                PostgresConnection.addDefaultSettings(
+                                configuration, 
PostgresConnection.CONNECTION_STREAMING)
+                        .edit()
+                        .with("replication", "database")
+                        .with(
+                                "preferQueryMode",
+                                "simple") // replication protocol only 
supports simple query mode
+                        .build());
+    }
+
+    private ServerInfo.ReplicationSlot getSlotInfo() throws SQLException, 
InterruptedException {
+        try (PostgresConnection connection =
+                new PostgresConnection(
+                        connectorConfig.getJdbcConfig(), 
PostgresConnection.CONNECTION_SLOT_INFO)) {
+            return connection.readReplicationSlotInfo(slotName, 
plugin.getPostgresPluginName());
+        }
+    }
+
+    protected void initPublication() {
+        String createPublicationStmt;
+        String tableFilterString = null;
+        if (PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.equals(plugin)) {
+            LOGGER.info("Initializing PgOutput logical decoder publication");
+            try {
+                // Unless the autocommit is disabled the SELECT publication 
query will stay running
+                Connection conn = pgConnection();
+                conn.setAutoCommit(false);
+
+                String selectPublication =
+                        String.format(
+                                "SELECT COUNT(1) FROM pg_publication WHERE 
pubname = '%s'",
+                                publicationName);
+                try (Statement stmt = conn.createStatement();
+                        ResultSet rs = stmt.executeQuery(selectPublication)) {
+                    if (rs.next()) {
+                        Long count = rs.getLong(1);
+                        // Close eagerly as the transaction might stay running
+                        if (count == 0L) {
+                            LOGGER.info(
+                                    "Creating new publication '{}' for plugin 
'{}'",
+                                    publicationName,
+                                    plugin);
+                            switch (publicationAutocreateMode) {
+                                case DISABLED:
+                                    throw new ConnectException(
+                                            "Publication autocreation is 
disabled, please create one and restart the connector.");
+                                case ALL_TABLES:
+                                    boolean supportPartitionRoot = 
((BaseConnection) conn).haveMinimumServerVersion(ServerVersion.v13);
+                                    createPublicationStmt = 
supportPartitionRoot
+                                            ? String.format(
+                                                    "CREATE PUBLICATION %s FOR 
ALL TABLES WITH (publish_via_partition_root = true);",
+                                                    publicationName)
+                                            : String.format(
+                                                    "CREATE PUBLICATION %s FOR 
ALL TABLES;",
+                                                    publicationName);
+                                    LOGGER.info(
+                                            "Creating Publication with 
statement '{}'",
+                                            createPublicationStmt);
+                                    // Publication doesn't exist, create it.
+                                    stmt.execute(createPublicationStmt);
+                                    break;
+                                case FILTERED:
+                                    createOrUpdatePublicationModeFilterted(
+                                            tableFilterString, stmt, false);
+                                    break;
+                            }
+                        } else {
+                            switch (publicationAutocreateMode) {
+                                case FILTERED:
+                                    createOrUpdatePublicationModeFilterted(

Review Comment:
   The method name contains a spelling error: "Filterted" should be "Filtered".
   This typo appears in both calls to createOrUpdatePublicationModeFilterted in
   initPublication() and should be corrected to
   createOrUpdatePublicationModeFiltered for better code readability and
   maintainability. The method declaration must be renamed in the same change.
   ```suggestion
                                       createOrUpdatePublicationModeFiltered(
                                               tableFilterString, stmt, false);
                                       break;
                               }
                           } else {
                               switch (publicationAutocreateMode) {
                                   case FILTERED:
                                       createOrUpdatePublicationModeFiltered(
   ```



##########
fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java:
##########
@@ -0,0 +1,932 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at 
http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.connector.postgresql.connection;
+
+import io.debezium.DebeziumException;
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.connector.postgresql.PostgresSchema;
+import io.debezium.connector.postgresql.TypeRegistry;
+import io.debezium.connector.postgresql.spi.SlotCreationResult;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.jdbc.JdbcConnectionException;
+import io.debezium.relational.RelationalTableFilters;
+import io.debezium.relational.TableId;
+import io.debezium.util.Clock;
+import io.debezium.util.Metronome;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.postgresql.core.BaseConnection;
+import org.postgresql.core.ServerVersion;
+import org.postgresql.replication.PGReplicationStream;
+import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
+import org.postgresql.util.PSQLException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.lang.Math.toIntExact;
+
+/**
+ * Copied from Flink Cdc 3.5.0
+ *
+ * <p>Line 192~199: add publish_via_partition_root for partition table.
+ */
+public class PostgresReplicationConnection extends JdbcConnection implements 
ReplicationConnection {
+
+    private static Logger LOGGER = 
LoggerFactory.getLogger(PostgresReplicationConnection.class);
+
+    private final String slotName;
+    private final String publicationName;
+    private final RelationalTableFilters tableFilter;
+    private final PostgresConnectorConfig.AutoCreateMode 
publicationAutocreateMode;
+    private final PostgresConnectorConfig.LogicalDecoder plugin;
+    private final boolean dropSlotOnClose;
+    private final PostgresConnectorConfig connectorConfig;
+    private final Duration statusUpdateInterval;
+    private final MessageDecoder messageDecoder;
+    private final PostgresConnection jdbcConnection;
+    private final TypeRegistry typeRegistry;
+    private final Properties streamParams;
+
+    private Lsn defaultStartingPos;
+    private SlotCreationResult slotCreationInfo;
+    private boolean hasInitedSlot;
+
+    private Lsn endingPos;
+
+    /**
+     * Creates a new replication connection with the given params.
+     *
+     * @param config the JDBC configuration for the connection; may not be null
+     * @param slotName the name of the DB slot for logical replication; may 
not be null
+     * @param publicationName the name of the DB publication for logical 
replication; may not be
+     *     null
+     * @param tableFilter the tables to watch of the DB publication for 
logical replication; may not
+     *     be null
+     * @param publicationAutocreateMode the mode for publication autocreation; 
may not be null
+     * @param plugin decoder matching the server side plug-in used for 
streaming changes; may not be
+     *     null
+     * @param dropSlotOnClose whether the replication slot should be dropped 
once the connection is
+     *     closed
+     * @param statusUpdateInterval the interval at which the replication 
connection should
+     *     periodically send status
+     * @param doSnapshot whether the connector is doing snapshot
+     * @param jdbcConnection general PostgreSQL JDBC connection
+     * @param typeRegistry registry with PostgreSQL types
+     * @param streamParams additional parameters to pass to the replication 
stream
+     * @param schema the schema; must not be null
+     *     <p>updates to the server
+     */
+    private PostgresReplicationConnection(
+            PostgresConnectorConfig config,
+            String slotName,
+            String publicationName,
+            RelationalTableFilters tableFilter,
+            PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode,
+            PostgresConnectorConfig.LogicalDecoder plugin,
+            boolean dropSlotOnClose,
+            boolean doSnapshot,
+            Duration statusUpdateInterval,
+            PostgresConnection jdbcConnection,
+            TypeRegistry typeRegistry,
+            Properties streamParams,
+            PostgresSchema schema) {
+        super(
+                addDefaultSettings(config.getJdbcConfig()),
+                PostgresConnection.FACTORY,
+                null,
+                null,
+                "\"",
+                "\"");
+
+        this.connectorConfig = config;
+        this.slotName = slotName;
+        this.publicationName = publicationName;
+        this.tableFilter = tableFilter;
+        this.publicationAutocreateMode = publicationAutocreateMode;
+        this.plugin = plugin;
+        this.dropSlotOnClose = dropSlotOnClose;
+        this.statusUpdateInterval = statusUpdateInterval;
+        this.messageDecoder =
+                plugin.messageDecoder(new MessageDecoderContext(config, 
schema), jdbcConnection);
+        this.jdbcConnection = jdbcConnection;
+        this.typeRegistry = typeRegistry;
+        this.streamParams = streamParams;
+        this.slotCreationInfo = null;
+        this.hasInitedSlot = false;
+    }
+
+    private static JdbcConfiguration addDefaultSettings(JdbcConfiguration 
configuration) {
+        // first copy the parent's default settings...
+        // then set some additional replication specific settings
+        return JdbcConfiguration.adapt(
+                PostgresConnection.addDefaultSettings(
+                                configuration, 
PostgresConnection.CONNECTION_STREAMING)
+                        .edit()
+                        .with("replication", "database")
+                        .with(
+                                "preferQueryMode",
+                                "simple") // replication protocol only 
supports simple query mode
+                        .build());
+    }
+
+    private ServerInfo.ReplicationSlot getSlotInfo() throws SQLException, 
InterruptedException {
+        try (PostgresConnection connection =
+                new PostgresConnection(
+                        connectorConfig.getJdbcConfig(), 
PostgresConnection.CONNECTION_SLOT_INFO)) {
+            return connection.readReplicationSlotInfo(slotName, 
plugin.getPostgresPluginName());
+        }
+    }
+
+    protected void initPublication() {
+        String createPublicationStmt;
+        String tableFilterString = null;
+        if (PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.equals(plugin)) {
+            LOGGER.info("Initializing PgOutput logical decoder publication");
+            try {
+                // Unless the autocommit is disabled the SELECT publication 
query will stay running
+                Connection conn = pgConnection();
+                conn.setAutoCommit(false);
+
+                String selectPublication =
+                        String.format(
+                                "SELECT COUNT(1) FROM pg_publication WHERE 
pubname = '%s'",
+                                publicationName);
+                try (Statement stmt = conn.createStatement();
+                        ResultSet rs = stmt.executeQuery(selectPublication)) {
+                    if (rs.next()) {
+                        Long count = rs.getLong(1);
+                        // Close eagerly as the transaction might stay running
+                        if (count == 0L) {
+                            LOGGER.info(
+                                    "Creating new publication '{}' for plugin 
'{}'",
+                                    publicationName,
+                                    plugin);
+                            switch (publicationAutocreateMode) {
+                                case DISABLED:
+                                    throw new ConnectException(
+                                            "Publication autocreation is 
disabled, please create one and restart the connector.");
+                                case ALL_TABLES:
+                                    boolean supportPartitionRoot = 
((BaseConnection) conn).haveMinimumServerVersion(ServerVersion.v13);
+                                    createPublicationStmt = 
supportPartitionRoot
+                                            ? String.format(
+                                                    "CREATE PUBLICATION %s FOR 
ALL TABLES WITH (publish_via_partition_root = true);",
+                                                    publicationName)
+                                            : String.format(
+                                                    "CREATE PUBLICATION %s FOR 
ALL TABLES;",
+                                                    publicationName);
+                                    LOGGER.info(
+                                            "Creating Publication with 
statement '{}'",
+                                            createPublicationStmt);
+                                    // Publication doesn't exist, create it.
+                                    stmt.execute(createPublicationStmt);
+                                    break;
+                                case FILTERED:
+                                    createOrUpdatePublicationModeFilterted(
+                                            tableFilterString, stmt, false);
+                                    break;
+                            }
+                        } else {
+                            switch (publicationAutocreateMode) {
+                                case FILTERED:
+                                    createOrUpdatePublicationModeFilterted(
+                                            tableFilterString, stmt, true);
+                                    break;
+                                default:
+                                    LOGGER.trace(
+                                            "A logical publication named '{}' 
for plugin '{}' and database '{}' is already active on the server "
+                                                    + "and will be used by the 
plugin",
+                                            publicationName,
+                                            plugin,
+                                            database());
+                            }
+                        }
+                    }
+                }
+                conn.commit();
+                conn.setAutoCommit(true);
+            } catch (SQLException e) {
+                throw new JdbcConnectionException(e);
+            }
+        }
+    }
+
+    private void createOrUpdatePublicationModeFilterted(

Review Comment:
   The method name contains a spelling error: "Filterted" should be "Filtered".
   This typo appears in the declaration of createOrUpdatePublicationModeFilterted,
   which should be renamed to createOrUpdatePublicationModeFiltered for better
   code readability and maintainability. Its call sites in initPublication() must
   be renamed in the same change, otherwise the class will no longer compile.
   ```suggestion
       private void createOrUpdatePublicationModeFiltered(
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to