This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a0e0ee55cf6 [fix](streaming-job) fix postgres historical-date
timestamp handling in cdc-client (#63618)
a0e0ee55cf6 is described below
commit a0e0ee55cf6e7f02a73e32aa9b71f97cf9bbba9c
Author: wudi <[email protected]>
AuthorDate: Thu May 28 15:06:20 2026 +0800
[fix](streaming-job) fix postgres historical-date timestamp handling in
cdc-client (#63618)
### What problem does this PR solve?
Problem Summary:
When a Postgres CDC streaming job ingests rows whose timestamp / date
columns hold historical values (pre-1970 with sub-millisecond precision,
or pre-1582 / pre-1901 dates), two independent bugs in cdc-client cause
data corruption or a task crash:
1. `DebeziumJsonDeserializer.convertTimestamp` splits negative `micros` /
`nanos` with Java's truncating `/` and `%` (which round toward zero),
producing a negative `nanoOfMillisecond` and tripping Flink
`TimestampData`'s `checkArgument(nanoOfMillisecond >= 0)`. Result: the
ingestion task crashes whenever a pre-1970 timestamp with sub-millisecond
precision flows through (e.g. `1969-12-31 23:59:59.999123`).
2. The snapshot path reads column values via `rs.getObject()`, which
routes through PG JDBC's `TimestampUtils` + `GregorianCalendar`. For
pre-1582 timestamps the Julian/Gregorian calendar cutover shifts values by
N days; for pre-1901 timestamps the JVM time zone's LMT (local mean time)
offset shifts values by the LMT difference (e.g. ~343s in `Asia/Shanghai`).
Result: the same PG value (e.g. `0001-01-01 00:00:00`) yields different
Doris values depending on whether the row was synced via the snapshot
phase or via the incremental (WAL / logical replication) phase.
This PR fixes both:
1. Use `Math.floorDiv` / `Math.floorMod` so the millisecond / nanosecond
split stays valid for negative epoch values.
2. Dispatch `TIMESTAMP` / `TIMESTAMPTZ` / `DATE` columns through
`LocalDateTime` / `OffsetDateTime` / `LocalDate` in the snapshot reader,
bypassing `GregorianCalendar` entirely. Preserve the legacy
`Timestamp(Long.MAX/MIN_VALUE)` sentinel for `+/-infinity`.
---
.../postgresql/connection/PostgresConnection.java | 905 +++++++++++++++++++++
.../deserialize/DebeziumJsonDeserializer.java | 46 +-
.../source/fetch/PostgresScanFetchTask.java | 391 +++++++++
.../deserialize/DebeziumJsonDeserializerTest.java | 76 ++
.../cdc/test_streaming_postgres_job_all_type.out | 4 +-
...ming_postgres_job_snapshot_historical_dates.out | 45 +
...g_postgres_job_snapshot_historical_dates.groovy | 229 ++++++
7 files changed, 1684 insertions(+), 12 deletions(-)
diff --git
a/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
b/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
new file mode 100644
index 00000000000..2f6ca5756dd
--- /dev/null
+++
b/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
@@ -0,0 +1,905 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at
http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.connector.postgresql.connection;
+
+import com.zaxxer.hikari.pool.HikariProxyConnection;
+import io.debezium.DebeziumException;
+import io.debezium.annotation.VisibleForTesting;
+import io.debezium.config.Configuration;
+import io.debezium.connector.postgresql.PgOid;
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.connector.postgresql.PostgresSchema;
+import io.debezium.connector.postgresql.PostgresType;
+import io.debezium.connector.postgresql.PostgresValueConverter;
+import io.debezium.connector.postgresql.TypeRegistry;
+import io.debezium.connector.postgresql.spi.SlotState;
+import io.debezium.data.SpecialValueDecimal;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.ColumnEditor;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.relational.Tables;
+import io.debezium.schema.DatabaseSchema;
+import io.debezium.util.Clock;
+import io.debezium.util.Metronome;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.postgresql.core.BaseConnection;
+import org.postgresql.jdbc.PgConnection;
+import org.postgresql.jdbc.TimestampUtils;
+import org.postgresql.replication.LogSequenceNumber;
+import org.postgresql.util.PGmoney;
+import org.postgresql.util.PSQLState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+
+/**
+ * Copied from Flink Cdc 3.6.0
+ *
+ * <p>Local modification: {@link #getColumnValue} reads TIMESTAMP / TIMESTAMPTZ / DATE columns
+ * through {@code java.time} types instead of the driver's GregorianCalendar-based legacy path, to
+ * fix FLINK-39748.
+ */
+public class PostgresConnection extends JdbcConnection {
+
+    public static final String CONNECTION_STREAMING = "Debezium Streaming";
+    public static final String CONNECTION_SLOT_INFO = "Debezium Slot Info";
+    public static final String CONNECTION_DROP_SLOT = "Debezium Drop Slot";
+    public static final String CONNECTION_VALIDATE_CONNECTION = "Debezium Validate Connection";
+    public static final String CONNECTION_HEARTBEAT = "Debezium Heartbeat";
+    public static final String CONNECTION_GENERAL = "Debezium General";
+
+    private static final Pattern FUNCTION_DEFAULT_PATTERN =
+            Pattern.compile("^[(]?[A-Za-z0-9_.]+\\((?:.+(?:, ?.+)*)?\\)");
+    private static final Pattern EXPRESSION_DEFAULT_PATTERN =
+            Pattern.compile("\\(+(?:.+(?:[+ - * / < > = ~ ! @ # % ^ & | ` ?] ?.+)+)+\\)");
+    private static final Logger LOGGER = LoggerFactory.getLogger(PostgresConnection.class);
+
+    private static final String URL_PATTERN =
+            "jdbc:postgresql://${"
+                    + JdbcConfiguration.HOSTNAME
+                    + "}:${"
+                    + JdbcConfiguration.PORT
+                    + "}/${"
+                    + JdbcConfiguration.DATABASE
+                    + "}";
+    protected static final ConnectionFactory FACTORY =
+            JdbcConnection.patternBasedFactory(
+                    URL_PATTERN,
+                    org.postgresql.Driver.class.getName(),
+                    PostgresConnection.class.getClassLoader(),
+                    JdbcConfiguration.PORT.withDefault(
+                            PostgresConnectorConfig.PORT.defaultValueAsString()));
+
+    /**
+     * Obtaining a replication slot may fail if there's a pending transaction. We're retrying to get
+     * a slot for 30 min.
+     */
+    private static final int MAX_ATTEMPTS_FOR_OBTAINING_REPLICATION_SLOT = 900;
+
+    private static final Duration PAUSE_BETWEEN_REPLICATION_SLOT_RETRIEVAL_ATTEMPTS =
+            Duration.ofSeconds(2);
+
+    private final TypeRegistry typeRegistry;
+    private final PostgresDefaultValueConverter defaultValueConverter;
+
+    /**
+     * Creates a Postgres connection using the supplied configuration. If necessary this connection
+     * is able to resolve data type mappings. Such a connection requires a {@link
+     * PostgresValueConverter}, and will provide its own {@link TypeRegistry}. Usually only one such
+     * connection per connector is needed.
+     *
+     * @param config {@link Configuration} instance, may not be null.
+     * @param valueConverterBuilder supplies a configured {@link PostgresValueConverter} for a given
+     *     {@link TypeRegistry}
+     * @param connectionUsage a symbolic name of the connection to be tracked in monitoring tools
+     */
+    public PostgresConnection(
+            JdbcConfiguration config,
+            PostgresValueConverterBuilder valueConverterBuilder,
+            String connectionUsage) {
+        this(config, valueConverterBuilder, connectionUsage, FACTORY);
+    }
+
+    /**
+     * Creates a Postgres connection using the supplied configuration and connection factory. If
+     * necessary this connection is able to resolve data type mappings. Such a connection requires a
+     * {@link PostgresValueConverter}, and will provide its own {@link TypeRegistry}. Usually only
+     * one such connection per connector is needed.
+     *
+     * @param config {@link Configuration} instance, may not be null.
+     * @param valueConverterBuilder supplies a configured {@link PostgresValueConverter} for a given
+     *     {@link TypeRegistry}; when null, no type registry or default value converter is created
+     * @param connectionUsage a symbolic name of the connection to be tracked in monitoring tools
+     * @param factory the factory used to open the underlying JDBC connection
+     */
+    public PostgresConnection(
+            JdbcConfiguration config,
+            PostgresValueConverterBuilder valueConverterBuilder,
+            String connectionUsage,
+            ConnectionFactory factory) {
+        super(
+                addDefaultSettings(config, connectionUsage),
+                factory,
+                PostgresConnection::validateServerVersion,
+                null,
+                "\"",
+                "\"");
+
+        if (Objects.isNull(valueConverterBuilder)) {
+            this.typeRegistry = null;
+            this.defaultValueConverter = null;
+        } else {
+            this.typeRegistry = new TypeRegistry(this);
+
+            final PostgresValueConverter valueConverter =
+                    valueConverterBuilder.build(this.typeRegistry);
+            this.defaultValueConverter =
+                    new PostgresDefaultValueConverter(valueConverter, this.getTimestampUtils());
+        }
+    }
+
+    /**
+     * Create a Postgres connection using the supplied configuration and {@link TypeRegistry}
+     *
+     * @param config {@link Configuration} instance, may not be null.
+     * @param typeRegistry an existing/already-primed {@link TypeRegistry} instance
+     * @param connectionUsage a symbolic name of the connection to be tracked in monitoring tools
+     */
+    public PostgresConnection(
+            PostgresConnectorConfig config, TypeRegistry typeRegistry, String connectionUsage) {
+        super(
+                addDefaultSettings(config.getJdbcConfig(), connectionUsage),
+                FACTORY,
+                PostgresConnection::validateServerVersion,
+                null,
+                "\"",
+                "\"");
+        if (Objects.isNull(typeRegistry)) {
+            this.typeRegistry = null;
+            this.defaultValueConverter = null;
+        } else {
+            this.typeRegistry = typeRegistry;
+            final PostgresValueConverter valueConverter =
+                    PostgresValueConverter.of(config, this.getDatabaseCharset(), typeRegistry);
+            this.defaultValueConverter =
+                    new PostgresDefaultValueConverter(valueConverter, this.getTimestampUtils());
+        }
+    }
+
+    /**
+     * Creates a Postgres connection using the supplied configuration. The connector is the regular
+     * one without datatype resolution capabilities.
+     *
+     * @param config {@link Configuration} instance, may not be null.
+     * @param connectionUsage a symbolic name of the connection to be tracked in monitoring tools
+     */
+    public PostgresConnection(JdbcConfiguration config, String connectionUsage) {
+        this(config, null, connectionUsage);
+    }
+
+    /** Return an unwrapped PgConnection instead of HikariProxyConnection */
+    @Override
+    public synchronized Connection connection() throws SQLException {
+        Connection conn = connection(true);
+        if (conn instanceof HikariProxyConnection) {
+            // assuming HikariCP use org.postgresql.jdbc.PgConnection
+            return conn.unwrap(PgConnection.class);
+        }
+        return conn;
+    }
+
+    static JdbcConfiguration addDefaultSettings(
+            JdbcConfiguration configuration, String connectionUsage) {
+        // we require Postgres 9.4 as the minimum server version since that's where logical
+        // replication was first introduced
+        return JdbcConfiguration.adapt(
+                configuration
+                        .edit()
+                        .with("assumeMinServerVersion", "9.4")
+                        .with("ApplicationName", connectionUsage)
+                        .build());
+    }
+
+    /**
+     * Returns a JDBC connection string for the current configuration.
+     *
+     * @return a {@code String} where the variables in {@code urlPattern} are replaced with values
+     *     from the configuration
+     */
+    public String connectionString() {
+        return connectionString(URL_PATTERN);
+    }
+
+    /**
+     * Prints out information about the REPLICA IDENTITY status of a table. This in turn determines
+     * how much information is available for UPDATE and DELETE operations for logical replication.
+     *
+     * @param tableId the identifier of the table
+     * @return the replica identity information; never null
+     * @throws SQLException if there is a problem obtaining the replica identity information for the
+     *     given table
+     */
+    public ServerInfo.ReplicaIdentity readReplicaIdentityInfo(TableId tableId) throws SQLException {
+        String statement =
+                "SELECT relreplident FROM pg_catalog.pg_class c "
+                        + "LEFT JOIN pg_catalog.pg_namespace n ON c.relnamespace=n.oid "
+                        + "WHERE n.nspname=? and c.relname=?";
+        String schema =
+                tableId.schema() != null && tableId.schema().length() > 0
+                        ? tableId.schema()
+                        : "public";
+        StringBuilder replIdentity = new StringBuilder();
+        prepareQuery(
+                statement,
+                stmt -> {
+                    stmt.setString(1, schema);
+                    stmt.setString(2, tableId.table());
+                },
+                rs -> {
+                    if (rs.next()) {
+                        replIdentity.append(rs.getString(1));
+                    } else {
+                        LOGGER.warn(
+                                "Cannot determine REPLICA IDENTITY information for table '{}'",
+                                tableId);
+                    }
+                });
+        return ServerInfo.ReplicaIdentity.parseFromDB(replIdentity.toString());
+    }
+
+    /**
+     * Returns the current state of the replication slot
+     *
+     * @param slotName the name of the slot
+     * @param pluginName the name of the plugin used for the desired slot
+     * @return the {@link SlotState} or null, if no slot state is found
+     * @throws SQLException
+     */
+    public SlotState getReplicationSlotState(String slotName, String pluginName)
+            throws SQLException {
+        ServerInfo.ReplicationSlot slot;
+        try {
+            slot = readReplicationSlotInfo(slotName, pluginName);
+            if (slot.equals(ServerInfo.ReplicationSlot.INVALID)) {
+                return null;
+            } else {
+                return slot.asSlotState();
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new ConnectException(
+                    "Interrupted while waiting for valid replication slot info", e);
+        }
+    }
+
+    /**
+     * Fetches the state of a replication stage given a slot name and plugin name
+     *
+     * @param slotName the name of the slot
+     * @param pluginName the name of the plugin used for the desired slot
+     * @return the {@link ServerInfo.ReplicationSlot} object or a {@link
+     *     ServerInfo.ReplicationSlot#INVALID} if the slot is not valid
+     * @throws SQLException is thrown by the underlying JDBC
+     */
+    private ServerInfo.ReplicationSlot fetchReplicationSlotInfo(String slotName, String pluginName)
+            throws SQLException {
+        final String database = database();
+        final ServerInfo.ReplicationSlot slot =
+                queryForSlot(
+                        slotName,
+                        database,
+                        pluginName,
+                        rs -> {
+                            if (rs.next()) {
+                                boolean active = rs.getBoolean("active");
+                                final Lsn confirmedFlushedLsn =
+                                        parseConfirmedFlushLsn(slotName, pluginName, database, rs);
+                                if (confirmedFlushedLsn == null) {
+                                    return null;
+                                }
+                                Lsn restartLsn =
+                                        parseRestartLsn(slotName, pluginName, database, rs);
+                                if (restartLsn == null) {
+                                    return null;
+                                }
+                                final Long xmin = rs.getLong("catalog_xmin");
+                                return new ServerInfo.ReplicationSlot(
+                                        active, confirmedFlushedLsn, restartLsn, xmin);
+                            } else {
+                                LOGGER.debug(
+                                        "No replication slot '{}' is present for plugin '{}' and database '{}'",
+                                        slotName,
+                                        pluginName,
+                                        database);
+                                return ServerInfo.ReplicationSlot.INVALID;
+                            }
+                        });
+        return slot;
+    }
+
+    /**
+     * Fetches a replication slot, repeating the query until either the slot is created or until the
+     * max number of attempts has been reached
+     *
+     * <p>To fetch the slot without the retries, use the {@link
+     * PostgresConnection#fetchReplicationSlotInfo} call
+     *
+     * @param slotName the slot name
+     * @param pluginName the name of the plugin
+     * @return the {@link ServerInfo.ReplicationSlot} object or a {@link
+     *     ServerInfo.ReplicationSlot#INVALID} if the slot is not valid
+     * @throws SQLException is thrown by the underyling jdbc driver
+     * @throws InterruptedException is thrown if we don't return an answer within the set number of
+     *     retries
+     */
+    @VisibleForTesting
+    ServerInfo.ReplicationSlot readReplicationSlotInfo(String slotName, String pluginName)
+            throws SQLException, InterruptedException {
+        final String database = database();
+        final Metronome metronome =
+                Metronome.parker(PAUSE_BETWEEN_REPLICATION_SLOT_RETRIEVAL_ATTEMPTS, Clock.SYSTEM);
+
+        for (int attempt = 1; attempt <= MAX_ATTEMPTS_FOR_OBTAINING_REPLICATION_SLOT; attempt++) {
+            final ServerInfo.ReplicationSlot slot = fetchReplicationSlotInfo(slotName, pluginName);
+            if (slot != null) {
+                LOGGER.info("Obtained valid replication slot {}", slot);
+                return slot;
+            }
+            LOGGER.warn(
+                    "Cannot obtain valid replication slot '{}' for plugin '{}' and database '{}' [during attempt {} out of {}, concurrent tx probably blocks taking snapshot.",
+                    slotName,
+                    pluginName,
+                    database,
+                    attempt,
+                    MAX_ATTEMPTS_FOR_OBTAINING_REPLICATION_SLOT);
+            metronome.pause();
+        }
+
+        throw new ConnectException(
+                "Unable to obtain valid replication slot. "
+                        + "Make sure there are no long-running transactions running in parallel as they may hinder the allocation of the replication slot when starting this connector");
+    }
+
+    protected ServerInfo.ReplicationSlot queryForSlot(
+            String slotName,
+            String database,
+            String pluginName,
+            ResultSetMapper<ServerInfo.ReplicationSlot> map)
+            throws SQLException {
+        return prepareQueryAndMap(
+                "select * from pg_replication_slots where slot_name = ? and database = ? and plugin = ?",
+                statement -> {
+                    statement.setString(1, slotName);
+                    statement.setString(2, database);
+                    statement.setString(3, pluginName);
+                },
+                map);
+    }
+
+    /**
+     * Obtains the LSN to resume streaming from. On PG 9.5 there is no confirmed_flushed_lsn yet, so
+     * restart_lsn will be read instead. This may result in more records to be re-read after a
+     * restart.
+     *
+     * @throws ConnectException if neither column can be read; the last failure is attached as the
+     *     cause and the first one as a suppressed exception
+     */
+    private Lsn parseConfirmedFlushLsn(
+            String slotName, String pluginName, String database, ResultSet rs) {
+        try {
+            return tryParseLsn(slotName, pluginName, database, rs, "confirmed_flush_lsn");
+        } catch (SQLException e) {
+            LOGGER.info("unable to find confirmed_flushed_lsn, falling back to restart_lsn");
+            try {
+                return tryParseLsn(slotName, pluginName, database, rs, "restart_lsn");
+            } catch (SQLException e2) {
+                ConnectException failure =
+                        new ConnectException(
+                                "Neither confirmed_flush_lsn nor restart_lsn could be found", e2);
+                failure.addSuppressed(e);
+                throw failure;
+            }
+        }
+    }
+
+    /**
+     * Reads the {@code restart_lsn} column of the current slot row.
+     *
+     * @return the parsed LSN, or null if the column value is SQL NULL
+     * @throws ConnectException if the column cannot be read (the JDBC error is kept as the cause)
+     */
+    private Lsn parseRestartLsn(String slotName, String pluginName, String database, ResultSet rs) {
+        try {
+            return tryParseLsn(slotName, pluginName, database, rs, "restart_lsn");
+        } catch (SQLException e) {
+            throw new ConnectException("restart_lsn could not be found", e);
+        }
+    }
+
+    /**
+     * Parses an LSN column of the current {@code pg_replication_slots} row.
+     *
+     * @return the parsed LSN, or null if the column value is SQL NULL
+     * @throws ConnectException if the value cannot be parsed or is not a valid LSN
+     * @throws SQLException if the column cannot be read from {@code rs}
+     */
+    private Lsn tryParseLsn(
+            String slotName, String pluginName, String database, ResultSet rs, String column)
+            throws ConnectException, SQLException {
+        Lsn lsn = null;
+
+        String lsnStr = rs.getString(column);
+        if (lsnStr == null) {
+            return null;
+        }
+        try {
+            lsn = Lsn.valueOf(lsnStr);
+        } catch (Exception e) {
+            throw new ConnectException(
+                    "Value "
+                            + column
+                            + " in the pg_replication_slots table for slot = '"
+                            + slotName
+                            + "', plugin = '"
+                            + pluginName
+                            + "', database = '"
+                            + database
+                            + "' is not valid. This is an abnormal situation and the database status should be checked.",
+                    e);
+        }
+        if (!lsn.isValid()) {
+            throw new ConnectException("Invalid LSN returned from database");
+        }
+        return lsn;
+    }
+
+    /**
+     * Drops a replication slot that was created on the DB
+     *
+     * <p>If the slot is still in use the drop is retried up to 3 times with a 1 second pause in
+     * between. If the thread is interrupted during that pause, the interrupt flag is restored and
+     * {@code false} is returned without further attempts.
+     *
+     * @param slotName the name of the replication slot, may not be null
+     * @return {@code true} if the slot was dropped, {@code false} otherwise
+     */
+    public boolean dropReplicationSlot(String slotName) {
+        final int ATTEMPTS = 3;
+        for (int i = 0; i < ATTEMPTS; i++) {
+            try {
+                execute("select pg_drop_replication_slot('" + slotName + "')");
+                return true;
+            } catch (SQLException e) {
+                // slot is active
+                if (PSQLState.OBJECT_IN_USE.getState().equals(e.getSQLState())) {
+                    if (i < ATTEMPTS - 1) {
+                        LOGGER.debug(
+                                "Cannot drop replication slot '{}' because it's still in use",
+                                slotName);
+                    } else {
+                        LOGGER.warn(
+                                "Cannot drop replication slot '{}' because it's still in use",
+                                slotName);
+                        return false;
+                    }
+                } else if (PSQLState.UNDEFINED_OBJECT.getState().equals(e.getSQLState())) {
+                    LOGGER.debug("Replication slot {} has already been dropped", slotName);
+                    return false;
+                } else {
+                    LOGGER.error("Unexpected error while attempting to drop replication slot", e);
+                    return false;
+                }
+            }
+            try {
+                Metronome.parker(Duration.ofSeconds(1), Clock.system()).pause();
+            } catch (InterruptedException e) {
+                // Do not swallow the interruption: restore the flag so the owning thread can
+                // observe it, and stop retrying.
+                Thread.currentThread().interrupt();
+                return false;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Drops the debezium publication that was created.
+     *
+     * @param publicationName the publication name, may not be null
+     * @return {@code true} if the publication was dropped, {@code false} otherwise
+     */
+    public boolean dropPublication(String publicationName) {
+        try {
+            LOGGER.debug("Dropping publication '{}'", publicationName);
+            execute("DROP PUBLICATION " + publicationName);
+            return true;
+        } catch (SQLException e) {
+            if (PSQLState.UNDEFINED_OBJECT.getState().equals(e.getSQLState())) {
+                LOGGER.debug("Publication {} has already been dropped", publicationName);
+            } else {
+                LOGGER.error("Unexpected error while attempting to drop publication", e);
+            }
+            return false;
+        }
+    }
+
+    @Override
+    public synchronized void close() {
+        try {
+            super.close();
+        } catch (SQLException e) {
+            LOGGER.error("Unexpected error while closing Postgres connection", e);
+        }
+    }
+
+    /**
+     * Returns the PG id of the current active transaction
+     *
+     * @return a PG transaction identifier, or null if no tx is active
+     * @throws SQLException if anything fails.
+     */
+    public Long currentTransactionId() throws SQLException {
+        AtomicLong txId = new AtomicLong(0);
+        query(
+                "select (case pg_is_in_recovery() when 't' then 0 else txid_current() end) AS pg_current_txid",
+                rs -> {
+                    if (rs.next()) {
+                        txId.compareAndSet(0, rs.getLong(1));
+                    }
+                });
+        long value = txId.get();
+        return value > 0 ? value : null;
+    }
+
+    /**
+     * Returns the current position in the server tx log.
+     *
+     * @return a long value, never negative
+     * @throws SQLException if anything unexpected fails.
+     */
+    public long currentXLogLocation() throws SQLException {
+        AtomicLong result = new AtomicLong(0);
+        int majorVersion = connection().getMetaData().getDatabaseMajorVersion();
+        query(
+                majorVersion >= 10
+                        ? "select (case pg_is_in_recovery() when 't' then pg_last_wal_receive_lsn() else pg_current_wal_lsn() end) AS pg_current_wal_lsn"
+                        : "select * from pg_current_xlog_location()",
+                rs -> {
+                    if (!rs.next()) {
+                        throw new IllegalStateException(
+                                "there should always be a valid xlog position");
+                    }
+                    result.compareAndSet(0, LogSequenceNumber.valueOf(rs.getString(1)).asLong());
+                });
+        return result.get();
+    }
+
+    /**
+     * Returns information about the PG server to which this instance is connected.
+     *
+     * @return a {@link ServerInfo} instance, never {@code null}
+     * @throws SQLException if anything fails
+     */
+    public ServerInfo serverInfo() throws SQLException {
+        ServerInfo serverInfo = new ServerInfo();
+        query(
+                "SELECT version(), current_user, current_database()",
+                rs -> {
+                    if (rs.next()) {
+                        serverInfo
+                                .withServer(rs.getString(1))
+                                .withUsername(rs.getString(2))
+                                .withDatabase(rs.getString(3));
+                    }
+                });
+        String username = serverInfo.username();
+        if (username != null) {
+            query(
+                    "SELECT oid, rolname, rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin, rolreplication FROM pg_roles "
+                            + "WHERE pg_has_role('"
+                            + username
+                            + "', oid, 'member')",
+                    rs -> {
+                        while (rs.next()) {
+                            String roleInfo =
+                                    "superuser: "
+                                            + rs.getBoolean(3)
+                                            + ", replication: "
+                                            + rs.getBoolean(8)
+                                            + ", inherit: "
+                                            + rs.getBoolean(4)
+                                            + ", create role: "
+                                            + rs.getBoolean(5)
+                                            + ", create db: "
+                                            + rs.getBoolean(6)
+                                            + ", can log in: "
+                                            + rs.getBoolean(7);
+                            String roleName = rs.getString(2);
+                            serverInfo.addRole(roleName, roleInfo);
+                        }
+                    });
+        }
+        return serverInfo;
+    }
+
+    public Charset getDatabaseCharset() {
+        try {
+            return Charset.forName(((BaseConnection) connection()).getEncoding().name());
+        } catch (SQLException e) {
+            throw new DebeziumException("Couldn't obtain encoding for database " + database(), e);
+        }
+    }
+
+    public TimestampUtils getTimestampUtils() {
+        try {
+            return ((PgConnection) this.connection()).getTimestampUtils();
+        } catch (SQLException e) {
+            throw new DebeziumException(
+                    "Couldn't get timestamp utils from underlying connection", e);
+        }
+    }
+
+    private static void validateServerVersion(Statement statement) throws SQLException {
+        DatabaseMetaData metaData = statement.getConnection().getMetaData();
+        int majorVersion = metaData.getDatabaseMajorVersion();
+        int minorVersion = metaData.getDatabaseMinorVersion();
+        if (majorVersion < 9 || (majorVersion == 9 && minorVersion < 4)) {
+            throw new SQLException("Cannot connect to a version of Postgres lower than 9.4");
+        }
+    }
+
+    @Override
+    public String quotedColumnIdString(String columnName) {
+        if (columnName.contains("\"")) {
+            columnName = columnName.replaceAll("\"", "\"\"");
+        }
+
+        return super.quotedColumnIdString(columnName);
+    }
+
+    @Override
+    protected int resolveNativeType(String typeName) {
+        return getTypeRegistry().get(typeName).getRootType().getOid();
+    }
+
+    @Override
+    protected int resolveJdbcType(int metadataJdbcType, int nativeType) {
+        // Special care needs to be taken for columns that use user-defined domain type data types
+        // where resolution of the column's JDBC type needs to be that of the root type instead of
+        // the actual column to properly influence schema building and value conversion.
+        return getTypeRegistry().get(nativeType).getRootType().getJdbcId();
+    }
+
+    @Override
+    protected Optional<ColumnEditor> readTableColumn(
+            ResultSet columnMetadata, TableId tableId, Tables.ColumnNameFilter columnFilter)
+            throws SQLException {
+        return doReadTableColumn(columnMetadata, tableId, columnFilter);
+    }
+
+    public Optional<Column> readColumnForDecoder(
+            ResultSet columnMetadata, TableId tableId, Tables.ColumnNameFilter columnNameFilter)
+            throws SQLException {
+        return doReadTableColumn(columnMetadata, tableId, columnNameFilter)
+                .map(ColumnEditor::create);
+    }
+
+    private Optional<ColumnEditor> doReadTableColumn(
+            ResultSet columnMetadata, TableId tableId, Tables.ColumnNameFilter columnFilter)
+            throws SQLException {
+        // FLINK-38965: Filter out columns from other tables that might be returned due to
+        // PostgreSQL LIKE wildcard matching. The underscore '_' matches any single character,
+        // and '%' matches any sequence of characters. For example:
+        // - When querying 'user_sink', the pattern may also match 'userbsink' (due to '_')
+        // - When querying 'user%data' (where % is literal), it may match 'user_test_data' (due to
+        // '%')
+        final String resultTableName = columnMetadata.getString(3);
+        if (!tableId.table().equals(resultTableName)) {
+            return Optional.empty();
+        }
+
+        final String columnName = columnMetadata.getString(4);
+        if (columnFilter == null
+                || columnFilter.matches(
+                        tableId.catalog(), tableId.schema(), tableId.table(), columnName)) {
+            final ColumnEditor column = Column.editor().name(columnName);
+            column.type(columnMetadata.getString(6));
+
+            // first source the length/scale from the column metadata provided by the driver
+            // this may be overridden below if the column type is a user-defined domain type
+            column.length(columnMetadata.getInt(7));
+            if (columnMetadata.getObject(9) != null) {
+                column.scale(columnMetadata.getInt(9));
+            }
+
+            column.optional(isNullable(columnMetadata.getInt(11)));
+            column.position(columnMetadata.getInt(17));
+            column.autoIncremented("YES".equalsIgnoreCase(columnMetadata.getString(23)));
+
+            String autogenerated = null;
+            try {
+                autogenerated = columnMetadata.getString(24);
+            } catch (SQLException e) {
+                // ignore, some drivers don't have this index - e.g. Postgres
+            }
+            column.generated("YES".equalsIgnoreCase(autogenerated));
+
+            // Lookup the column type from the TypeRegistry
+            // For all types, we need to set the Native and Jdbc types by using the root-type
+            final PostgresType nativeType = getTypeRegistry().get(column.typeName());
+            column.nativeType(nativeType.getRootType().getOid());
+            column.jdbcType(nativeType.getRootType().getJdbcId());
+
+            // For domain types, the postgres driver is unable to traverse a nested unbounded
+            // hierarchy of types and report the right length/scale of a given type. We use
+            // the TypeRegistry to accomplish this since it is capable of traversing the type
+            // hierarchy upward to resolve length/scale regardless of hierarchy depth.
+            if (TypeRegistry.DOMAIN_TYPE == nativeType.getJdbcId()) {
+                column.length(nativeType.getDefaultLength());
+                column.scale(nativeType.getDefaultScale());
+            }
+
+            final String defaultValueExpression = columnMetadata.getString(13);
+            if (defaultValueExpression != null
+                    && getDefaultValueConverter().supportConversion(column.typeName())) {
+                column.defaultValueExpression(defaultValueExpression);
+            }
+
+            return Optional.of(column);
+        }
+
+        return Optional.empty();
+    }
+
+    public PostgresDefaultValueConverter getDefaultValueConverter() {
+        Objects.requireNonNull(
+                defaultValueConverter, "Connection does not provide default value converter");
+        return defaultValueConverter;
+    }
+
+    public TypeRegistry getTypeRegistry() {
+        Objects.requireNonNull(typeRegistry, "Connection does not provide type registry");
+        return typeRegistry;
+    }
+
+    /**
+     * Reads the value of one column of the current row of {@code rs}.
+     *
+     * <p>Array types are returned via {@link ResultSet#getArray}. MONEY, BIT, NUMERIC, TIME and
+     * TIMETZ get dedicated handling. TIMESTAMP, TIMESTAMPTZ and DATE are read through {@code
+     * java.time} types so that historical values are not shifted by the driver's legacy
+     * GregorianCalendar-based path. Any {@link SQLException} raised here falls back to the
+     * superclass implementation.
+     */
+    @Override
+    public <T extends DatabaseSchema<TableId>> Object getColumnValue(
+            ResultSet rs, int columnIndex, Column column, Table table, T schema)
+            throws SQLException {
+        try {
+            final ResultSetMetaData metaData = rs.getMetaData();
+            final String columnTypeName = metaData.getColumnTypeName(columnIndex);
+            final PostgresType type =
+                    ((PostgresSchema) schema).getTypeRegistry().get(columnTypeName);
+
+            LOGGER.trace("Type of incoming data is: {}", type.getOid());
+            LOGGER.trace("ColumnTypeName is: {}", columnTypeName);
+            LOGGER.trace("Type is: {}", type);
+
+            if (type.isArrayType()) {
+                return rs.getArray(columnIndex);
+            }
+
+            switch (type.getOid()) {
+                case PgOid.MONEY:
+                    // TODO author=Horia Chiorean date=14/11/2016 description=workaround for
+                    // https://github.com/pgjdbc/pgjdbc/issues/100
+                    final String sMoney = rs.getString(columnIndex);
+                    if (sMoney == null) {
+                        return sMoney;
+                    }
+                    if (sMoney.startsWith("-")) {
+                        // PGmoney expects negative values to be provided in the format of
+                        // "($XXXXX.YY)"
+                        final String negativeMoney = "(" + sMoney.substring(1) + ")";
+                        return new PGmoney(negativeMoney).val;
+                    }
+                    return new PGmoney(sMoney).val;
+                case PgOid.BIT:
+                    return rs.getString(columnIndex);
+                case PgOid.NUMERIC:
+                    final String s = rs.getString(columnIndex);
+                    if (s == null) {
+                        return s;
+                    }
+
+                    Optional<SpecialValueDecimal> value = PostgresValueConverter.toSpecialValue(s);
+                    return value.isPresent()
+                            ? value.get()
+                            : new SpecialValueDecimal(rs.getBigDecimal(columnIndex));
+                case PgOid.TIME:
+                    // To handle time 24:00:00 supported by TIME columns, read the column as a
+                    // string.
+                case PgOid.TIMETZ:
+                    // In order to guarantee that we resolve TIMETZ columns with proper microsecond
+                    // precision, read the column as a string instead and then re-parse inside the
+                    // converter.
+                    return rs.getString(columnIndex);
+                case PgOid.TIMESTAMP:
+                    {
+                        // LocalDateTime bypasses GregorianCalendar's Julian/Gregorian cutover
+                        // (1582-10-15), which shifts pre-cutover values by N days
+                        // (e.g. 0001-01-01 by 2 days).
+                        LocalDateTime ldt = rs.getObject(columnIndex, LocalDateTime.class);
+                        if (ldt == null) {
+                            return null;
+                        }
+                        // PG +/-infinity surfaces as Timestamp(Long.MAX/MIN_VALUE) via the legacy
+                        // rs.getObject() path; preserve that contract for downstream converters.
+                        // Compare by value (equals), not identity (==), so the check does not rely
+                        // on the driver returning the shared MAX/MIN constant instances.
+                        if (LocalDateTime.MAX.equals(ldt)) {
+                            return new Timestamp(Long.MAX_VALUE);
+                        }
+                        if (LocalDateTime.MIN.equals(ldt)) {
+                            return new Timestamp(Long.MIN_VALUE);
+                        }
+                        return ldt;
+                    }
+                case PgOid.TIMESTAMPTZ:
+                    {
+                        OffsetDateTime odt = rs.getObject(columnIndex, OffsetDateTime.class);
+                        if (odt == null) {
+                            return null;
+                        }
+                        // Same +/-infinity sentinel mapping as TIMESTAMP, compared by value.
+                        if (OffsetDateTime.MAX.equals(odt)) {
+                            return new Timestamp(Long.MAX_VALUE);
+                        }
+                        if (OffsetDateTime.MIN.equals(odt)) {
+                            return new Timestamp(Long.MIN_VALUE);
+                        }
+                        return odt;
+                    }
+                case PgOid.DATE:
+                    // NOTE(review): unlike TIMESTAMP/TIMESTAMPTZ, no +/-infinity sentinel mapping
+                    // is applied here; confirm how the driver surfaces infinite dates as LocalDate
+                    // and whether downstream converters expect the legacy java.sql.Date value.
+                    return rs.getObject(columnIndex, LocalDate.class);
+                default:
+                    Object x = rs.getObject(columnIndex);
+                    if (x != null) {
+                        LOGGER.trace(
+                                "rs getobject returns class: {}; rs getObject value is: {}",
+                                x.getClass(),
+                                x);
+                    }
+                    return x;
+            }
+        } catch (SQLException e) {
+            // not a known type
+            return super.getColumnValue(rs, columnIndex, column, table, schema);
+        }
+    }
+
+    @Override
+    protected String[] supportedTableTypes() {
+        return new String[] {"VIEW", "MATERIALIZED VIEW", "TABLE", "PARTITIONED TABLE"};
+    }
+
+    @Override
+    protected boolean isTableType(String tableType) {
+        return "TABLE".equals(tableType) || "PARTITIONED TABLE".equals(tableType);
+    }
+
+    @Override
+    protected boolean isTableUniqueIndexIncluded(String indexName, String columnName) {
+        if (columnName != null) {
+            return !FUNCTION_DEFAULT_PATTERN.matcher(columnName).matches()
+                    && !EXPRESSION_DEFAULT_PATTERN.matcher(columnName).matches();
+        }
+        return false;
+    }
+
+    /**
+     * Retrieves all {@code TableId}s in a given database catalog, including partitioned tables.
+     *
+     * @param catalogName the catalog/database name
+     * @return set of all table ids for existing table objects
+     * @throws SQLException if a database exception occurred
+     */
+    public Set<TableId> getAllTableIds(String catalogName) throws SQLException {
+        return readTableNames(catalogName, null, null, new String[] {"TABLE", "PARTITIONED TABLE"});
+    }
+
+    @FunctionalInterface
+    public interface PostgresValueConverterBuilder {
+        PostgresValueConverter build(TypeRegistry registry);
+    }
+}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
index 7876597660e..6881dd70d40 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
@@ -21,6 +21,7 @@ import org.apache.doris.cdcclient.utils.ConfigUtil;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
import org.apache.flink.cdc.debezium.utils.TemporalConversions;
import org.apache.flink.table.data.TimestampData;
@@ -161,6 +162,7 @@ public class DebeziumJsonDeserializer
if (!excludeColumns.contains(field.name())) {
Object valueConverted =
convert(
+ field.name(),
field.schema(),
after.getWithoutDefault(field.name()));
record.put(field.name(), valueConverted);
@@ -185,6 +187,7 @@ public class DebeziumJsonDeserializer
if (!excludeColumns.contains(field.name())) {
Object valueConverted =
convert(
+ field.name(),
field.schema(),
before.getWithoutDefault(field.name()));
record.put(field.name(), valueConverted);
@@ -194,7 +197,20 @@ public class DebeziumJsonDeserializer
return objectMapper.writeValueAsString(record);
}
- private Object convert(Schema fieldSchema, Object dbzObj) {
+    /**
+     * Converts one field value for the output record, tagging any failure with the column name.
+     *
+     * @param fieldName column name; used only to build the error message
+     * @param fieldSchema schema of the field, passed through to {@code convertInternal}
+     * @param dbzObj raw Debezium value; {@code convertInternal} returns {@code null} for it
+     * @return the converted value
+     * @throws RuntimeException if conversion fails; the original exception is attached as the
+     *     cause so its stack trace is not lost
+     */
+    private Object convert(String fieldName, Schema fieldSchema, Object dbzObj) {
+        try {
+            return convertInternal(fieldSchema, dbzObj);
+        } catch (Exception e) {
+            String msg =
+                    String.format(
+                            "Failed to convert column '%s' value=%s: %s",
+                            fieldName, dbzObj, ExceptionUtils.getMessage(e));
+            LOG.error(msg, e);
+            // Chain the cause: without it, anyone who catches/logs this exception further up
+            // sees only the message and loses the stack trace of the real failure.
+            throw new RuntimeException(msg, e);
+        }
+    }
+
+ private Object convertInternal(Schema fieldSchema, Object dbzObj) {
if (dbzObj == null) {
return null;
}
@@ -307,15 +323,25 @@ public class DebeziumJsonDeserializer
case Timestamp.SCHEMA_NAME:
return TimestampData.fromEpochMillis((Long)
dbzObj).toTimestamp().toString();
case MicroTimestamp.SCHEMA_NAME:
- long micro = (long) dbzObj;
- return TimestampData.fromEpochMillis(micro / 1000, (int)
(micro % 1000 * 1000))
- .toTimestamp()
- .toString();
+ {
+ // floorDiv/floorMod keep nanoOfMillisecond
non-negative for pre-1970
+ // values.
+ long micro = (long) dbzObj;
+ long millis = Math.floorDiv(micro, 1000L);
+ int nanos = (int) Math.floorMod(micro, 1000L) * 1000;
+ return TimestampData.fromEpochMillis(millis, nanos)
+ .toTimestamp()
+ .toString();
+ }
case NanoTimestamp.SCHEMA_NAME:
- long nano = (long) dbzObj;
- return TimestampData.fromEpochMillis(nano / 1000_000,
(int) (nano % 1000_000))
- .toTimestamp()
- .toString();
+ {
+ long nano = (long) dbzObj;
+ long millis = Math.floorDiv(nano, 1_000_000L);
+ int nanos = (int) Math.floorMod(nano, 1_000_000L);
+ return TimestampData.fromEpochMillis(millis, nanos)
+ .toTimestamp()
+ .toString();
+ }
}
}
LocalDateTime localDateTime =
TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
@@ -364,7 +390,7 @@ public class DebeziumJsonDeserializer
Schema elementSchema = fieldSchema.valueSchema();
List<Object> result = new ArrayList<>();
for (Object element : (List<?>) dbzObj) {
- result.add(element == null ? null : convert(elementSchema,
element));
+ result.add(element == null ? null :
convertInternal(elementSchema, element));
}
return result;
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
b/fs_brokers/cdc_client/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
new file mode 100644
index 00000000000..6c26cf4e74a
--- /dev/null
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.fetch;
+
+import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
+import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import
org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask;
+import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
+import
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+import
org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetUtils;
+import
org.apache.flink.cdc.connectors.postgres.source.utils.PostgresQueryUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.connector.postgresql.PostgresEventDispatcher;
+import io.debezium.connector.postgresql.PostgresOffsetContext;
+import io.debezium.connector.postgresql.PostgresPartition;
+import io.debezium.connector.postgresql.PostgresSchema;
+import io.debezium.connector.postgresql.connection.PostgresConnection;
+import
io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
+import io.debezium.connector.postgresql.connection.ReplicationConnection;
+import io.debezium.connector.postgresql.spi.SlotState;
+import io.debezium.pipeline.EventDispatcher;
+import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
+import io.debezium.pipeline.source.spi.SnapshotProgressListener;
+import io.debezium.pipeline.spi.SnapshotResult;
+import io.debezium.relational.Column;
+import io.debezium.relational.RelationalSnapshotChangeEventSource;
+import io.debezium.relational.SnapshotChangeRecordEmitter;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.util.Clock;
+import io.debezium.util.ColumnUtils;
+import io.debezium.util.Strings;
+import io.debezium.util.Threads;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static
io.debezium.connector.postgresql.PostgresObjectUtils.waitForReplicationSlotReady;
+import static io.debezium.connector.postgresql.Utils.refreshSchema;
+
+/**
+ * Copied from Flink CDC 3.6.0.
+ *
+ * <p>Doris modifications, each marked in the code with a "Doris modification" comment:
+ *
+ * <ul>
+ *   <li>{@code createDataEventsForTable}: the per-column read loop takes each {@link Column}
+ *       from the {@code ColumnArray} built from the result set, to fix FLINK-39748.
+ *   <li>{@code execute}: a failure while dropping the backfill slot no longer replaces the
+ *       failure of the read itself; it is attached as a suppressed exception instead.
+ *   <li>{@code maybeCreateSlotForBackFillReadTask}: null-safe inspection of the slot-creation
+ *       error message, and the interrupt flag is restored if an {@link InterruptedException} is
+ *       converted into a {@link FlinkRuntimeException}.
+ * </ul>
+ */
+public class PostgresScanFetchTask extends AbstractScanFetchTask {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PostgresScanFetchTask.class);
+
+    public PostgresScanFetchTask(SnapshotSplit split) {
+        super(split);
+    }
+
+    /**
+     * Runs this split through {@code AbstractScanFetchTask#execute}. Unless snapshot backfill is
+     * skipped, it creates the backfill replication slot first and always tries to drop it
+     * afterwards.
+     */
+    @Override
+    public void execute(Context context) throws Exception {
+
+        PostgresSourceFetchTaskContext ctx = (PostgresSourceFetchTaskContext) context;
+        PostgresSourceConfig sourceConfig = (PostgresSourceConfig) context.getSourceConfig();
+        Throwable readFailure = null;
+        try {
+            // create slot here, because a slot can only read wal-log after its own creation.
+            // if skip backfill, no need to create slot here
+            maybeCreateSlotForBackFillReadTask(
+                    ctx.getConnection(),
+                    ctx.getReplicationConnection(),
+                    sourceConfig.getSlotNameForBackfillTask(),
+                    ctx.getPluginName(),
+                    sourceConfig.isSkipSnapshotBackfill());
+            super.execute(context);
+        } catch (Throwable t) {
+            readFailure = t;
+            throw t;
+        } finally {
+            // remove slot after snapshot split finish
+            try {
+                maybeDropSlotForBackFillReadTask(
+                        (PostgresReplicationConnection) ctx.getReplicationConnection(),
+                        sourceConfig.isSkipSnapshotBackfill());
+            } catch (RuntimeException dropFailure) {
+                // Doris modification: throwing from finally would discard readFailure (often the
+                // root cause, e.g. a broken connection), so keep it and attach this one to it.
+                if (readFailure == null) {
+                    throw dropFailure;
+                }
+                readFailure.addSuppressed(dropFailure);
+            }
+        }
+    }
+
+    /**
+     * Reads the rows of this snapshot split and dispatches them as snapshot events.
+     *
+     * @throws IllegalStateException if the snapshot read neither completed nor was skipped
+     */
+    @Override
+    protected void executeDataSnapshot(Context context) throws Exception {
+        PostgresSourceFetchTaskContext ctx = (PostgresSourceFetchTaskContext) context;
+
+        PostgresSnapshotSplitReadTask snapshotSplitReadTask =
+                new PostgresSnapshotSplitReadTask(
+                        ctx.getConnection(),
+                        ctx.getDbzConnectorConfig(),
+                        ctx.getDatabaseSchema(),
+                        ctx.getOffsetContext(),
+                        ctx.getEventDispatcher(),
+                        ctx.getSnapshotChangeEventSourceMetrics(),
+                        snapshotSplit);
+
+        StoppableChangeEventSourceContext changeEventSourceContext =
+                new StoppableChangeEventSourceContext();
+        SnapshotResult<PostgresOffsetContext> snapshotResult =
+                snapshotSplitReadTask.execute(
+                        changeEventSourceContext, ctx.getPartition(), ctx.getOffsetContext());
+
+        if (!snapshotResult.isCompletedOrSkipped()) {
+            taskRunning = false;
+            throw new IllegalStateException(
+                    String.format("Read snapshot for postgres split %s fail", snapshotResult));
+        }
+    }
+
+    /**
+     * Reads change events for the backfill stream split, starting from the split's starting
+     * offset, through the task context's replication connection.
+     */
+    @Override
+    protected void executeBackfillTask(Context context, StreamSplit backfillStreamSplit)
+            throws Exception {
+        PostgresSourceFetchTaskContext ctx = (PostgresSourceFetchTaskContext) context;
+
+        final PostgresOffsetContext.Loader loader =
+                new PostgresOffsetContext.Loader(ctx.getDbzConnectorConfig());
+        final PostgresOffsetContext postgresOffsetContext =
+                PostgresOffsetUtils.getPostgresOffsetContext(
+                        loader, backfillStreamSplit.getStartingOffset());
+
+        final PostgresStreamFetchTask.StreamSplitReadTask backfillReadTask =
+                new PostgresStreamFetchTask.StreamSplitReadTask(
+                        ctx.getDbzConnectorConfig(),
+                        ctx.getSnapShotter(),
+                        ctx.getConnection(),
+                        ctx.getEventDispatcher(),
+                        ctx.getWaterMarkDispatcher(),
+                        ctx.getErrorHandler(),
+                        ctx.getTaskContext().getClock(),
+                        ctx.getDatabaseSchema(),
+                        ctx.getTaskContext(),
+                        ctx.getReplicationConnection(),
+                        backfillStreamSplit);
+        LOG.info(
+                "Execute backfillReadTask for split {} with slot name {}",
+                snapshotSplit,
+                ((PostgresSourceConfig) ctx.getSourceConfig()).getSlotNameForBackfillTask());
+        backfillReadTask.execute(
+                new StoppableChangeEventSourceContext(), ctx.getPartition(), postgresOffsetContext);
+    }
+
+    /**
+     * Create a slot before snapshot reading so that the slot can track the WAL log during the
+     * snapshot reading phase. Does nothing when snapshot backfill is skipped.
+     *
+     * @throws FlinkRuntimeException if the slot cannot be created or does not become ready
+     */
+    private void maybeCreateSlotForBackFillReadTask(
+            PostgresConnection jdbcConnection,
+            ReplicationConnection replicationConnection,
+            String slotName,
+            String pluginName,
+            boolean skipSnapshotBackfill) {
+        // if skip backfill, no need to create slot here
+        if (skipSnapshotBackfill) {
+            return;
+        }
+
+        try {
+            SlotState slotInfo = null;
+            try {
+                slotInfo = jdbcConnection.getReplicationSlotState(slotName, pluginName);
+            } catch (SQLException e) {
+                // Deliberately best-effort: a failed lookup is treated as "slot missing".
+                LOG.info("Unable to load info of replication slot, will try to create the slot");
+            }
+            if (slotInfo == null) {
+                try {
+                    replicationConnection.createReplicationSlot().orElse(null);
+                } catch (SQLException ex) {
+                    String message = "Creation of replication slot failed";
+                    // Doris modification: getMessage() may be null; an NPE here would mask ex.
+                    String reason = ex.getMessage();
+                    if (reason != null && reason.contains("already exists")) {
+                        message +=
+                                "; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
+                    }
+                    throw new FlinkRuntimeException(message, ex);
+                }
+            }
+            waitForReplicationSlotReady(30, jdbcConnection, slotName, pluginName);
+        } catch (Throwable t) {
+            // Doris modification: converting an interrupt into an exception must not clear it.
+            if (t instanceof InterruptedException) {
+                Thread.currentThread().interrupt();
+            }
+            throw new FlinkRuntimeException(t);
+        }
+    }
+
+    /**
+     * Drop slot for backfill task and close replication connection. Does nothing when snapshot
+     * backfill is skipped.
+     *
+     * @throws FlinkRuntimeException if closing the replication connection fails
+     */
+    private void maybeDropSlotForBackFillReadTask(
+            PostgresReplicationConnection replicationConnection, boolean skipSnapshotBackfill) {
+        // if skip backfill, no slot was created for it, so there is nothing to drop
+        if (skipSnapshotBackfill) {
+            return;
+        }
+
+        try {
+            replicationConnection.close(true);
+        } catch (Throwable t) {
+            LOG.error("Unexpected error while dropping replication slot", t);
+            throw new FlinkRuntimeException(t);
+        }
+    }
+
+    /** A SnapshotChangeEventSource implementation for Postgres to read snapshot split. */
+    public static class PostgresSnapshotSplitReadTask
+            extends AbstractSnapshotChangeEventSource<PostgresPartition, PostgresOffsetContext> {
+        private static final Logger LOG =
+                LoggerFactory.getLogger(PostgresSnapshotSplitReadTask.class);
+
+        private final PostgresConnection jdbcConnection;
+        private final PostgresConnectorConfig connectorConfig;
+        private final PostgresEventDispatcher<TableId> eventDispatcher;
+        private final SnapshotSplit snapshotSplit;
+        private final PostgresOffsetContext offsetContext;
+        private final PostgresSchema databaseSchema;
+        private final SnapshotProgressListener<PostgresPartition> snapshotProgressListener;
+        private final Clock clock;
+
+        public PostgresSnapshotSplitReadTask(
+                PostgresConnection jdbcConnection,
+                PostgresConnectorConfig connectorConfig,
+                PostgresSchema databaseSchema,
+                PostgresOffsetContext previousOffset,
+                PostgresEventDispatcher<TableId> eventDispatcher,
+                SnapshotProgressListener snapshotProgressListener,
+                SnapshotSplit snapshotSplit) {
+            super(connectorConfig, snapshotProgressListener);
+            this.jdbcConnection = jdbcConnection;
+            this.connectorConfig = connectorConfig;
+            this.snapshotProgressListener = snapshotProgressListener;
+            this.databaseSchema = databaseSchema;
+            this.eventDispatcher = eventDispatcher;
+            this.snapshotSplit = snapshotSplit;
+            this.offsetContext = previousOffset;
+            this.clock = Clock.SYSTEM;
+        }
+
+        /**
+         * Refreshes the schema, then emits the rows of this split's table. The resulting offset is
+         * the one given to the constructor; this method's {@code previousOffset} is not used.
+         */
+        @Override
+        protected SnapshotResult<PostgresOffsetContext> doExecute(
+                ChangeEventSourceContext context,
+                PostgresOffsetContext previousOffset,
+                SnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext,
+                SnapshottingTask snapshottingTask)
+                throws Exception {
+            final PostgresSnapshotContext ctx = (PostgresSnapshotContext) snapshotContext;
+            ctx.offset = offsetContext;
+
+            refreshSchema(databaseSchema, jdbcConnection, true);
+            createDataEvents(ctx, snapshotSplit.getTableId());
+
+            return SnapshotResult.completed(ctx.offset);
+        }
+
+        /** Emits all rows of {@code tableId} in this split, then completes the snapshot receiver. */
+        private void createDataEvents(PostgresSnapshotContext snapshotContext, TableId tableId)
+                throws InterruptedException {
+            EventDispatcher.SnapshotReceiver<PostgresPartition> snapshotReceiver =
+                    eventDispatcher.getSnapshotChangeEventReceiver();
+            LOG.info("Snapshotting table {}", tableId);
+            createDataEventsForTable(
+                    snapshotContext,
+                    snapshotReceiver,
+                    Objects.requireNonNull(databaseSchema.tableFor(tableId)));
+            snapshotReceiver.completeSnapshot();
+        }
+
+        /** Dispatches the data change events for the records of a single table. */
+        private void createDataEventsForTable(
+                PostgresSnapshotContext snapshotContext,
+                EventDispatcher.SnapshotReceiver<PostgresPartition> snapshotReceiver,
+                Table table)
+                throws InterruptedException {
+
+            long exportStart = clock.currentTimeInMillis();
+            LOG.info(
+                    "Exporting data from split '{}' of table {}",
+                    snapshotSplit.splitId(),
+                    table.id());
+
+            // uuid-typed split-key columns are passed to the query builder separately.
+            List<String> uuidFields =
+                    snapshotSplit.getSplitKeyType().getFieldNames().stream()
+                            .filter(field -> table.columnWithName(field).typeName().equals("uuid"))
+                            .collect(Collectors.toList());
+
+            List<String> columnNames =
+                    table.columns().stream()
+                            .map(column -> jdbcConnection.quotedColumnIdString(column.name()))
+                            .collect(Collectors.toList());
+            final String selectSql =
+                    PostgresQueryUtils.buildSplitScanQuery(
+                            snapshotSplit.getTableId(),
+                            snapshotSplit.getSplitKeyType(),
+                            snapshotSplit.getSplitStart() == null,
+                            snapshotSplit.getSplitEnd() == null,
+                            columnNames,
+                            uuidFields);
+            LOG.debug(
+                    "For split '{}' of table {} using select statement: '{}'",
+                    snapshotSplit.splitId(),
+                    table.id(),
+                    selectSql);
+
+            try (PreparedStatement selectStatement =
+                            PostgresQueryUtils.readTableSplitDataStatement(
+                                    jdbcConnection,
+                                    selectSql,
+                                    snapshotSplit.getSplitStart() == null,
+                                    snapshotSplit.getSplitEnd() == null,
+                                    snapshotSplit.getSplitStart(),
+                                    snapshotSplit.getSplitEnd(),
+                                    snapshotSplit.getSplitKeyType().getFieldCount(),
+                                    connectorConfig.getSnapshotFetchSize());
+                    ResultSet rs = selectStatement.executeQuery()) {
+
+                ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table);
+                long rows = 0;
+                Threads.Timer logTimer = getTableScanLogTimer();
+
+                while (rs.next()) {
+                    rows++;
+                    final Object[] row = new Object[columnArray.getGreatestColumnPosition()];
+                    for (int i = 0; i < columnArray.getColumns().length; i++) {
+                        // Doris modification (FLINK-39748): read result-set index i + 1 using the
+                        // Column at the same index of columnArray, which ColumnUtils.toArray built
+                        // from this result set.
+                        Column col = columnArray.getColumns()[i];
+                        row[col.position() - 1] =
+                                jdbcConnection.getColumnValue(
+                                        rs, i + 1, col, table, databaseSchema);
+                    }
+                    if (logTimer.expired()) {
+                        long stop = clock.currentTimeInMillis();
+                        LOG.info(
+                                "Exported {} records for split '{}' after {}",
+                                rows,
+                                snapshotSplit.splitId(),
+                                Strings.duration(stop - exportStart));
+                        snapshotProgressListener.rowsScanned(
+                                snapshotContext.partition, table.id(), rows);
+                        logTimer = getTableScanLogTimer();
+                    }
+                    snapshotContext.offset.event(table.id(), clock.currentTime());
+                    SnapshotChangeRecordEmitter<PostgresPartition> emitter =
+                            new SnapshotChangeRecordEmitter<>(
+                                    snapshotContext.partition, snapshotContext.offset, row, clock);
+                    eventDispatcher.dispatchSnapshotEvent(
+                            snapshotContext.partition, table.id(), emitter, snapshotReceiver);
+                }
+                LOG.info(
+                        "Finished exporting {} records for split '{}', total duration '{}'",
+                        rows,
+                        snapshotSplit.splitId(),
+                        Strings.duration(clock.currentTimeInMillis() - exportStart));
+            } catch (SQLException e) {
+                throw new FlinkRuntimeException(
+                        "Snapshotting of table " + table.id() + " failed", e);
+            }
+        }
+
+        private Threads.Timer getTableScanLogTimer() {
+            return Threads.timer(clock, LOG_INTERVAL);
+        }
+
+        /**
+         * Returns a task that presumably means "data only, no schema" (Debezium's
+         * {@code SnapshottingTask(snapshotSchema, snapshotData)}); confirm against Debezium.
+         */
+        @Override
+        protected SnapshottingTask getSnapshottingTask(
+                PostgresPartition partition, PostgresOffsetContext previousOffset) {
+            return new SnapshottingTask(false, true);
+        }
+
+        @Override
+        protected PostgresSnapshotContext prepare(PostgresPartition partition) throws Exception {
+            return new PostgresSnapshotContext(partition);
+        }
+
+        /** Relational snapshot context for one partition, created with an empty catalog name. */
+        private static class PostgresSnapshotContext
+                extends RelationalSnapshotChangeEventSource.RelationalSnapshotContext<
+                        PostgresPartition, PostgresOffsetContext> {
+
+            public PostgresSnapshotContext(PostgresPartition partition) throws SQLException {
+                super(partition, "");
+            }
+        }
+    }
+}
diff --git
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java
new file mode 100644
index 00000000000..ac4d7b58226
--- /dev/null
+++
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.deserialize;
+
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Method;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import io.debezium.time.MicroTimestamp;
+import io.debezium.time.NanoTimestamp;
+
+/** Unit tests for {@link DebeziumJsonDeserializer}. */
+class DebeziumJsonDeserializerTest {
+
+ private final DebeziumJsonDeserializer deserializer = new
DebeziumJsonDeserializer();
+
+ // ─── convertTimestamp
─────────────────────────────────────────────────────
+
+ @Test
+ void microTimestamp_negativeSubMillisecond_doesNotThrow() {
+ // micros = -877 ⇒ 1969-12-31 23:59:59.999123. Signed `/` `%` produced
a
+ // negative nanoOfMillisecond and tripped TimestampData's >= 0 check.
+ Object out = invokeConvertTimestamp(MicroTimestamp.SCHEMA_NAME, -877L);
+ assertEquals("1969-12-31 23:59:59.999123", out.toString());
+ }
+
+ @Test
+ void microTimestamp_positive_unchanged() {
+ Object out = invokeConvertTimestamp(MicroTimestamp.SCHEMA_NAME,
1_234_567L);
+ assertEquals("1970-01-01 00:00:01.234567", out.toString());
+ }
+
+ @Test
+ void microTimestamp_negativeIntegerMillis_unchanged() {
+ // micros = -1000 ⇒ 1969-12-31 23:59:59.999, negative but no
sub-millisecond
+ // (the old code happened to produce the right result here; protect
that path).
+ Object out = invokeConvertTimestamp(MicroTimestamp.SCHEMA_NAME,
-1000L);
+ assertEquals("1969-12-31 23:59:59.999", out.toString());
+ }
+
+ @Test
+ void nanoTimestamp_negativeSubMillisecond_doesNotThrow() {
+ // nanos = -877_000 ⇒ 1969-12-31 23:59:59.999123.
+ Object out = invokeConvertTimestamp(NanoTimestamp.SCHEMA_NAME,
-877_000L);
+ assertEquals("1969-12-31 23:59:59.999123", out.toString());
+ }
+
+ private Object invokeConvertTimestamp(String typeName, Object dbzObj) {
+ try {
+ Method m =
+ DebeziumJsonDeserializer.class.getDeclaredMethod(
+ "convertTimestamp", String.class, Object.class);
+ m.setAccessible(true);
+ return m.invoke(deserializer, typeName, dbzObj);
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out
index abdc4038774..feeb6757d2d 100644
---
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out
@@ -34,9 +34,9 @@ xml_col text Yes false \N NONE
hstore_col text Yes false \N NONE
-- !select_all_types_null --
-1 1 100 1000 1.23 4.56 12345.678901 char
varchar text value true 2024-01-01 12:00 12:00:00Z
2024-01-01T12:00 2024-01-01T04:00 P0Y0M1DT0H0M0S 3q2+7w==
11111111-2222-3333-4444-555555555555 {"a":1} {"b": 2} 192.168.1.1
192.168.0.0/24 08:00:2b:01:02:03 qg== Cg== [1, 2, 3] ["a",
"b", "c"] {"coordinates":[1,2],"type":"Point","srid":0}
08:00:2b:01:02:03:04:05 <root><item>1</item></root> {"a":"1","b":"2"}
+1 1 100 1000 1.23 4.56 12345.678901 char
varchar text value true 2024-01-01 12:00 04:00:00Z
2024-01-01T12:00 2024-01-01T04:00 P0Y0M1DT0H0M0S 3q2+7w==
11111111-2222-3333-4444-555555555555 {"a":1} {"b": 2} 192.168.1.1
192.168.0.0/24 08:00:2b:01:02:03 qg== Cg== [1, 2, 3] ["a",
"b", "c"] {"coordinates":[1,2],"type":"Point","srid":0}
08:00:2b:01:02:03:04:05 <root><item>1</item></root> {"a":"1","b":"2"}
-- !select_all_types_null2 --
-1 1 100 1000 1.23 4.56 12345.678901 char
varchar text value true 2024-01-01 12:00 12:00:00Z
2024-01-01T12:00 2024-01-01T04:00 P0Y0M1DT0H0M0S 3q2+7w==
11111111-2222-3333-4444-555555555555 {"a":1} {"b": 2} 192.168.1.1
192.168.0.0/24 08:00:2b:01:02:03 qg== Cg== [1, 2, 3] ["a",
"b", "c"] {"coordinates":[1,2],"type":"Point","srid":0}
08:00:2b:01:02:03:04:05 <root><item>1</item></root> {"a":"1","b":"2"}
+1 1 100 1000 1.23 4.56 12345.678901 char
varchar text value true 2024-01-01 12:00 04:00:00Z
2024-01-01T12:00 2024-01-01T04:00 P0Y0M1DT0H0M0S 3q2+7w==
11111111-2222-3333-4444-555555555555 {"a":1} {"b": 2} 192.168.1.1
192.168.0.0/24 08:00:2b:01:02:03 qg== Cg== [1, 2, 3] ["a",
"b", "c"] {"coordinates":[1,2],"type":"Point","srid":0}
08:00:2b:01:02:03:04:05 <root><item>1</item></root> {"a":"1","b":"2"}
2 2 200 2000 7.89 0.12 99999.000001 char2
varchar2 another text false 2025-01-01 23:59:59
23:59:59Z 2025-01-01T23:59:59 2025-01-01T23:59:59 P0Y0M0DT2H0M0S
3q2+7w== 11111111-2222-3333-4444-555555555556 {"x":10} {"y":
20} 10.0.0.1 10.0.0.0/16 08:00:2b:aa:bb:cc 8A== Dw==
[10, 20] ["x", "y"] {"coordinates":[3,4],"type":"Point","srid":0}
08:00:2b:aa:bb:cc:dd:ee <root><item>2</item></root> {"x":"10","y":"20"}
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.out
new file mode 100644
index 00000000000..0df4f0ca737
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.out
@@ -0,0 +1,45 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !snapshot --
+1 0001-01-01T00:00:00.000123 0001-01-01T00:00:00.000123
0001-01-01
+2 0500-06-15T10:00 0500-06-15T15:00 0500-06-15
+3 1582-10-04T12:34:56 1582-10-04T12:34:56 1582-10-04
+4 1582-10-15T00:00 1582-10-15T00:00 1582-10-15
+5 1800-07-20T03:30 1800-07-19T22:00 1800-07-20
+6 1900-12-31T23:59:59.999 1900-12-31T23:59:59.999 1900-12-31
+7 1901-01-02T00:00 1901-01-01T15:00 1901-01-02
+8 1969-12-31T23:59:59.999123 1969-12-31T23:59:59.999123
1969-12-31
+9 \N \N \N
+
+-- !binlog_insert --
+11 0001-01-01T00:00:00.000123 0001-01-01T00:00:00.000123
0001-01-01
+12 0500-06-15T10:00 0500-06-15T15:00 0500-06-15
+13 1582-10-04T12:34:56 1582-10-04T12:34:56 1582-10-04
+14 1582-10-15T00:00 1582-10-15T00:00 1582-10-15
+15 1800-07-20T03:30 1800-07-19T22:00 1800-07-20
+16 1900-12-31T23:59:59.999 1900-12-31T23:59:59.999 1900-12-31
+17 1901-01-02T00:00 1901-01-01T15:00 1901-01-02
+18 1969-12-31T23:59:59.999123 1969-12-31T23:59:59.999123
1969-12-31
+19 \N \N \N
+
+-- !binlog_update --
+1 1582-10-15T12:00:00.000123 1582-10-15T12:00:00.000123
1900-12-31
+
+-- !binlog_after_delete --
+1 1582-10-15T12:00:00.000123 1582-10-15T12:00:00.000123
1900-12-31
+3 1582-10-04T12:34:56 1582-10-04T12:34:56 1582-10-04
+4 1582-10-15T00:00 1582-10-15T00:00 1582-10-15
+5 1800-07-20T03:30 1800-07-19T22:00 1800-07-20
+6 1900-12-31T23:59:59.999 1900-12-31T23:59:59.999 1900-12-31
+7 1901-01-02T00:00 1901-01-01T15:00 1901-01-02
+8 1969-12-31T23:59:59.999123 1969-12-31T23:59:59.999123
1969-12-31
+9 \N \N \N
+11 0001-01-01T00:00:00.000123 0001-01-01T00:00:00.000123
0001-01-01
+12 0500-06-15T10:00 0500-06-15T15:00 0500-06-15
+13 1582-10-04T12:34:56 1582-10-04T12:34:56 1582-10-04
+14 1582-10-15T00:00 1582-10-15T00:00 1582-10-15
+15 1800-07-20T03:30 1800-07-19T22:00 1800-07-20
+16 1900-12-31T23:59:59.999 1900-12-31T23:59:59.999 1900-12-31
+17 1901-01-02T00:00 1901-01-01T15:00 1901-01-02
+18 1969-12-31T23:59:59.999123 1969-12-31T23:59:59.999123
1969-12-31
+19 \N \N \N
+
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.groovy
new file mode 100644
index 00000000000..db8b748abbd
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.groovy
@@ -0,0 +1,229 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Verify that the snapshot and binlog paths produce identical values for
+ * historical dates that previously drifted in the snapshot path (PG JDBC's
+ * GregorianCalendar + JVM-zone LMT).
+ *
+ * Phases:
+ * 1. snapshot batch (ids snapshotIdBase .. snapshotIdBase+N-1) inserted in
+ *    postgres before the job starts; after sync, assert values in doris match
+ *    the original input.
+ * 2. binlog INSERT batch (ids binlogIdBase .. binlogIdBase+N-1) with the same
+ *    boundary values; assert each binlog row equals its snapshot counterpart
+ *    cell-for-cell.
+ * 3. binlog UPDATE: rewrite the first snapshot row (originally 0001-01-01) to a
+ *    different boundary value and assert the streamed change lands.
+ * 4. binlog DELETE: remove the second snapshot row and assert it disappears in
+ *    doris.
+ *
+ * The streaming job is dropped in a finally block so that a failing phase does
+ * not leave the job behind.
+ */
+suite("test_streaming_postgres_job_snapshot_historical_dates",
+        "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_postgres_job_snapshot_historical_dates_name"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "streaming_pg_historical_dates"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        return
+    }
+
+    String pg_port = context.config.otherConfigs.get("pg_14_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String s3_endpoint = getS3Endpoint()
+    String bucket = getS3BucketName()
+    String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+    // Base JDBC URL for direct postgres access; the streaming job appends ?timezone=UTC.
+    String pgJdbcUrl = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}"
+
+    // Boundary rows. Picked to exercise:
+    //  - Julian/Gregorian cutover (0001-01-01 ⇒ 2-day drift, 1582-10-04/15 boundary)
+    //  - negative tz offset before the cutover (0500-06-15 with -05)
+    //  - half-hour tz offset pre-1900 (1800-07-20 with +05:30)
+    //  - LMT offset for pre-1901 values in zones like Asia/Shanghai (1900-12-31 vs 1901-01-02)
+    //  - sub-millisecond precision on a pre-1970 value (negative epoch micros; exercises the
+    //    floorDiv/floorMod millisecond split in DebeziumJsonDeserializer.convertTimestamp)
+    //  - NULL across all three columns
+    def boundaryRows = [
+        [ts: "0001-01-01 00:00:00.000123", tstz: "0001-01-01 00:00:00.000123+00", date: "0001-01-01"],
+        [ts: "0500-06-15 10:00:00.000000", tstz: "0500-06-15 10:00:00-05",        date: "0500-06-15"],
+        [ts: "1582-10-04 12:34:56.000000", tstz: "1582-10-04 12:34:56+00",        date: "1582-10-04"],
+        [ts: "1582-10-15 00:00:00.000000", tstz: "1582-10-15 00:00:00+00",        date: "1582-10-15"],
+        [ts: "1800-07-20 03:30:00.000000", tstz: "1800-07-20 03:30:00+05:30",     date: "1800-07-20"],
+        [ts: "1900-12-31 23:59:59.999000", tstz: "1900-12-31 23:59:59.999+00",    date: "1900-12-31"],
+        [ts: "1901-01-02 00:00:00.000000", tstz: "1901-01-02 00:00:00+09",        date: "1901-01-02"],
+        [ts: "1969-12-31 23:59:59.999123", tstz: "1969-12-31 23:59:59.999123+00", date: "1969-12-31"],
+        [ts: null,                         tstz: null,                            date: null],
+    ]
+    def rowsPerBatch = boundaryRows.size()
+    def snapshotIdBase = 1
+    def binlogIdBase = 11
+    // The two id ranges must not overlap: the postgres table keys on id, and the phases below
+    // separate the batches with `id < binlogIdBase` / `id >= binlogIdBase`.
+    assert snapshotIdBase + rowsPerBatch <= binlogIdBase :
+            "snapshot ids ${snapshotIdBase}..${snapshotIdBase + rowsPerBatch - 1} overlap binlog base ${binlogIdBase}"
+    // Rows targeted by the UPDATE and DELETE phases (first and second snapshot rows).
+    def updatedId = snapshotIdBase
+    def deletedId = snapshotIdBase + 1
+
+    // Renders boundaryRows as a postgres VALUES list with ids idBase, idBase+1, ...
+    def buildInsertValues = { int idBase ->
+        boundaryRows.withIndex().collect { row, i ->
+            def id = idBase + i
+            def tsLit = row.ts == null ? "NULL" : "TIMESTAMP '${row.ts}'"
+            def tstzLit = row.tstz == null ? "NULL" : "TIMESTAMPTZ '${row.tstz}'"
+            def dateLit = row.date == null ? "NULL" : "DATE '${row.date}'"
+            "(${id}, ${tsLit}, ${tstzLit}, ${dateLit})"
+        }.join(",\n            ")
+    }
+
+    // Logs the job and task rows so that a timeout can be diagnosed from the test log.
+    def dumpJobOnFailure = {
+        log.info("show job: " + sql("""select * from jobs("type"="insert") where Name='${jobName}'"""))
+        log.info("show task: " + sql("""select * from tasks("type"="insert") where JobName='${jobName}'"""))
+    }
+
+    // Polls `condition` every 2s for up to 300s; if the wait fails, dumps job/task state and rethrows.
+    def awaitOrDump = { Closure condition ->
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until(condition)
+        } catch (Exception ex) {
+            dumpJobOnFailure()
+            throw ex
+        }
+    }
+
+    // ── postgres setup + snapshot batch ───────────────────────────────────────
+    connect("${pgUser}", "${pgPassword}", pgJdbcUrl) {
+        sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+        sql """
+            CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+                id       bigint PRIMARY KEY,
+                ts_col   timestamp(6),
+                tstz_col timestamp(6) with time zone,
+                date_col date
+            )
+        """
+        sql """
+            INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES
+            ${buildInsertValues(snapshotIdBase)}
+        """
+    }
+
+    try {
+        // ── start streaming job (offset=initial ⇒ snapshot + binlog) ──────────────
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url" = "${pgJdbcUrl}?timezone=UTC",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "org.postgresql.Driver",
+                    "user" = "${pgUser}",
+                    "password" = "${pgPassword}",
+                    "database" = "${pgDB}",
+                    "schema" = "${pgSchema}",
+                    "include_tables" = "${table1}",
+                    "offset" = "initial"
+                )
+                TO DATABASE ${currentDb} (
+                    "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        // ── phase 1: snapshot ─────────────────────────────────────────────────────
+        awaitOrDump {
+            def cnt = sql """SELECT count(1) FROM ${currentDb}.${table1}"""
+            log.info("snapshot row count: ${cnt}")
+            cnt.size() == 1 && (cnt.get(0).get(0) as long) == (long) rowsPerBatch
+        }
+
+        qt_snapshot """SELECT id, ts_col, tstz_col, date_col FROM ${currentDb}.${table1} ORDER BY id"""
+
+        // ── phase 2: binlog INSERT ────────────────────────────────────────────────
+        connect("${pgUser}", "${pgPassword}", pgJdbcUrl) {
+            sql """
+                INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES
+                ${buildInsertValues(binlogIdBase)}
+            """
+        }
+        awaitOrDump {
+            def cnt = sql """SELECT count(1) FROM ${currentDb}.${table1}"""
+            cnt.size() == 1 && (cnt.get(0).get(0) as long) == (long) (rowsPerBatch * 2)
+        }
+
+        qt_binlog_insert """SELECT id, ts_col, tstz_col, date_col FROM ${currentDb}.${table1}
+                WHERE id >= ${binlogIdBase} ORDER BY id"""
+
+        // Parity: the i-th snapshot row (id snapshotIdBase+i) must equal the i-th binlog row
+        // (id binlogIdBase+i) cell-for-cell.
+        def snapshotRows = sql """SELECT ts_col, tstz_col, date_col FROM ${currentDb}.${table1}
+                WHERE id < ${binlogIdBase} ORDER BY id"""
+        def binlogRows = sql """SELECT ts_col, tstz_col, date_col FROM ${currentDb}.${table1}
+                WHERE id >= ${binlogIdBase} ORDER BY id"""
+        assert snapshotRows.size() == rowsPerBatch
+        assert binlogRows.size() == rowsPerBatch
+        def parityColumns = ["ts_col", "tstz_col", "date_col"]
+        for (int i = 0; i < rowsPerBatch; i++) {
+            def s = snapshotRows.get(i)
+            def b = binlogRows.get(i)
+            parityColumns.eachWithIndex { col, c ->
+                assert s.get(c)?.toString() == b.get(c)?.toString() :
+                        "${col} mismatch at row ${i}: snapshot=${s.get(c)} binlog=${b.get(c)}"
+            }
+        }
+
+        // ── phase 3: binlog UPDATE ────────────────────────────────────────────────
+        // Rewrite the first snapshot row (originally 0001-01-01) to a different boundary value.
+        def updatedTs = "1582-10-15 12:00:00.000123"
+        def updatedTstz = "1582-10-15 12:00:00.000123+00"
+        def updatedDate = "1900-12-31"
+        connect("${pgUser}", "${pgPassword}", pgJdbcUrl) {
+            sql """
+                UPDATE ${pgDB}.${pgSchema}.${table1}
+                SET ts_col   = TIMESTAMP '${updatedTs}',
+                    tstz_col = TIMESTAMPTZ '${updatedTstz}',
+                    date_col = DATE '${updatedDate}'
+                WHERE id = ${updatedId}
+            """
+        }
+        // All three columns change in one UPDATE statement; waiting on date_col alone is enough.
+        awaitOrDump {
+            def row = sql """SELECT cast(date_col as string) FROM ${currentDb}.${table1} WHERE id = ${updatedId}"""
+            row.size() == 1 && row.get(0).get(0) == updatedDate
+        }
+        qt_binlog_update """SELECT id, ts_col, tstz_col, date_col FROM ${currentDb}.${table1}
+                WHERE id = ${updatedId}"""
+
+        // ── phase 4: binlog DELETE ────────────────────────────────────────────────
+        connect("${pgUser}", "${pgPassword}", pgJdbcUrl) {
+            sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE id = ${deletedId}"""
+        }
+        awaitOrDump {
+            def cnt = sql """SELECT count(1) FROM ${currentDb}.${table1} WHERE id = ${deletedId}"""
+            cnt.size() == 1 && (cnt.get(0).get(0) as long) == 0L
+        }
+        qt_binlog_after_delete """SELECT id, ts_col, tstz_col, date_col FROM ${currentDb}.${table1}
+                ORDER BY id"""
+    } finally {
+        // Always stop the streaming job, even when a phase above failed.
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]