This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 5a243f57f4c branch-4.0: [Improve](streaming job) support postgres 
partition table sync #60560 (#60614)
5a243f57f4c is described below

commit 5a243f57f4c3edd948c63ff4f9ac46e900fb4a9b
Author: wudi <[email protected]>
AuthorDate: Tue Feb 24 13:30:38 2026 +0800

    branch-4.0: [Improve](streaming job) support postgres partition table sync 
#60560 (#60614)
    
    ### What problem does this PR solve?
    
    Cherry-picked from https://github.com/apache/doris/pull/60560, #60624
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 .../insert/streaming/StreamingMultiTblTask.java    |   4 +-
 .../connection/PostgresReplicationConnection.java  | 932 +++++++++++++++++++++
 .../reader/postgres/PostgresSourceReader.java      |  12 +-
 .../cdc/test_streaming_postgres_job_partition.out  |  10 +
 .../cdc/test_streaming_postgres_job.groovy         |   8 +-
 .../test_streaming_postgres_job_partition.groovy   | 178 ++++
 .../cdc/test_streaming_postgres_job_priv.groovy    |   4 +-
 7 files changed, 1140 insertions(+), 8 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 0bdd6262864..b6a4e8c939b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -113,7 +113,7 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
     @Override
     public void run() throws JobException {
         if (getIsCanceled().get()) {
-            log.info("task has been canceled, task id is {}", getTaskId());
+            log.info("streaming task has been canceled, task id is {}", 
getTaskId());
             return;
         }
         sendWriteRequest();
@@ -348,7 +348,7 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
                 log.warn("Failed to get task timeout reason, response: {}", 
response);
             }
         } catch (ExecutionException | InterruptedException ex) {
-            log.error("Send get fail reason request failed: ", ex);
+            log.error("Send get task fail reason request failed: ", ex);
         }
         return "";
     }
diff --git 
a/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
 
