This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 33b2d8b1c [INLONG-4339][Sort] Rollback debezium-core to 1.5.4 (#4340)
33b2d8b1c is described below

commit 33b2d8b1c5c6af1ad090da11e0c716e9d914bf96
Author: pacino <[email protected]>
AuthorDate: Tue May 24 15:31:46 2022 +0800

    [INLONG-4339][Sort] Rollback debezium-core to 1.5.4 (#4340)
---
 inlong-sort/pom.xml                                |  2 +-
 inlong-sort/sort-connectors/jdbc/pom.xml           |  4 --
 .../debezium/internal/DebeziumChangeConsumer.java  |  1 +
 .../internal/EmbeddedEngineChangeEvent.java        | 68 ------------------
 .../internal/FlinkDatabaseSchemaHistory.java       |  2 +-
 .../debezium/EmbeddedFlinkDatabaseHistory.java     |  2 +-
 .../debezium/dispatcher/EventDispatcherImpl.java   |  4 +-
 .../mysql/debezium/reader/BinlogSplitReader.java   |  3 +-
 .../mysql/debezium/reader/SnapshotSplitReader.java | 11 +--
 .../debezium/task/MySqlBinlogSplitReadTask.java    | 14 ++--
 .../debezium/task/MySqlSnapshotSplitReadTask.java  | 81 ++++++++++------------
 inlong-sort/sort-connectors/pom.xml                | 15 ++--
 licenses/inlong-sort/LICENSE                       |  8 +--
 13 files changed, 61 insertions(+), 154 deletions(-)

diff --git a/inlong-sort/pom.xml b/inlong-sort/pom.xml
index 8b816fec0..9637cc55d 100644
--- a/inlong-sort/pom.xml
+++ b/inlong-sort/pom.xml
@@ -42,7 +42,7 @@
         <module>sort-dist</module>
     </modules>
     <properties>
-        <debezium.version>1.6.4.Final</debezium.version>
+        <debezium.version>1.5.4.Final</debezium.version>
         <kafka.clients.version>2.7.0</kafka.clients.version>
     </properties>
     <dependencyManagement>
diff --git a/inlong-sort/sort-connectors/jdbc/pom.xml b/inlong-sort/sort-connectors/jdbc/pom.xml
index 799b4b679..d88b6fbdb 100644
--- a/inlong-sort/sort-connectors/jdbc/pom.xml
+++ b/inlong-sort/sort-connectors/jdbc/pom.xml
@@ -39,10 +39,6 @@
             <artifactId>clickhouse-jdbc</artifactId>
         </dependency>
         <!--for postgresql-->
-        <dependency>
-            <groupId>com.ververica</groupId>
-            <artifactId>flink-connector-postgres-cdc</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.postgresql</groupId>
             <artifactId>postgresql</artifactId>
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/DebeziumChangeConsumer.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/DebeziumChangeConsumer.java
index 734386c88..2c8441e0c 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/DebeziumChangeConsumer.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/DebeziumChangeConsumer.java
@@ -18,6 +18,7 @@
 
 package org.apache.inlong.sort.cdc.debezium.internal;
 
+import io.debezium.embedded.EmbeddedEngineChangeEvent;
 import io.debezium.engine.ChangeEvent;
 import io.debezium.engine.DebeziumEngine;
 import io.debezium.engine.DebeziumEngine.RecordCommitter;
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/EmbeddedEngineChangeEvent.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/EmbeddedEngineChangeEvent.java
deleted file mode 100644
index 43baea8d8..000000000
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/EmbeddedEngineChangeEvent.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.cdc.debezium.internal;
-
-import io.debezium.engine.ChangeEvent;
-import io.debezium.engine.RecordChangeEvent;
-import org.apache.kafka.connect.source.SourceRecord;
-
-/**
- * Copied from Debezium project. Make it public to be accessible from 
DebeziumChangeFetcher.
- */
-public class EmbeddedEngineChangeEvent<K, V> implements ChangeEvent<K, V>, 
RecordChangeEvent<V> {
-
-    private final K key;
-    private final V value;
-    private final SourceRecord sourceRecord;
-
-    public EmbeddedEngineChangeEvent(K key, V value, SourceRecord 
sourceRecord) {
-        this.key = key;
-        this.value = value;
-        this.sourceRecord = sourceRecord;
-    }
-
-    @Override
-    public K key() {
-        return key;
-    }
-
-    @Override
-    public V value() {
-        return value;
-    }
-
-    @Override
-    public V record() {
-        return value;
-    }
-
-    @Override
-    public String destination() {
-        return sourceRecord.topic();
-    }
-
-    public SourceRecord sourceRecord() {
-        return sourceRecord;
-    }
-
-    @Override
-    public String toString() {
-        return "EmbeddedEngineChangeEvent [key=" + key + ", value=" + value + 
", sourceRecord=" + sourceRecord + "]";
-    }
-}
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/FlinkDatabaseSchemaHistory.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/FlinkDatabaseSchemaHistory.java
index 06142d6e5..a8c60f155 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/FlinkDatabaseSchemaHistory.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/FlinkDatabaseSchemaHistory.java
@@ -189,7 +189,7 @@ public class FlinkDatabaseSchemaHistory implements 
DatabaseHistory {
     }
 
     @Override
-    public boolean storeOnlyCapturedTables() {
+    public boolean storeOnlyMonitoredTables() {
         return storeOnlyMonitoredTablesDdl;
     }
 
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/EmbeddedFlinkDatabaseHistory.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/EmbeddedFlinkDatabaseHistory.java
index e4041e9a2..c82b73be6 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/EmbeddedFlinkDatabaseHistory.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/EmbeddedFlinkDatabaseHistory.java
@@ -142,7 +142,7 @@ public class EmbeddedFlinkDatabaseHistory implements 
DatabaseHistory {
     }
 
     @Override
-    public boolean storeOnlyCapturedTables() {
+    public boolean storeOnlyMonitoredTables() {
         return storeOnlyMonitoredTablesDdl;
     }
 
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/dispatcher/EventDispatcherImpl.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/dispatcher/EventDispatcherImpl.java
index e9841fd97..567e9117e 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/dispatcher/EventDispatcherImpl.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/dispatcher/EventDispatcherImpl.java
@@ -134,7 +134,7 @@ public class EventDispatcherImpl<T extends 
DataCollectionId> extends EventDispat
             T dataCollectionId, SchemaChangeEventEmitter 
schemaChangeEventEmitter)
             throws InterruptedException {
         if (dataCollectionId != null && !filter.isIncluded(dataCollectionId)) {
-            if (historizedSchema == null || 
historizedSchema.storeOnlyCapturedTables()) {
+            if (historizedSchema == null || 
historizedSchema.storeOnlyMonitoredTables()) {
                 LOG.trace("Filtering schema change event for {}", 
dataCollectionId);
                 return;
             }
@@ -158,7 +158,7 @@ public class EventDispatcherImpl<T extends 
DataCollectionId> extends EventDispat
             }
         }
         if (!anyNonfilteredEvent) {
-            if (historizedSchema == null || 
historizedSchema.storeOnlyCapturedTables()) {
+            if (historizedSchema == null || 
historizedSchema.storeOnlyMonitoredTables()) {
                 LOG.trace("Filtering schema change event for {}", 
dataCollectionIds);
                 return;
             }
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
index eae13212a..8b9d46a71 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
@@ -110,8 +110,7 @@ public class BinlogSplitReader implements 
DebeziumReader<SourceRecord, MySqlSpli
         executor.submit(
                 () -> {
                     try {
-                        binlogSplitReadTask.execute(new 
BinlogSplitChangeEventSourceContextImpl(),
-                                statefulTaskContext.getOffsetContext());
+                        binlogSplitReadTask.execute(new 
BinlogSplitChangeEventSourceContextImpl());
                     } catch (Exception e) {
                         currentTaskRunning = false;
                         LOG.error(
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java
index 93a48c102..f85a7658a 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java
@@ -110,8 +110,7 @@ public class SnapshotSplitReader implements 
DebeziumReader<SourceRecord, MySqlSp
                         final SnapshotSplitChangeEventSourceContextImpl 
sourceContext =
                                 new 
SnapshotSplitChangeEventSourceContextImpl();
                         SnapshotResult snapshotResult =
-                                splitSnapshotReadTask.execute(sourceContext,
-                                        
statefulTaskContext.getOffsetContext());
+                                splitSnapshotReadTask.execute(sourceContext);
                         final MySqlBinlogSplit backfillBinlogSplit =
                                 createBackfillBinlogSplit(sourceContext);
                         // optimization that skip the binlog read when the low 
watermark equals high
@@ -130,14 +129,8 @@ public class SnapshotSplitReader implements 
DebeziumReader<SourceRecord, MySqlSp
                         if (snapshotResult.isCompletedOrSkipped()) {
                             final MySqlBinlogSplitReadTask 
backfillBinlogReadTask =
                                     
createBackfillBinlogReadTask(backfillBinlogSplit);
-                            final MySqlOffsetContext.Loader loader =
-                                    new MySqlOffsetContext.Loader(
-                                            
statefulTaskContext.getConnectorConfig());
-                            final MySqlOffsetContext mySqlOffsetContext =
-                                    loader.load(
-                                            
backfillBinlogSplit.getStartingOffset().getOffset());
                             backfillBinlogReadTask.execute(
-                                    new 
SnapshotBinlogSplitChangeEventSourceContextImpl(), mySqlOffsetContext);
+                                    new 
SnapshotBinlogSplitChangeEventSourceContextImpl());
                         } else {
                             readException =
                                     new IllegalStateException(
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java
index 41bfe039b..d3b482cae 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java
@@ -31,7 +31,7 @@ import io.debezium.relational.TableId;
 import io.debezium.util.Clock;
 import 
org.apache.inlong.sort.cdc.mysql.debezium.dispatcher.EventDispatcherImpl;
 import 
org.apache.inlong.sort.cdc.mysql.debezium.dispatcher.SignalEventDispatcher;
-import org.apache.inlong.sort.cdc.mysql.debezium.reader.SnapshotSplitReader;
+import 
org.apache.inlong.sort.cdc.mysql.debezium.reader.SnapshotSplitReader.SnapshotBinlogSplitChangeEventSourceContextImpl;
 import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
 import org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplit;
 import org.slf4j.Logger;
@@ -70,6 +70,7 @@ public class MySqlBinlogSplitReadTask extends 
MySqlStreamingChangeEventSource {
             MySqlBinlogSplit binlogSplit) {
         super(
                 connectorConfig,
+                offsetContext,
                 connection,
                 dispatcher,
                 errorHandler,
@@ -86,15 +87,14 @@ public class MySqlBinlogSplitReadTask extends 
MySqlStreamingChangeEventSource {
     }
 
     @Override
-    public void execute(ChangeEventSourceContext context, MySqlOffsetContext 
offsetContext)
-            throws InterruptedException {
+    public void execute(ChangeEventSourceContext context) throws 
InterruptedException {
         this.context = context;
-        super.execute(context, offsetContext);
+        super.execute(context);
     }
 
     @Override
-    protected void handleEvent(MySqlOffsetContext offsetContext, Event event) {
-        super.handleEvent(offsetContext, event);
+    protected void handleEvent(Event event) {
+        super.handleEvent(event);
         // check do we need to stop for read binlog for snapshot split.
         if (isBoundedRead()) {
             final BinlogOffset currentBinlogOffset = 
getBinlogPosition(offsetContext.getOffset());
@@ -112,7 +112,7 @@ public class MySqlBinlogSplitReadTask extends 
MySqlStreamingChangeEventSource {
                             new DebeziumException("Error processing binlog 
signal event", e));
                 }
                 // tell reader the binlog task finished
-                
((SnapshotSplitReader.SnapshotBinlogSplitChangeEventSourceContextImpl) 
context).finished();
+                ((SnapshotBinlogSplitChangeEventSourceContextImpl) 
context).finished();
             }
         }
     }
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java
index 73e9d9966..df4a610c3 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java
@@ -28,6 +28,7 @@ import io.debezium.pipeline.EventDispatcher;
 import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
 import io.debezium.pipeline.source.spi.SnapshotProgressListener;
 import io.debezium.pipeline.spi.ChangeRecordEmitter;
+import io.debezium.pipeline.spi.OffsetContext;
 import io.debezium.pipeline.spi.SnapshotResult;
 import io.debezium.relational.Column;
 import io.debezium.relational.RelationalSnapshotChangeEventSource;
@@ -60,16 +61,12 @@ import java.util.Calendar;
 
 import static 
org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils.currentBinlogOffset;
 
-/**
- * Task to read snapshot split of table.
- */
-public class MySqlSnapshotSplitReadTask extends 
AbstractSnapshotChangeEventSource<MySqlOffsetContext> {
+/** Task to read snapshot split of table. */
+public class MySqlSnapshotSplitReadTask extends 
AbstractSnapshotChangeEventSource {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(MySqlSnapshotSplitReadTask.class);
 
-    /**
-     * Interval for showing a log statement with the progress while scanning a 
single table.
-     */
+    /** Interval for showing a log statement with the progress while scanning 
a single table. */
     private static final Duration LOG_INTERVAL = Duration.ofMillis(10_000);
 
     private final MySqlConnectorConfig connectorConfig;
@@ -92,7 +89,7 @@ public class MySqlSnapshotSplitReadTask extends 
AbstractSnapshotChangeEventSourc
             TopicSelector<TableId> topicSelector,
             Clock clock,
             MySqlSnapshotSplit snapshotSplit) {
-        super(connectorConfig, snapshotProgressListener);
+        super(connectorConfig, previousOffset, snapshotProgressListener);
         this.offsetContext = previousOffset;
         this.connectorConfig = connectorConfig;
         this.databaseSchema = databaseSchema;
@@ -105,8 +102,7 @@ public class MySqlSnapshotSplitReadTask extends 
AbstractSnapshotChangeEventSourc
     }
 
     @Override
-    public SnapshotResult<MySqlOffsetContext> execute(ChangeEventSourceContext 
context,
-                                                      MySqlOffsetContext 
previousOffset) throws InterruptedException {
+    public SnapshotResult execute(ChangeEventSourceContext context) throws 
InterruptedException {
         SnapshottingTask snapshottingTask = 
getSnapshottingTask(previousOffset);
         final SnapshotContext ctx;
         try {
@@ -116,7 +112,7 @@ public class MySqlSnapshotSplitReadTask extends 
AbstractSnapshotChangeEventSourc
             throw new RuntimeException(e);
         }
         try {
-            return doExecute(context, previousOffset, ctx, snapshottingTask);
+            return doExecute(context, ctx, snapshottingTask);
         } catch (InterruptedException e) {
             LOG.warn("Snapshot was interrupted before completion");
             throw e;
@@ -125,21 +121,18 @@ public class MySqlSnapshotSplitReadTask extends 
AbstractSnapshotChangeEventSourc
         }
     }
 
-    protected SnapshotResult<MySqlOffsetContext> doExecute(
+    @Override
+    protected SnapshotResult doExecute(
             ChangeEventSourceContext context,
-            MySqlOffsetContext previousOffset,
-            SnapshotContext<MySqlOffsetContext> snapshotContext,
+            SnapshotContext snapshotContext,
             SnapshottingTask snapshottingTask)
             throws Exception {
-        final 
RelationalSnapshotChangeEventSource.RelationalSnapshotContext<MySqlOffsetContext>
-                ctx =
-                (RelationalSnapshotChangeEventSource.RelationalSnapshotContext<
-                        MySqlOffsetContext>)
-                        snapshotContext;
-        ctx.offset = previousOffset;
-        SignalEventDispatcher signalEventDispatcher =
+        final RelationalSnapshotChangeEventSource.RelationalSnapshotContext 
ctx =
+                
(RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;
+        ctx.offset = offsetContext;
+        final SignalEventDispatcher signalEventDispatcher =
                 new SignalEventDispatcher(
-                        previousOffset.getPartition(),
+                        offsetContext.getPartition(),
                         topicSelector.topicNameFor(snapshotSplit.getTableId()),
                         dispatcher.getQueue());
 
@@ -170,7 +163,7 @@ public class MySqlSnapshotSplitReadTask extends 
AbstractSnapshotChangeEventSourc
     }
 
     @Override
-    protected SnapshottingTask getSnapshottingTask(MySqlOffsetContext 
mySqlOffsetContext) {
+    protected SnapshottingTask getSnapshottingTask(OffsetContext 
previousOffset) {
         return new SnapshottingTask(false, true);
     }
 
@@ -180,6 +173,14 @@ public class MySqlSnapshotSplitReadTask extends 
AbstractSnapshotChangeEventSourc
         return new MySqlSnapshotContext();
     }
 
+    private static class MySqlSnapshotContext
+            extends 
RelationalSnapshotChangeEventSource.RelationalSnapshotContext {
+
+        public MySqlSnapshotContext() throws SQLException {
+            super("");
+        }
+    }
+
     private void createDataEvents(
             RelationalSnapshotChangeEventSource.RelationalSnapshotContext 
snapshotContext,
             TableId tableId)
@@ -192,9 +193,7 @@ public class MySqlSnapshotSplitReadTask extends 
AbstractSnapshotChangeEventSourc
         snapshotReceiver.completeSnapshot();
     }
 
-    /**
-     * Dispatches the data change events for the records of a single table.
-     */
+    /** Dispatches the data change events for the records of a single table. */
     private void createDataEventsForTable(
             RelationalSnapshotChangeEventSource.RelationalSnapshotContext 
snapshotContext,
             EventDispatcher.SnapshotReceiver snapshotReceiver,
@@ -217,16 +216,16 @@ public class MySqlSnapshotSplitReadTask extends 
AbstractSnapshotChangeEventSourc
                 selectSql);
 
         try (PreparedStatement selectStatement =
-                     StatementUtils.readTableSplitDataStatement(
-                             jdbcConnection,
-                             selectSql,
-                             snapshotSplit.getSplitStart() == null,
-                             snapshotSplit.getSplitEnd() == null,
-                             snapshotSplit.getSplitStart(),
-                             snapshotSplit.getSplitEnd(),
-                             snapshotSplit.getSplitKeyType().getFieldCount(),
-                             connectorConfig.getQueryFetchSize());
-             ResultSet rs = selectStatement.executeQuery()) {
+                        StatementUtils.readTableSplitDataStatement(
+                                jdbcConnection,
+                                selectSql,
+                                snapshotSplit.getSplitStart() == null,
+                                snapshotSplit.getSplitEnd() == null,
+                                snapshotSplit.getSplitStart(),
+                                snapshotSplit.getSplitEnd(),
+                                
snapshotSplit.getSplitKeyType().getFieldCount(),
+                                connectorConfig.getQueryFetchSize());
+                ResultSet rs = selectStatement.executeQuery()) {
 
             ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, 
table);
             long rows = 0;
@@ -353,7 +352,7 @@ public class MySqlSnapshotSplitReadTask extends 
AbstractSnapshotChangeEventSourc
 
         try {
             return MySqlValueConverters.containsZeroValuesInDatePart(
-                    (new String(b.getBytes(1, (int) (b.length())), "UTF-8")), 
column, table)
+                            (new String(b.getBytes(1, (int) (b.length())), 
"UTF-8")), column, table)
                     ? null
                     : rs.getTimestamp(fieldNo, Calendar.getInstance());
         } catch (UnsupportedEncodingException e) {
@@ -361,12 +360,4 @@ public class MySqlSnapshotSplitReadTask extends 
AbstractSnapshotChangeEventSourc
             throw new RuntimeException(e);
         }
     }
-
-    private static class MySqlSnapshotContext
-            extends 
RelationalSnapshotChangeEventSource.RelationalSnapshotContext {
-
-        public MySqlSnapshotContext() throws SQLException {
-            super("");
-        }
-    }
 }
diff --git a/inlong-sort/sort-connectors/pom.xml b/inlong-sort/sort-connectors/pom.xml
index 20771736f..60341554e 100644
--- a/inlong-sort/sort-connectors/pom.xml
+++ b/inlong-sort/sort-connectors/pom.xml
@@ -98,20 +98,15 @@
             <groupId>org.apache.flink</groupId>
             
<artifactId>flink-connector-hbase-2.2_${flink.scala.binary.version}</artifactId>
         </dependency>
+        <!--for postgres cdc -->
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-connector-postgres-cdc</artifactId>
+        </dependency>
         <!--for mongodb-->
         <dependency>
             <groupId>com.ververica</groupId>
             <artifactId>flink-connector-mongodb-cdc</artifactId>
-            <exclusions>
-                <exclusion>
-                    <artifactId>debezium-api</artifactId>
-                    <groupId>io.debezium</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>debezium-embedded</artifactId>
-                    <groupId>io.debezium</groupId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <!--for debezium-->
         <dependency>
diff --git a/licenses/inlong-sort/LICENSE b/licenses/inlong-sort/LICENSE
index 6ffdc4467..56dab07db 100644
--- a/licenses/inlong-sort/LICENSE
+++ b/licenses/inlong-sort/LICENSE
@@ -670,10 +670,10 @@ The text of each license is the standard Apache 2.0 
license.
   org.apache.zookeeper:zookeeper:3.6.3 - Apache ZooKeeper - Server 
(https://github.com/apache/zookeeper/tree/release-3.6.3/zookeeper-server), 
(Apache License, Version 2.0)
   org.apache.zookeeper:zookeeper-jute:3.6.3 - Apache ZooKeeper - Jute 
(https://github.com/apache/zookeeper/tree/release-3.6.3/zookeeper-jute), 
(Apache License, Version 2.0)
   io.streamnative.connectors:pulsar-flink-connector_2.11:1.13.6.1-rc9 - 
StreamNative :: Pulsar Flink Connector :: Scala 2.11 
(https://github.com/streamnative/pulsar-flink/blob/release-1.13/LICENSE), 
(Apache License, Version 2.0)
-  io.debezium:debezium-connector-sqlserver:1.6.4.Final - Apache debezium 
(https://github.com/debezium/debezium/tree/v1.6.4.Final/debezium-connector-sqlserver),
 (Apache License, Version 2.0)
-  io.debezium:debezium-core:1.6.4.Final - Apache debezium 
(https://github.com/debezium/debezium/tree/v1.6.4.Final/debezium-core), (Apache 
License, Version 2.0) 
-  io.debezium:debezium-embedded:1.6.4.Final - Apache debezium 
(https://github.com/debezium/debezium/tree/v1.6.4.Final/debezium-embedded), 
(Apache License, Version 2.0) 
-  io.debezium:debezium-api:1.6.4.Final - Apache debezium 
(https://github.com/debezium/debezium/tree/v1.6.4.Final/debezium-api), (Apache 
License, Version 2.0) 
+  io.debezium:debezium-connector-sqlserver:1.5.4.Final - Apache debezium 
(https://github.com/debezium/debezium/tree/v1.5.4.Final/debezium-connector-sqlserver),
 (Apache License, Version 2.0)
+  io.debezium:debezium-core:1.5.4.Final - Apache debezium 
(https://github.com/debezium/debezium/tree/v1.5.4.Final/debezium-core), (Apache 
License, Version 2.0)
+  io.debezium:debezium-embedded:1.5.4.Final - Apache debezium 
(https://github.com/debezium/debezium/tree/v1.5.4.Final/debezium-embedded), 
(Apache License, Version 2.0)
+  io.debezium:debezium-api:1.5.4.Final - Apache debezium 
(https://github.com/debezium/debezium/tree/v1.5.4.Final/debezium-api), (Apache 
License, Version 2.0)
 
 
 ========================================================================

Reply via email to