This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 33b2d8b1c [INLONG-4339][Sort] Rollback debezium-core to 1.5.4 (#4340)
33b2d8b1c is described below
commit 33b2d8b1c5c6af1ad090da11e0c716e9d914bf96
Author: pacino <[email protected]>
AuthorDate: Tue May 24 15:31:46 2022 +0800
[INLONG-4339][Sort] Rollback debezium-core to 1.5.4 (#4340)
---
inlong-sort/pom.xml | 2 +-
inlong-sort/sort-connectors/jdbc/pom.xml | 4 --
.../debezium/internal/DebeziumChangeConsumer.java | 1 +
.../internal/EmbeddedEngineChangeEvent.java | 68 ------------------
.../internal/FlinkDatabaseSchemaHistory.java | 2 +-
.../debezium/EmbeddedFlinkDatabaseHistory.java | 2 +-
.../debezium/dispatcher/EventDispatcherImpl.java | 4 +-
.../mysql/debezium/reader/BinlogSplitReader.java | 3 +-
.../mysql/debezium/reader/SnapshotSplitReader.java | 11 +--
.../debezium/task/MySqlBinlogSplitReadTask.java | 14 ++--
.../debezium/task/MySqlSnapshotSplitReadTask.java | 81 ++++++++++------------
inlong-sort/sort-connectors/pom.xml | 15 ++--
licenses/inlong-sort/LICENSE | 8 +--
13 files changed, 61 insertions(+), 154 deletions(-)
diff --git a/inlong-sort/pom.xml b/inlong-sort/pom.xml
index 8b816fec0..9637cc55d 100644
--- a/inlong-sort/pom.xml
+++ b/inlong-sort/pom.xml
@@ -42,7 +42,7 @@
<module>sort-dist</module>
</modules>
<properties>
- <debezium.version>1.6.4.Final</debezium.version>
+ <debezium.version>1.5.4.Final</debezium.version>
<kafka.clients.version>2.7.0</kafka.clients.version>
</properties>
<dependencyManagement>
diff --git a/inlong-sort/sort-connectors/jdbc/pom.xml b/inlong-sort/sort-connectors/jdbc/pom.xml
index 799b4b679..d88b6fbdb 100644
--- a/inlong-sort/sort-connectors/jdbc/pom.xml
+++ b/inlong-sort/sort-connectors/jdbc/pom.xml
@@ -39,10 +39,6 @@
<artifactId>clickhouse-jdbc</artifactId>
</dependency>
<!--for postgresql-->
- <dependency>
- <groupId>com.ververica</groupId>
- <artifactId>flink-connector-postgres-cdc</artifactId>
- </dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/DebeziumChangeConsumer.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/DebeziumChangeConsumer.java
index 734386c88..2c8441e0c 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/DebeziumChangeConsumer.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/DebeziumChangeConsumer.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sort.cdc.debezium.internal;
+import io.debezium.embedded.EmbeddedEngineChangeEvent;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.DebeziumEngine.RecordCommitter;
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/EmbeddedEngineChangeEvent.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/EmbeddedEngineChangeEvent.java
deleted file mode 100644
index 43baea8d8..000000000
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/EmbeddedEngineChangeEvent.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.cdc.debezium.internal;
-
-import io.debezium.engine.ChangeEvent;
-import io.debezium.engine.RecordChangeEvent;
-import org.apache.kafka.connect.source.SourceRecord;
-
-/**
- * Copied from Debezium project. Make it public to be accessible from
DebeziumChangeFetcher.
- */
-public class EmbeddedEngineChangeEvent<K, V> implements ChangeEvent<K, V>,
RecordChangeEvent<V> {
-
- private final K key;
- private final V value;
- private final SourceRecord sourceRecord;
-
- public EmbeddedEngineChangeEvent(K key, V value, SourceRecord
sourceRecord) {
- this.key = key;
- this.value = value;
- this.sourceRecord = sourceRecord;
- }
-
- @Override
- public K key() {
- return key;
- }
-
- @Override
- public V value() {
- return value;
- }
-
- @Override
- public V record() {
- return value;
- }
-
- @Override
- public String destination() {
- return sourceRecord.topic();
- }
-
- public SourceRecord sourceRecord() {
- return sourceRecord;
- }
-
- @Override
- public String toString() {
- return "EmbeddedEngineChangeEvent [key=" + key + ", value=" + value +
", sourceRecord=" + sourceRecord + "]";
- }
-}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/FlinkDatabaseSchemaHistory.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/FlinkDatabaseSchemaHistory.java
index 06142d6e5..a8c60f155 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/FlinkDatabaseSchemaHistory.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/FlinkDatabaseSchemaHistory.java
@@ -189,7 +189,7 @@ public class FlinkDatabaseSchemaHistory implements
DatabaseHistory {
}
@Override
- public boolean storeOnlyCapturedTables() {
+ public boolean storeOnlyMonitoredTables() {
return storeOnlyMonitoredTablesDdl;
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/EmbeddedFlinkDatabaseHistory.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/EmbeddedFlinkDatabaseHistory.java
index e4041e9a2..c82b73be6 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/EmbeddedFlinkDatabaseHistory.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/EmbeddedFlinkDatabaseHistory.java
@@ -142,7 +142,7 @@ public class EmbeddedFlinkDatabaseHistory implements
DatabaseHistory {
}
@Override
- public boolean storeOnlyCapturedTables() {
+ public boolean storeOnlyMonitoredTables() {
return storeOnlyMonitoredTablesDdl;
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/dispatcher/EventDispatcherImpl.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/dispatcher/EventDispatcherImpl.java
index e9841fd97..567e9117e 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/dispatcher/EventDispatcherImpl.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/dispatcher/EventDispatcherImpl.java
@@ -134,7 +134,7 @@ public class EventDispatcherImpl<T extends
DataCollectionId> extends EventDispat
T dataCollectionId, SchemaChangeEventEmitter
schemaChangeEventEmitter)
throws InterruptedException {
if (dataCollectionId != null && !filter.isIncluded(dataCollectionId)) {
- if (historizedSchema == null ||
historizedSchema.storeOnlyCapturedTables()) {
+ if (historizedSchema == null ||
historizedSchema.storeOnlyMonitoredTables()) {
LOG.trace("Filtering schema change event for {}",
dataCollectionId);
return;
}
@@ -158,7 +158,7 @@ public class EventDispatcherImpl<T extends
DataCollectionId> extends EventDispat
}
}
if (!anyNonfilteredEvent) {
- if (historizedSchema == null ||
historizedSchema.storeOnlyCapturedTables()) {
+ if (historizedSchema == null ||
historizedSchema.storeOnlyMonitoredTables()) {
LOG.trace("Filtering schema change event for {}",
dataCollectionIds);
return;
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
index eae13212a..8b9d46a71 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
@@ -110,8 +110,7 @@ public class BinlogSplitReader implements
DebeziumReader<SourceRecord, MySqlSpli
executor.submit(
() -> {
try {
- binlogSplitReadTask.execute(new
BinlogSplitChangeEventSourceContextImpl(),
- statefulTaskContext.getOffsetContext());
+ binlogSplitReadTask.execute(new
BinlogSplitChangeEventSourceContextImpl());
} catch (Exception e) {
currentTaskRunning = false;
LOG.error(
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java
index 93a48c102..f85a7658a 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java
@@ -110,8 +110,7 @@ public class SnapshotSplitReader implements
DebeziumReader<SourceRecord, MySqlSp
final SnapshotSplitChangeEventSourceContextImpl
sourceContext =
new
SnapshotSplitChangeEventSourceContextImpl();
SnapshotResult snapshotResult =
- splitSnapshotReadTask.execute(sourceContext,
-
statefulTaskContext.getOffsetContext());
+ splitSnapshotReadTask.execute(sourceContext);
final MySqlBinlogSplit backfillBinlogSplit =
createBackfillBinlogSplit(sourceContext);
// optimization that skip the binlog read when the low
watermark equals high
@@ -130,14 +129,8 @@ public class SnapshotSplitReader implements
DebeziumReader<SourceRecord, MySqlSp
if (snapshotResult.isCompletedOrSkipped()) {
final MySqlBinlogSplitReadTask
backfillBinlogReadTask =
createBackfillBinlogReadTask(backfillBinlogSplit);
- final MySqlOffsetContext.Loader loader =
- new MySqlOffsetContext.Loader(
-
statefulTaskContext.getConnectorConfig());
- final MySqlOffsetContext mySqlOffsetContext =
- loader.load(
-
backfillBinlogSplit.getStartingOffset().getOffset());
backfillBinlogReadTask.execute(
- new
SnapshotBinlogSplitChangeEventSourceContextImpl(), mySqlOffsetContext);
+ new
SnapshotBinlogSplitChangeEventSourceContextImpl());
} else {
readException =
new IllegalStateException(
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java
index 41bfe039b..d3b482cae 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java
@@ -31,7 +31,7 @@ import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import
org.apache.inlong.sort.cdc.mysql.debezium.dispatcher.EventDispatcherImpl;
import
org.apache.inlong.sort.cdc.mysql.debezium.dispatcher.SignalEventDispatcher;
-import org.apache.inlong.sort.cdc.mysql.debezium.reader.SnapshotSplitReader;
+import
org.apache.inlong.sort.cdc.mysql.debezium.reader.SnapshotSplitReader.SnapshotBinlogSplitChangeEventSourceContextImpl;
import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplit;
import org.slf4j.Logger;
@@ -70,6 +70,7 @@ public class MySqlBinlogSplitReadTask extends
MySqlStreamingChangeEventSource {
MySqlBinlogSplit binlogSplit) {
super(
connectorConfig,
+ offsetContext,
connection,
dispatcher,
errorHandler,
@@ -86,15 +87,14 @@ public class MySqlBinlogSplitReadTask extends
MySqlStreamingChangeEventSource {
}
@Override
- public void execute(ChangeEventSourceContext context, MySqlOffsetContext
offsetContext)
- throws InterruptedException {
+ public void execute(ChangeEventSourceContext context) throws
InterruptedException {
this.context = context;
- super.execute(context, offsetContext);
+ super.execute(context);
}
@Override
- protected void handleEvent(MySqlOffsetContext offsetContext, Event event) {
- super.handleEvent(offsetContext, event);
+ protected void handleEvent(Event event) {
+ super.handleEvent(event);
// check do we need to stop for read binlog for snapshot split.
if (isBoundedRead()) {
final BinlogOffset currentBinlogOffset =
getBinlogPosition(offsetContext.getOffset());
@@ -112,7 +112,7 @@ public class MySqlBinlogSplitReadTask extends
MySqlStreamingChangeEventSource {
new DebeziumException("Error processing binlog
signal event", e));
}
// tell reader the binlog task finished
-
((SnapshotSplitReader.SnapshotBinlogSplitChangeEventSourceContextImpl)
context).finished();
+ ((SnapshotBinlogSplitChangeEventSourceContextImpl)
context).finished();
}
}
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java
index 73e9d9966..df4a610c3 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java
@@ -28,6 +28,7 @@ import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
+import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.relational.Column;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
@@ -60,16 +61,12 @@ import java.util.Calendar;
import static
org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils.currentBinlogOffset;
-/**
- * Task to read snapshot split of table.
- */
-public class MySqlSnapshotSplitReadTask extends
AbstractSnapshotChangeEventSource<MySqlOffsetContext> {
+/** Task to read snapshot split of table. */
+public class MySqlSnapshotSplitReadTask extends
AbstractSnapshotChangeEventSource {
private static final Logger LOG =
LoggerFactory.getLogger(MySqlSnapshotSplitReadTask.class);
- /**
- * Interval for showing a log statement with the progress while scanning a
single table.
- */
+ /** Interval for showing a log statement with the progress while scanning
a single table. */
private static final Duration LOG_INTERVAL = Duration.ofMillis(10_000);
private final MySqlConnectorConfig connectorConfig;
@@ -92,7 +89,7 @@ public class MySqlSnapshotSplitReadTask extends
AbstractSnapshotChangeEventSourc
TopicSelector<TableId> topicSelector,
Clock clock,
MySqlSnapshotSplit snapshotSplit) {
- super(connectorConfig, snapshotProgressListener);
+ super(connectorConfig, previousOffset, snapshotProgressListener);
this.offsetContext = previousOffset;
this.connectorConfig = connectorConfig;
this.databaseSchema = databaseSchema;
@@ -105,8 +102,7 @@ public class MySqlSnapshotSplitReadTask extends
AbstractSnapshotChangeEventSourc
}
@Override
- public SnapshotResult<MySqlOffsetContext> execute(ChangeEventSourceContext
context,
- MySqlOffsetContext
previousOffset) throws InterruptedException {
+ public SnapshotResult execute(ChangeEventSourceContext context) throws
InterruptedException {
SnapshottingTask snapshottingTask =
getSnapshottingTask(previousOffset);
final SnapshotContext ctx;
try {
@@ -116,7 +112,7 @@ public class MySqlSnapshotSplitReadTask extends
AbstractSnapshotChangeEventSourc
throw new RuntimeException(e);
}
try {
- return doExecute(context, previousOffset, ctx, snapshottingTask);
+ return doExecute(context, ctx, snapshottingTask);
} catch (InterruptedException e) {
LOG.warn("Snapshot was interrupted before completion");
throw e;
@@ -125,21 +121,18 @@ public class MySqlSnapshotSplitReadTask extends
AbstractSnapshotChangeEventSourc
}
}
- protected SnapshotResult<MySqlOffsetContext> doExecute(
+ @Override
+ protected SnapshotResult doExecute(
ChangeEventSourceContext context,
- MySqlOffsetContext previousOffset,
- SnapshotContext<MySqlOffsetContext> snapshotContext,
+ SnapshotContext snapshotContext,
SnapshottingTask snapshottingTask)
throws Exception {
- final
RelationalSnapshotChangeEventSource.RelationalSnapshotContext<MySqlOffsetContext>
- ctx =
- (RelationalSnapshotChangeEventSource.RelationalSnapshotContext<
- MySqlOffsetContext>)
- snapshotContext;
- ctx.offset = previousOffset;
- SignalEventDispatcher signalEventDispatcher =
+ final RelationalSnapshotChangeEventSource.RelationalSnapshotContext
ctx =
+
(RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;
+ ctx.offset = offsetContext;
+ final SignalEventDispatcher signalEventDispatcher =
new SignalEventDispatcher(
- previousOffset.getPartition(),
+ offsetContext.getPartition(),
topicSelector.topicNameFor(snapshotSplit.getTableId()),
dispatcher.getQueue());
@@ -170,7 +163,7 @@ public class MySqlSnapshotSplitReadTask extends
AbstractSnapshotChangeEventSourc
}
@Override
- protected SnapshottingTask getSnapshottingTask(MySqlOffsetContext
mySqlOffsetContext) {
+ protected SnapshottingTask getSnapshottingTask(OffsetContext
previousOffset) {
return new SnapshottingTask(false, true);
}
@@ -180,6 +173,14 @@ public class MySqlSnapshotSplitReadTask extends
AbstractSnapshotChangeEventSourc
return new MySqlSnapshotContext();
}
+ private static class MySqlSnapshotContext
+ extends
RelationalSnapshotChangeEventSource.RelationalSnapshotContext {
+
+ public MySqlSnapshotContext() throws SQLException {
+ super("");
+ }
+ }
+
private void createDataEvents(
RelationalSnapshotChangeEventSource.RelationalSnapshotContext
snapshotContext,
TableId tableId)
@@ -192,9 +193,7 @@ public class MySqlSnapshotSplitReadTask extends
AbstractSnapshotChangeEventSourc
snapshotReceiver.completeSnapshot();
}
- /**
- * Dispatches the data change events for the records of a single table.
- */
+ /** Dispatches the data change events for the records of a single table. */
private void createDataEventsForTable(
RelationalSnapshotChangeEventSource.RelationalSnapshotContext
snapshotContext,
EventDispatcher.SnapshotReceiver snapshotReceiver,
@@ -217,16 +216,16 @@ public class MySqlSnapshotSplitReadTask extends
AbstractSnapshotChangeEventSourc
selectSql);
try (PreparedStatement selectStatement =
- StatementUtils.readTableSplitDataStatement(
- jdbcConnection,
- selectSql,
- snapshotSplit.getSplitStart() == null,
- snapshotSplit.getSplitEnd() == null,
- snapshotSplit.getSplitStart(),
- snapshotSplit.getSplitEnd(),
- snapshotSplit.getSplitKeyType().getFieldCount(),
- connectorConfig.getQueryFetchSize());
- ResultSet rs = selectStatement.executeQuery()) {
+ StatementUtils.readTableSplitDataStatement(
+ jdbcConnection,
+ selectSql,
+ snapshotSplit.getSplitStart() == null,
+ snapshotSplit.getSplitEnd() == null,
+ snapshotSplit.getSplitStart(),
+ snapshotSplit.getSplitEnd(),
+
snapshotSplit.getSplitKeyType().getFieldCount(),
+ connectorConfig.getQueryFetchSize());
+ ResultSet rs = selectStatement.executeQuery()) {
ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs,
table);
long rows = 0;
@@ -353,7 +352,7 @@ public class MySqlSnapshotSplitReadTask extends
AbstractSnapshotChangeEventSourc
try {
return MySqlValueConverters.containsZeroValuesInDatePart(
- (new String(b.getBytes(1, (int) (b.length())), "UTF-8")),
column, table)
+ (new String(b.getBytes(1, (int) (b.length())),
"UTF-8")), column, table)
? null
: rs.getTimestamp(fieldNo, Calendar.getInstance());
} catch (UnsupportedEncodingException e) {
@@ -361,12 +360,4 @@ public class MySqlSnapshotSplitReadTask extends
AbstractSnapshotChangeEventSourc
throw new RuntimeException(e);
}
}
-
- private static class MySqlSnapshotContext
- extends
RelationalSnapshotChangeEventSource.RelationalSnapshotContext {
-
- public MySqlSnapshotContext() throws SQLException {
- super("");
- }
- }
}
diff --git a/inlong-sort/sort-connectors/pom.xml b/inlong-sort/sort-connectors/pom.xml
index 20771736f..60341554e 100644
--- a/inlong-sort/sort-connectors/pom.xml
+++ b/inlong-sort/sort-connectors/pom.xml
@@ -98,20 +98,15 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hbase-2.2_${flink.scala.binary.version}</artifactId>
</dependency>
+ <!--for postgres cdc -->
+ <dependency>
+ <groupId>com.ververica</groupId>
+ <artifactId>flink-connector-postgres-cdc</artifactId>
+ </dependency>
<!--for mongodb-->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
- <exclusions>
- <exclusion>
- <artifactId>debezium-api</artifactId>
- <groupId>io.debezium</groupId>
- </exclusion>
- <exclusion>
- <artifactId>debezium-embedded</artifactId>
- <groupId>io.debezium</groupId>
- </exclusion>
- </exclusions>
</dependency>
<!--for debezium-->
<dependency>
diff --git a/licenses/inlong-sort/LICENSE b/licenses/inlong-sort/LICENSE
index 6ffdc4467..56dab07db 100644
--- a/licenses/inlong-sort/LICENSE
+++ b/licenses/inlong-sort/LICENSE
@@ -670,10 +670,10 @@ The text of each license is the standard Apache 2.0
license.
org.apache.zookeeper:zookeeper:3.6.3 - Apache ZooKeeper - Server
(https://github.com/apache/zookeeper/tree/release-3.6.3/zookeeper-server),
(Apache License, Version 2.0)
org.apache.zookeeper:zookeeper-jute:3.6.3 - Apache ZooKeeper - Jute
(https://github.com/apache/zookeeper/tree/release-3.6.3/zookeeper-jute),
(Apache License, Version 2.0)
io.streamnative.connectors:pulsar-flink-connector_2.11:1.13.6.1-rc9 -
StreamNative :: Pulsar Flink Connector :: Scala 2.11
(https://github.com/streamnative/pulsar-flink/blob/release-1.13/LICENSE),
(Apache License, Version 2.0)
- io.debezium:debezium-connector-sqlserver:1.6.4.Final - Apache debezium
(https://github.com/debezium/debezium/tree/v1.6.4.Final/debezium-connector-sqlserver),
(Apache License, Version 2.0)
- io.debezium:debezium-core:1.6.4.Final - Apache debezium
(https://github.com/debezium/debezium/tree/v1.6.4.Final/debezium-core), (Apache
License, Version 2.0)
- io.debezium:debezium-embedded:1.6.4.Final - Apache debezium
(https://github.com/debezium/debezium/tree/v1.6.4.Final/debezium-embedded),
(Apache License, Version 2.0)
- io.debezium:debezium-api:1.6.4.Final - Apache debezium
(https://github.com/debezium/debezium/tree/v1.6.4.Final/debezium-api), (Apache
License, Version 2.0)
+ io.debezium:debezium-connector-sqlserver:1.5.4.Final - Apache debezium
(https://github.com/debezium/debezium/tree/v1.5.4.Final/debezium-connector-sqlserver),
(Apache License, Version 2.0)
+ io.debezium:debezium-core:1.5.4.Final - Apache debezium
(https://github.com/debezium/debezium/tree/v1.5.4.Final/debezium-core), (Apache
License, Version 2.0)
+ io.debezium:debezium-embedded:1.5.4.Final - Apache debezium
(https://github.com/debezium/debezium/tree/v1.5.4.Final/debezium-embedded),
(Apache License, Version 2.0)
+ io.debezium:debezium-api:1.5.4.Final - Apache debezium
(https://github.com/debezium/debezium/tree/v1.5.4.Final/debezium-api), (Apache
License, Version 2.0)
========================================================================