b/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
new file mode 100644
index 00000000000..ac372bbb8cb
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
@@ -0,0 +1,932 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at 
http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.connector.postgresql.connection;
+
+import io.debezium.DebeziumException;
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.connector.postgresql.PostgresSchema;
+import io.debezium.connector.postgresql.TypeRegistry;
+import io.debezium.connector.postgresql.spi.SlotCreationResult;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.jdbc.JdbcConnectionException;
+import io.debezium.relational.RelationalTableFilters;
+import io.debezium.relational.TableId;
+import io.debezium.util.Clock;
+import io.debezium.util.Metronome;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.postgresql.core.BaseConnection;
+import org.postgresql.core.ServerVersion;
+import org.postgresql.replication.PGReplicationStream;
+import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
+import org.postgresql.util.PSQLException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.lang.Math.toIntExact;
+
+/**
+ * Copied from Flink Cdc 3.5.0
+ *
+ * <p>Line 192~199: add publish_via_partition_root for partition table.
+ */
+public class PostgresReplicationConnection extends JdbcConnection implements 
ReplicationConnection {
+
+    private static Logger LOGGER = 
LoggerFactory.getLogger(PostgresReplicationConnection.class);
+
+    private final String slotName;
+    private final String publicationName;
+    private final RelationalTableFilters tableFilter;
+    private final PostgresConnectorConfig.AutoCreateMode 
publicationAutocreateMode;
+    private final PostgresConnectorConfig.LogicalDecoder plugin;
+    private final boolean dropSlotOnClose;
+    private final PostgresConnectorConfig connectorConfig;
+    private final Duration statusUpdateInterval;
+    private final MessageDecoder messageDecoder;
+    private final PostgresConnection jdbcConnection;
+    private final TypeRegistry typeRegistry;
+    private final Properties streamParams;
+
+    private Lsn defaultStartingPos;
+    private SlotCreationResult slotCreationInfo;
+    private boolean hasInitedSlot;
+
+    private Lsn endingPos;
+
+    /**
+     * Creates a new replication connection with the given params.
+     *
+     * @param config the JDBC configuration for the connection; may not be null
+     * @param slotName the name of the DB slot for logical replication; may 
not be null
+     * @param publicationName the name of the DB publication for logical 
replication; may not be
+     *     null
+     * @param tableFilter the tables to watch of the DB publication for 
logical replication; may not
+     *     be null
+     * @param publicationAutocreateMode the mode for publication autocreation; 
may not be null
+     * @param plugin decoder matching the server side plug-in used for 
streaming changes; may not be
+     *     null
+     * @param dropSlotOnClose whether the replication slot should be dropped 
once the connection is
+     *     closed
+     * @param statusUpdateInterval the interval at which the replication 
connection should
+     *     periodically send status
+     * @param doSnapshot whether the connector is doing snapshot
+     * @param jdbcConnection general PostgreSQL JDBC connection
+     * @param typeRegistry registry with PostgreSQL types
+     * @param streamParams additional parameters to pass to the replication 
stream
+     * @param schema the schema; must not be null
+     *     <p>updates to the server
+     */
+    private PostgresReplicationConnection(
+            PostgresConnectorConfig config,
+            String slotName,
+            String publicationName,
+            RelationalTableFilters tableFilter,
+            PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode,
+            PostgresConnectorConfig.LogicalDecoder plugin,
+            boolean dropSlotOnClose,
+            boolean doSnapshot,
+            Duration statusUpdateInterval,
+            PostgresConnection jdbcConnection,
+            TypeRegistry typeRegistry,
+            Properties streamParams,
+            PostgresSchema schema) {
+        super(
+                addDefaultSettings(config.getJdbcConfig()),
+                PostgresConnection.FACTORY,
+                null,
+                null,
+                "\"",
+                "\"");
+
+        this.connectorConfig = config;
+        this.slotName = slotName;
+        this.publicationName = publicationName;
+        this.tableFilter = tableFilter;
+        this.publicationAutocreateMode = publicationAutocreateMode;
+        this.plugin = plugin;
+        this.dropSlotOnClose = dropSlotOnClose;
+        this.statusUpdateInterval = statusUpdateInterval;
+        this.messageDecoder =
+                plugin.messageDecoder(new MessageDecoderContext(config, 
schema), jdbcConnection);
+        this.jdbcConnection = jdbcConnection;
+        this.typeRegistry = typeRegistry;
+        this.streamParams = streamParams;
+        this.slotCreationInfo = null;
+        this.hasInitedSlot = false;
+    }
+
+    private static JdbcConfiguration addDefaultSettings(JdbcConfiguration 
configuration) {
+        // first copy the parent's default settings...
+        // then set some additional replication specific settings
+        return JdbcConfiguration.adapt(
+                PostgresConnection.addDefaultSettings(
+                                configuration, 
PostgresConnection.CONNECTION_STREAMING)
+                        .edit()
+                        .with("replication", "database")
+                        .with(
+                                "preferQueryMode",
+                                "simple") // replication protocol only 
supports simple query mode
+                        .build());
+    }
+
+    private ServerInfo.ReplicationSlot getSlotInfo() throws SQLException, 
InterruptedException {
+        try (PostgresConnection connection =
+                new PostgresConnection(
+                        connectorConfig.getJdbcConfig(), 
PostgresConnection.CONNECTION_SLOT_INFO)) {
+            return connection.readReplicationSlotInfo(slotName, 
plugin.getPostgresPluginName());
+        }
+    }
+
+    protected void initPublication() {
+        String createPublicationStmt;
+        String tableFilterString = null;
+        if (PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.equals(plugin)) {
+            LOGGER.info("Initializing PgOutput logical decoder publication");
+            try {
+                // Unless the autocommit is disabled the SELECT publication 
query will stay running
+                Connection conn = pgConnection();
+                conn.setAutoCommit(false);
+
+                String selectPublication =
+                        String.format(
+                                "SELECT COUNT(1) FROM pg_publication WHERE 
pubname = '%s'",
+                                publicationName);
+                try (Statement stmt = conn.createStatement();
+                        ResultSet rs = stmt.executeQuery(selectPublication)) {
+                    if (rs.next()) {
+                        Long count = rs.getLong(1);
+                        // Close eagerly as the transaction might stay running
+                        if (count == 0L) {
+                            LOGGER.info(
+                                    "Creating new publication '{}' for plugin 
'{}'",
+                                    publicationName,
+                                    plugin);
+                            switch (publicationAutocreateMode) {
+                                case DISABLED:
+                                    throw new ConnectException(
+                                            "Publication autocreation is 
disabled, please create one and restart the connector.");
+                                case ALL_TABLES:
+                                    boolean supportPartitionRoot = 
((BaseConnection) conn).haveMinimumServerVersion(ServerVersion.v13);
+                                    createPublicationStmt = 
supportPartitionRoot
+                                            ? String.format(
+                                                    "CREATE PUBLICATION %s FOR 
ALL TABLES WITH (publish_via_partition_root = true);",
+                                                    publicationName)
+                                            : String.format(
+                                                    "CREATE PUBLICATION %s FOR 
ALL TABLES;",
+                                                    publicationName);
+                                    LOGGER.info(
+                                            "Creating Publication with 
statement '{}'",
+                                            createPublicationStmt);
+                                    // Publication doesn't exist, create it.
+                                    stmt.execute(createPublicationStmt);
+                                    break;
+                                case FILTERED:
+                                    createOrUpdatePublicationModeFilterted(
+                                            tableFilterString, stmt, false);
+                                    break;
+                            }
+                        } else {
+                            switch (publicationAutocreateMode) {
+                                case FILTERED:
+                                    createOrUpdatePublicationModeFilterted(
+                                            tableFilterString, stmt, true);
+                                    break;
+                                default:
+                                    LOGGER.trace(
+                                            "A logical publication named '{}' 
for plugin '{}' and database '{}' is already active on the server "
+                                                    + "and will be used by the 
plugin",
+                                            publicationName,
+                                            plugin,
+                                            database());
+                            }
+                        }
+                    }
+                }
+                conn.commit();
+                conn.setAutoCommit(true);
+            } catch (SQLException e) {
+                throw new JdbcConnectionException(e);
+            }
+        }
+    }
+
+    private void createOrUpdatePublicationModeFilterted(
+            String tableFilterString, Statement stmt, boolean isUpdate) {
+        String createOrUpdatePublicationStmt;
+        try {
+            Set<TableId> tablesToCapture = determineCapturedTables();
+            tableFilterString =
+                    tablesToCapture.stream()
+                            .map(TableId::toDoubleQuotedString)
+                            .collect(Collectors.joining(", "));
+            if (tableFilterString.isEmpty()) {
+                throw new DebeziumException(
+                        String.format(
+                                "No table filters found for filtered 
publication %s",
+                                publicationName));
+            }
+            createOrUpdatePublicationStmt =
+                    isUpdate
+                            ? String.format(
+                            "ALTER PUBLICATION %s SET TABLE %s;",
+                            publicationName, tableFilterString)
+                            : String.format(
+                                    "CREATE PUBLICATION %s FOR TABLE %s;",
+                                    publicationName, tableFilterString);
+            LOGGER.info(
+                    isUpdate
+                            ? "Updating Publication with statement '{}'"
+                            : "Creating Publication with statement '{}'",
+                    createOrUpdatePublicationStmt);
+            stmt.execute(createOrUpdatePublicationStmt);
+        } catch (Exception e) {
+            throw new ConnectException(
+                    String.format(
+                            "Unable to %s filtered publication %s for %s",
+                            isUpdate ? "update" : "create", publicationName, 
tableFilterString),
+                    e);
+        }
+    }
+
+    private Set<TableId> determineCapturedTables() throws Exception {
+        Set<TableId> allTableIds = 
jdbcConnection.getAllTableIds(connectorConfig.databaseName());
+
+        Set<TableId> capturedTables = new HashSet<>();
+
+        for (TableId tableId : allTableIds) {
+            if (tableFilter.dataCollectionFilter().isIncluded(tableId)) {
+                LOGGER.trace("Adding table {} to the list of captured tables", 
tableId);
+                capturedTables.add(tableId);
+            } else {
+                LOGGER.trace(
+                        "Ignoring table {} as it's not included in the filter 
configuration",
+                        tableId);
+            }
+        }
+
+        return capturedTables.stream()
+                .sorted()
+                .collect(Collectors.toCollection(LinkedHashSet::new));
+    }
+
+    protected void initReplicationSlot() throws SQLException, 
InterruptedException {
+        ServerInfo.ReplicationSlot slotInfo = getSlotInfo();
+
+        boolean shouldCreateSlot = ServerInfo.ReplicationSlot.INVALID == 
slotInfo;
+        try {
+            // there's no info for this plugin and slot so create a new slot
+            if (shouldCreateSlot) {
+                this.createReplicationSlot();
+            }
+
+            // replication connection does not support parsing of SQL 
statements so we need to
+            // create
+            // the connection without executing on connect statements - see 
JDBC opt
+            // preferQueryMode=simple
+            pgConnection();
+            final String identifySystemStatement = "IDENTIFY_SYSTEM";
+            LOGGER.debug(
+                    "running '{}' to validate replication connection", 
identifySystemStatement);
+            final Lsn xlogStart =
+                    queryAndMap(
+                            identifySystemStatement,
+                            rs -> {
+                                if (!rs.next()) {
+                                    throw new IllegalStateException(
+                                            "The DB connection is not a valid 
replication connection");
+                                }
+                                String xlogpos = rs.getString("xlogpos");
+                                LOGGER.debug("received latest xlogpos '{}'", 
xlogpos);
+                                return Lsn.valueOf(xlogpos);
+                            });
+
+            if (slotCreationInfo != null) {
+                this.defaultStartingPos = slotCreationInfo.startLsn();
+            } else if (shouldCreateSlot || !slotInfo.hasValidFlushedLsn()) {
+                // this is a new slot or we weren't able to read a valid flush 
LSN pos, so we always
+                // start from the xlog pos that was reported
+                this.defaultStartingPos = xlogStart;
+            } else {
+                Lsn latestFlushedLsn = slotInfo.latestFlushedLsn();
+                this.defaultStartingPos =
+                        latestFlushedLsn.compareTo(xlogStart) < 0 ? 
latestFlushedLsn : xlogStart;
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("found previous flushed LSN '{}'", 
latestFlushedLsn);
+                }
+            }
+            hasInitedSlot = true;
+        } catch (SQLException e) {
+            throw new JdbcConnectionException(e);
+        }
+    }
+
+    // Temporary replication slots is a new feature of PostgreSQL 10
+    private boolean useTemporarySlot() throws SQLException {
+        // Temporary replication slots cannot be used due to connection restart
+        // when finding WAL position
+        // return dropSlotOnClose && 
pgConnection().haveMinimumServerVersion(ServerVersion.v10);
+        return false;
+    }
+
+    /**
+     * creating a replication connection and starting to stream involves a few 
steps: 1. we create
+     * the connection and ensure that a. the slot exists b. the slot isn't 
currently being used 2.
+     * we query to get our potential start position in the slot (lsn) 3. we 
try and start streaming,
+     * depending on our options (such as in wal2json) this may fail, which can 
result in the
+     * connection being killed and we need to start the process over if we are 
using a temporary
+     * slot 4. actually start the streamer
+     *
+     * <p>This method takes care of all of these and this method queries for a 
default starting
+     * position If you know where you are starting from you should call {@link 
#startStreaming(Lsn,
+     * WalPositionLocator)}, this method delegates to that method
+     *
+     * @return
+     * @throws SQLException
+     * @throws InterruptedException
+     */
+    @Override
+    public ReplicationStream startStreaming(WalPositionLocator walPosition)
+            throws SQLException, InterruptedException {
+        return startStreaming(null, walPosition);
+    }
+
+    @Override
+    public ReplicationStream startStreaming(Lsn offset, WalPositionLocator 
walPosition)
+            throws SQLException, InterruptedException {
+        initConnection();
+
+        connect();
+        if (offset == null || !offset.isValid()) {
+            offset = defaultStartingPos;
+        }
+        Lsn lsn = offset;
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("starting streaming from LSN '{}'", lsn);
+        }
+
+        final int maxRetries = connectorConfig.maxRetries();
+        final Duration delay = connectorConfig.retryDelay();
+        int tryCount = 0;
+        while (true) {
+            try {
+                return createReplicationStream(lsn, walPosition);
+            } catch (Exception e) {
+                String message = "Failed to start replication stream at " + 
lsn;
+                if (++tryCount > maxRetries) {
+                    if (e.getMessage().matches(".*replication slot .* is 
active.*")) {
+                        message +=
+                                "; when setting up multiple connectors for the 
same database host, please make sure to use a distinct replication slot name 
for each.";
+                    }
+                    throw new DebeziumException(message, e);
+                } else {
+                    LOGGER.warn(
+                            message + ", waiting for {} ms and retrying, 
attempt number {} over {}",
+                            delay,
+                            tryCount,
+                            maxRetries);
+                    final Metronome metronome = Metronome.sleeper(delay, 
Clock.SYSTEM);
+                    metronome.pause();
+                }
+            }
+        }
+    }
+
+    @Override
+    public void initConnection() throws SQLException, InterruptedException {
+        // See 
https://www.postgresql.org/docs/current/logical-replication-quick-setup.html
+        // For pgoutput specifically, the publication must be created before 
the slot.
+        initPublication();
+        if (!hasInitedSlot) {
+            initReplicationSlot();
+        }
+    }
+
+    @Override
+    public Optional<SlotCreationResult> createReplicationSlot() throws 
SQLException {
+        // note that some of these options are only supported in Postgres 
9.4+, additionally
+        // the options are not yet exported by the jdbc api wrapper, 
therefore, we just do
+        // this ourselves but eventually this should be moved back to the jdbc 
API
+        // see https://github.com/pgjdbc/pgjdbc/issues/1305
+
+        LOGGER.debug("Creating new replication slot '{}' for plugin '{}'", 
slotName, plugin);
+        String tempPart = "";
+        // Exported snapshots are supported in Postgres 9.4+
+        boolean canExportSnapshot = 
pgConnection().haveMinimumServerVersion(ServerVersion.v9_4);
+        if ((dropSlotOnClose) && !canExportSnapshot) {
+            LOGGER.warn(
+                    "A slot marked as temporary or with an exported snapshot 
was created, "
+                            + "but not on a supported version of Postgres, 
ignoring!");
+        }
+        if (useTemporarySlot()) {
+            tempPart = "TEMPORARY";
+        }
+
+        // See 
https://www.postgresql.org/docs/current/logical-replication-quick-setup.html
+        // For pgoutput specifically, the publication must be created prior to 
the slot.
+        initPublication();
+
+        try (Statement stmt = pgConnection().createStatement()) {
+            String createCommand =
+                    String.format(
+                            "CREATE_REPLICATION_SLOT \"%s\" %s LOGICAL %s",
+                            slotName, tempPart, 
plugin.getPostgresPluginName());
+            LOGGER.info("Creating replication slot with command {}", 
createCommand);
+            stmt.execute(createCommand);
+            // when we are in Postgres 9.4+, we can parse the slot creation 
info,
+            // otherwise, it returns nothing
+            if (canExportSnapshot) {
+                this.slotCreationInfo = parseSlotCreation(stmt.getResultSet());
+            }
+
+            return Optional.ofNullable(slotCreationInfo);
+        }
+    }
+
+    protected BaseConnection pgConnection() throws SQLException {
+        return (BaseConnection) connection(false);
+    }
+
+    private SlotCreationResult parseSlotCreation(ResultSet rs) {
+        try {
+            if (rs.next()) {
+                String slotName = rs.getString("slot_name");
+                String startPoint = rs.getString("consistent_point");
+                String snapName = rs.getString("snapshot_name");
+                String pluginName = rs.getString("output_plugin");
+
+                return new SlotCreationResult(slotName, startPoint, snapName, 
pluginName);
+            } else {
+                throw new ConnectException("No replication slot found");
+            }
+        } catch (SQLException ex) {
+            throw new ConnectException("Unable to parse 
create_replication_slot response", ex);
+        }
+    }
+
+    private ReplicationStream createReplicationStream(
+            final Lsn startLsn, WalPositionLocator walPosition)
+            throws SQLException, InterruptedException {
+        PGReplicationStream s;
+
+        try {
+            try {
+                s =
+                        startPgReplicationStream(
+                                startLsn,
+                                plugin.forceRds()
+                                        ? 
messageDecoder::optionsWithoutMetadata
+                                        : messageDecoder::optionsWithMetadata);
+                messageDecoder.setContainsMetadata(plugin.forceRds() ? false : 
true);
+            } catch (PSQLException e) {
+                LOGGER.debug(
+                        "Could not register for streaming, retrying without 
optional options", e);
+
+                // re-init the slot after a failed start of slot, as this
+                // may have closed the slot
+                if (useTemporarySlot()) {
+                    initReplicationSlot();
+                }
+
+                s =
+                        startPgReplicationStream(
+                                startLsn,
+                                plugin.forceRds()
+                                        ? 
messageDecoder::optionsWithoutMetadata
+                                        : messageDecoder::optionsWithMetadata);
+                messageDecoder.setContainsMetadata(plugin.forceRds() ? false : 
true);
+            }
+        } catch (PSQLException e) {
+            if (e.getMessage().matches("(?s)ERROR: option .* is unknown.*")) {
+                // It is possible we are connecting to an old wal2json plug-in
+                LOGGER.warn(
+                        "Could not register for streaming with metadata in 
messages, falling back to messages without metadata");
+
+                // re-init the slot after a failed start of slot, as this
+                // may have closed the slot
+                if (useTemporarySlot()) {
+                    initReplicationSlot();
+                }
+
+                s = startPgReplicationStream(startLsn, 
messageDecoder::optionsWithoutMetadata);
+                messageDecoder.setContainsMetadata(false);
+            } else if (e.getMessage()
+                    .matches("(?s)ERROR: requested WAL segment .* has already 
been removed.*")) {
+                LOGGER.error("Cannot rewind to last processed WAL position", 
e);
+                throw new ConnectException(
+                        "The offset to start reading from has been removed 
from the database write-ahead log. Create a new snapshot and consider setting 
of PostgreSQL parameter wal_keep_segments = 0.");
+            } else {
+                throw e;
+            }
+        }
+
+        final PGReplicationStream stream = s;
+
+        return new ReplicationStream() {
+
+            private static final int CHECK_WARNINGS_AFTER_COUNT = 100;
+            private int warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
+            private ExecutorService keepAliveExecutor = null;
+            private AtomicBoolean keepAliveRunning;
+            private final Metronome metronome =
+                    Metronome.sleeper(statusUpdateInterval, Clock.SYSTEM);
+
+            // make sure this is volatile since multiple threads may be 
interested in this value
+            private volatile Lsn lastReceivedLsn;
+
+            @Override
+            public void read(ReplicationMessageProcessor processor)
+                    throws SQLException, InterruptedException {
+                processWarnings(false);
+                ByteBuffer read = stream.read();
+                final Lsn lastReceiveLsn = 
Lsn.valueOf(stream.getLastReceiveLSN());
+                LOGGER.trace(
+                        "Streaming requested from LSN {}, received LSN {}",
+                        startLsn,
+                        lastReceiveLsn);
+                if (reachEnd(lastReceivedLsn)) {
+                    lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
+                    LOGGER.trace("Received message at LSN {}", 
lastReceivedLsn);
+                    processor.process(new ReplicationMessage.NoopMessage(null, 
null));
+                    return;
+                }
+                if (messageDecoder.shouldMessageBeSkipped(
+                        read, lastReceiveLsn, startLsn, walPosition)) {
+                    return;
+                }
+                deserializeMessages(read, processor);
+            }
+
+            @Override
+            public boolean readPending(ReplicationMessageProcessor processor)
+                    throws SQLException, InterruptedException {
+                processWarnings(false);
+                ByteBuffer read = stream.readPending();
+                final Lsn lastReceiveLsn = 
Lsn.valueOf(stream.getLastReceiveLSN());
+                LOGGER.trace(
+                        "Streaming requested from LSN {}, received LSN {}",
+                        startLsn,
+                        lastReceiveLsn);
+
+                if (reachEnd(lastReceiveLsn)) {
+                    lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
+                    LOGGER.trace("Received message at LSN {}", 
lastReceivedLsn);
+                    processor.process(new ReplicationMessage.NoopMessage(null, 
null));
+                    return true;
+                }
+
+                if (read == null) {
+                    return false;
+                }
+
+                if (messageDecoder.shouldMessageBeSkipped(
+                        read, lastReceiveLsn, startLsn, walPosition)) {
+                    return true;
+                }
+
+                deserializeMessages(read, processor);
+
+                return true;
+            }
+
+            private void deserializeMessages(
+                    ByteBuffer buffer, ReplicationMessageProcessor processor)
+                    throws SQLException, InterruptedException {
+                lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
+                LOGGER.trace("Received message at LSN {}", lastReceivedLsn);
+                messageDecoder.processMessage(buffer, processor, typeRegistry);
+            }
+
+            @Override
+            public void close() throws SQLException {
+                processWarnings(true);
+                stream.close();
+            }
+
+            @Override
+            public void flushLsn(Lsn lsn) throws SQLException {
+                doFlushLsn(lsn);
+            }
+
+            private void doFlushLsn(Lsn lsn) throws SQLException {
+                stream.setFlushedLSN(lsn.asLogSequenceNumber());
+                stream.setAppliedLSN(lsn.asLogSequenceNumber());
+
+                stream.forceUpdateStatus();
+            }
+
+            @Override
+            public Lsn lastReceivedLsn() {
+                return lastReceivedLsn;
+            }
+
+            @Override
+            public void startKeepAlive(ExecutorService service) {
+                if (keepAliveExecutor == null) {
+                    keepAliveExecutor = service;
+                    keepAliveRunning = new AtomicBoolean(true);
+                    keepAliveExecutor.submit(
+                            () -> {
+                                while (keepAliveRunning.get()) {
+                                    try {
+                                        LOGGER.trace(
+                                                "Forcing status update with 
replication stream");
+                                        stream.forceUpdateStatus();
+                                        metronome.pause();
+                                    } catch (Exception exp) {
+                                        throw new RuntimeException(
+                                                "received unexpected exception 
will perform keep alive",
+                                                exp);
+                                    }
+                                }
+                            });
+                }
+            }
+
+            @Override
+            public void stopKeepAlive() {
+                if (keepAliveExecutor != null) {
+                    keepAliveRunning.set(false);
+                    keepAliveExecutor.shutdownNow();
+                    keepAliveExecutor = null;
+                }
+            }
+
+            private void processWarnings(final boolean forced) throws 
SQLException {
+                if (--warningCheckCounter == 0 || forced) {
+                    warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
+                    for (SQLWarning w = connection().getWarnings();
+                            w != null;
+                            w = w.getNextWarning()) {
+                        LOGGER.debug(
+                                "Server-side message: '{}', state = {}, code = 
{}",
+                                w.getMessage(),
+                                w.getSQLState(),
+                                w.getErrorCode());
+                    }
+                    connection().clearWarnings();
+                }
+            }
+
+            @Override
+            public Lsn startLsn() {
+                return startLsn;
+            }
+
+            private boolean reachEnd(Lsn receivedLsn) {
+                if (receivedLsn == null) {
+                    return false;
+                }
+                return endingPos != null
+                        && (!endingPos.isNonStopping())
+                        && endingPos.compareTo(receivedLsn) < 0;
+            }
+        };
+    }
+
+    public void setEndingPos(Lsn endingPos) {
+        this.endingPos = endingPos;
+    }
+
+    private PGReplicationStream startPgReplicationStream(
+            final Lsn lsn,
+            BiFunction<
+                    ChainedLogicalStreamBuilder,
+                    Function<Integer, Boolean>,
+                    ChainedLogicalStreamBuilder>
+                    configurator)
+            throws SQLException {
+        assert lsn != null;
+        ChainedLogicalStreamBuilder streamBuilder =
+                pgConnection()
+                        .getReplicationAPI()
+                        .replicationStream()
+                        .logical()
+                        .withSlotName("\"" + slotName + "\"")
+                        .withStartPosition(lsn.asLogSequenceNumber())
+                        .withSlotOptions(streamParams);
+        streamBuilder = configurator.apply(streamBuilder, 
this::hasMinimumVersion);
+
+        if (statusUpdateInterval != null && statusUpdateInterval.toMillis() > 
0) {
+            streamBuilder.withStatusInterval(
+                    toIntExact(statusUpdateInterval.toMillis()), 
TimeUnit.MILLISECONDS);
+        }
+
+        PGReplicationStream stream = streamBuilder.start();
+
+        // TODO DBZ-508 get rid of this
+        // Needed by tests when connections are opened and closed in a fast 
sequence
+        try {
+            Thread.sleep(10);
+        } catch (Exception e) {
+        }
+        stream.forceUpdateStatus();
+        return stream;
+    }
+
+    private Boolean hasMinimumVersion(int version) {
+        try {
+            return pgConnection().haveMinimumServerVersion(version);
+        } catch (SQLException e) {
+            throw new DebeziumException(e);
+        }
+    }
+
+    @Override
+    public synchronized void close() {
+        close(true);
+    }
+
+    public synchronized void close(boolean dropSlot) {
+        try {
+            LOGGER.debug("Closing message decoder");
+            messageDecoder.close();
+        } catch (Throwable e) {
+            LOGGER.error("Unexpected error while closing message decoder", e);
+        }
+
+        try {
+            LOGGER.debug("Closing replication connection");
+            super.close();
+        } catch (Throwable e) {
+            LOGGER.error("Unexpected error while closing Postgres connection", 
e);
+        }
+        if (dropSlotOnClose && dropSlot) {
+            // we're dropping the replication slot via a regular - i.e. not a 
replication -
+            // connection
+            try (PostgresConnection connection =
+                    new PostgresConnection(
+                            connectorConfig.getJdbcConfig(),
+                            PostgresConnection.CONNECTION_DROP_SLOT)) {
+                connection.dropReplicationSlot(slotName);
+            } catch (Throwable e) {
+                LOGGER.error("Unexpected error while dropping replication 
slot", e);
+            }
+        }
+    }
+
+    @Override
+    public void reconnect() throws SQLException {
+        close(false);
+        // Don't re-execute initial commands on reconnection
+        connection(false);
+    }
+
+    protected static class ReplicationConnectionBuilder implements Builder {
+
+        private final PostgresConnectorConfig config;
+        private String slotName = DEFAULT_SLOT_NAME;
+        private String publicationName = DEFAULT_PUBLICATION_NAME;
+        private RelationalTableFilters tableFilter;
+        private PostgresConnectorConfig.AutoCreateMode 
publicationAutocreateMode =
+                PostgresConnectorConfig.AutoCreateMode.ALL_TABLES;
+        private PostgresConnectorConfig.LogicalDecoder plugin =
+                PostgresConnectorConfig.LogicalDecoder.DECODERBUFS;
+        private boolean dropSlotOnClose = DEFAULT_DROP_SLOT_ON_CLOSE;
+        private Duration statusUpdateIntervalVal;
+        private boolean doSnapshot;
+        private TypeRegistry typeRegistry;
+        private PostgresSchema schema;
+        private Properties slotStreamParams = new Properties();
+        private PostgresConnection jdbcConnection;
+
+        protected ReplicationConnectionBuilder(PostgresConnectorConfig config) 
{
+            assert config != null;
+            this.config = config;
+        }
+
+        @Override
+        public ReplicationConnectionBuilder withSlot(final String slotName) {
+            assert slotName != null;
+            this.slotName = slotName;
+            return this;
+        }
+
+        @Override
+        public Builder withPublication(String publicationName) {
+            assert publicationName != null;
+            this.publicationName = publicationName;
+            return this;
+        }
+
+        @Override
+        public Builder withTableFilter(RelationalTableFilters tableFilter) {
+            assert tableFilter != null;
+            this.tableFilter = tableFilter;
+            return this;
+        }
+
+        @Override
+        public Builder withPublicationAutocreateMode(
+                PostgresConnectorConfig.AutoCreateMode 
publicationAutocreateMode) {
+            assert publicationName != null;
+            this.publicationAutocreateMode = publicationAutocreateMode;
+            return this;
+        }
+
+        @Override
+        public ReplicationConnectionBuilder withPlugin(
+                final PostgresConnectorConfig.LogicalDecoder plugin) {
+            assert plugin != null;
+            this.plugin = plugin;
+            return this;
+        }
+
+        @Override
+        public ReplicationConnectionBuilder dropSlotOnClose(final boolean 
dropSlotOnClose) {
+            this.dropSlotOnClose = dropSlotOnClose;
+            return this;
+        }
+
+        @Override
+        public ReplicationConnectionBuilder streamParams(final String 
slotStreamParams) {
+            if (slotStreamParams != null && !slotStreamParams.isEmpty()) {
+                this.slotStreamParams = new Properties();
+                String[] paramsWithValues = slotStreamParams.split(";");
+                for (String paramsWithValue : paramsWithValues) {
+                    String[] paramAndValue = paramsWithValue.split("=");
+                    if (paramAndValue.length == 2) {
+                        this.slotStreamParams.setProperty(paramAndValue[0], 
paramAndValue[1]);
+                    } else {
+                        LOGGER.warn(
+                                "The following STREAM_PARAMS value is invalid: 
{}",
+                                paramsWithValue);
+                    }
+                }
+            }
+            return this;
+        }
+
+        @Override
+        public ReplicationConnectionBuilder statusUpdateInterval(
+                final Duration statusUpdateInterval) {
+            this.statusUpdateIntervalVal = statusUpdateInterval;
+            return this;
+        }
+
+        @Override
+        public Builder doSnapshot(boolean doSnapshot) {
+            this.doSnapshot = doSnapshot;
+            return this;
+        }
+
+        @Override
+        public Builder jdbcMetadataConnection(PostgresConnection 
jdbcConnection) {
+            this.jdbcConnection = jdbcConnection;
+            return this;
+        }
+
+        @Override
+        public ReplicationConnection build() {
+            assert plugin != null : "Decoding plugin name is not set";
+            return new PostgresReplicationConnection(
+                    config,
+                    slotName,
+                    publicationName,
+                    tableFilter,
+                    publicationAutocreateMode,
+                    plugin,
+                    dropSlotOnClose,
+                    doSnapshot,
+                    statusUpdateIntervalVal,
+                    jdbcConnection,
+                    typeRegistry,
+                    slotStreamParams,
+                    schema);
+        }
+
+        @Override
+        public Builder withTypeRegistry(TypeRegistry typeRegistry) {
+            this.typeRegistry = typeRegistry;
+            return this;
+        }
+
+        @Override
+        public Builder withSchema(PostgresSchema schema) {
+            this.schema = schema;
+            return this;
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index c7effe7a967..5a9fa095941 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -158,16 +158,19 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
 
         String hostname = props.getProperty("PGHOST");
         String port = props.getProperty("PGPORT");
-        String database = props.getProperty("PGDBNAME");
+        String databaseFromUrl = props.getProperty("PGDBNAME");
         Preconditions.checkNotNull(hostname, "host is required");
         Preconditions.checkNotNull(port, "port is required");
-        Preconditions.checkNotNull(database, "database is required");
 
         configFactory.hostname(hostname);
         configFactory.port(Integer.parseInt(port));
         configFactory.username(cdcConfig.get(DataSourceConfigKeys.USER));
         configFactory.password(cdcConfig.get(DataSourceConfigKeys.PASSWORD));
-        configFactory.database(database);
+
+        String database = cdcConfig.get(DataSourceConfigKeys.DATABASE);
+        String finalDatabase = StringUtils.isNotEmpty(database) ? database : 
databaseFromUrl;
+        Preconditions.checkNotNull(finalDatabase, "database is required");
+        configFactory.database(finalDatabase);
 
         String schema = cdcConfig.get(DataSourceConfigKeys.SCHEMA);
         Preconditions.checkNotNull(schema, "schema is required");
@@ -219,6 +222,9 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
         configFactory.heartbeatInterval(
                 Duration.ofMillis(Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS));
 
+        // support scan partition table
+        configFactory.setIncludePartitionedTables(true);
+
         // subtaskId use pg create slot in snapshot phase, slotname is 
slot_name_subtaskId
         return configFactory.create(subtaskId);
     }
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.out
new file mode 100644
index 00000000000..331d1fe2040
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_orders_partition_snapshot --
+1      1001    2024-01-10
+2      1002    2024-02-05
+
+-- !select_orders_partition_binlog_all --
+2      2002    2024-02-05
+3      1003    2024-01-20
+4      1004    2024-03-15
+
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
index ba2c1247016..59be0f77d7a 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
@@ -135,11 +135,13 @@ suite("test_streaming_postgres_job", 
"p0,external,pg,external_docker,external_do
         // mock incremental into
         connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
             sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name,age) VALUES 
('Doris',18);"""
+            def xminResult = sql """SELECT xmin, xmax , * FROM 
${pgSchema}.${table1} WHERE name = 'Doris'; """
+            log.info("xminResult: " + xminResult)
             sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET age = 10 WHERE 
name = 'B1';"""
             sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE name = 
'A1';"""
         }
 
-        sleep(30000); // wait for cdc incremental data
+        sleep(60000); // wait for cdc incremental data
 
         // check incremental data
         qt_select_binlog_table1 """ SELECT * FROM ${table1} order by name asc 
"""
@@ -156,9 +158,11 @@ suite("test_streaming_postgres_job", 
"p0,external,pg,external_docker,external_do
         // mock incremental into again
         connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
             sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name,age) VALUES 
('Apache',40);"""
+            def xminResult1 = sql """SELECT xmin, xmax , * FROM 
${pgSchema}.${table1} WHERE name = 'Apache'; """
+            log.info("xminResult1: " + xminResult1)
         }
 
-        sleep(30000); // wait for cdc incremental data
+        sleep(60000); // wait for cdc incremental data
 
         // check incremental data
         qt_select_next_binlog_table1 """ SELECT * FROM ${table1} order by name 
asc """
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.groovy
new file mode 100644
index 00000000000..e68b0302d51
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.groovy
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_postgres_job_partition", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_postgres_job_partition_name"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "user_info_pg_orders" 
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // 1. create postgres partition table and insert snapshot data
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgSchema}.${table1}"""
+
+            sql """
+                CREATE TABLE ${pgSchema}.${table1} (
+                    id BIGINT,
+                    user_id BIGINT,
+                    order_date DATE,
+                    PRIMARY KEY (id, order_date)
+                ) PARTITION BY RANGE (order_date)
+            """
+
+            // create two partitions: 2024-01, 2024-02
+            sql """CREATE TABLE ${table1}_p202401 PARTITION OF 
${pgSchema}.${table1}
+                   FOR VALUES FROM ('2024-01-01') TO ('2024-02-01')"""
+            sql """CREATE TABLE ${table1}_p202402 PARTITION OF 
${pgSchema}.${table1}
+                   FOR VALUES FROM ('2024-02-01') TO ('2024-03-01')"""
+
+            // make snapshot data, insert into two partitions
+            sql """INSERT INTO ${pgSchema}.${table1} (id, user_id, order_date) 
VALUES (1, 1001, DATE '2024-01-10');"""
+            sql """INSERT INTO ${pgSchema}.${table1} (id, user_id, order_date) 
VALUES (2, 1002, DATE '2024-02-05');"""
+        }
+
+        // 2. create streaming job
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "org.postgresql.Driver",
+                    "user" = "${pgUser}",
+                    "password" = "${pgPassword}",
+                    "database" = "${pgDB}",
+                    "schema" = "${pgSchema}",
+                    "include_tables" = "${table1}",
+                    "offset" = "initial"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        // wait snapshot data sync completed
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until(
+                    {
+                        def jobSuccendCount = sql """ select SucceedTaskCount 
from jobs("type"="insert")
+                                                     where Name = '${jobName}' 
and ExecuteType='STREAMING' """
+                        log.info("jobSuccendCount: " + jobSuccendCount)
+                        jobSuccendCount.size() == 1 && '2' <= 
jobSuccendCount.get(0).get(0)
+                    }
+            )
+        } catch (Exception ex){
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex;
+        }
+
+        // 3. check snapshot data
+        qt_select_orders_partition_snapshot """
+            SELECT id, user_id, order_date
+            FROM ${table1}
+            ORDER BY id
+        """
+
+        // 4. mock insert, update, delete and create new partition
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            // insert
+            sql """INSERT INTO ${pgSchema}.${table1} (id, user_id, order_date)
+                   VALUES (3, 1003, DATE '2024-01-20');"""
+
+            def xminResult = sql """SELECT xmin, xmax , * FROM 
${pgSchema}.${table1} WHERE id = 3"""
+            log.info("xminResult: " + xminResult)
+
+            // update
+            sql """UPDATE ${pgSchema}.${table1}
+                   SET user_id = 2002
+                   WHERE id = 2 AND order_date = DATE '2024-02-05';"""
+
+            // delete
+            sql """DELETE FROM ${pgSchema}.${table1}
+                   WHERE id = 1 AND order_date = DATE '2024-01-10';"""
+
+            // create new partition and insert data
+            sql """CREATE TABLE ${table1}_p202403 PARTITION OF 
${pgSchema}.${table1}
+                   FOR VALUES FROM ('2024-03-01') TO ('2024-04-01')"""
+
+            sql """INSERT INTO ${pgSchema}.${table1} (id, user_id, order_date)
+                   VALUES (4, 1004, DATE '2024-03-15');"""
+
+            def xminResult1 = sql """SELECT xmin, xmax , * FROM 
${pgSchema}.${table1} WHERE id = 4"""
+            log.info("xminResult1: " + xminResult1)
+        }
+
+        // wait for all incremental data 
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until(
+                    {
+                        def jobSuccendCount = sql """ select SucceedTaskCount 
from jobs("type"="insert")
+                                                     where Name = '${jobName}' 
and ExecuteType='STREAMING' """
+                        log.info("jobSuccendCount: " + jobSuccendCount)
+                        jobSuccendCount.size() == 1 && '3' <= 
jobSuccendCount.get(0).get(0)
+                    }
+            )
+        } catch (Exception ex){
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex;
+        }
+
+        def jobInfo = sql """
+            select loadStatistic, status from jobs("type"="insert") where 
Name='${jobName}'
+        """
+        log.info("jobInfo: " + jobInfo)
+        assert jobInfo.get(0).get(1) == "RUNNING"
+
+        // check binlog data
+        qt_select_orders_partition_binlog_all """
+            SELECT id, user_id, order_date
+            FROM ${table1}
+            ORDER BY id
+        """
+
+        sql """ DROP JOB IF EXISTS where jobname =  '${jobName}' """
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert")  
where Name ='${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+    }
+}
+
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
index 9c0cd6a464c..5747438b717 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
@@ -134,8 +134,10 @@ suite("test_streaming_postgres_job_priv", 
"p0,external,pg,external_docker,extern
         )
 
         // mock incremental into
-        connect("${newPgUser}", "${newPgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
             sql """INSERT INTO ${pgDB}.${pgSchema}.${tableName} (name,age) 
VALUES ('Doris',18);"""
+            def xminResult = sql """SELECT xmin, xmax , * FROM 
${pgDB}.${pgSchema}.${tableName} WHERE name = 'Doris';"""
+            log.info("xminResult: " + xminResult)
         }
 
         Awaitility.await().atMost(300, SECONDS)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to