This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a7c761f5f [INLONG-7410][Sort] Support open incremental snapshot in
oracle cdc connector (#7416)
a7c761f5f is described below
commit a7c761f5f50b8923ad3cdac60829a73864a08aeb
Author: emhui <[email protected]>
AuthorDate: Sun Feb 26 10:22:56 2023 +0800
[INLONG-7410][Sort] Support open incremental snapshot in oracle cdc
connector (#7416)
---
inlong-sort/sort-connectors/cdc-base/pom.xml | 6 +
.../sort/cdc/base/source/IncrementalSource.java | 217 ++++++++++++++++++
.../base/source/jdbc/JdbcIncrementalSource.java | 42 ++++
.../reader/IncrementalSourceRecordEmitter.java | 177 +++++++++++++++
.../cdc/oracle/source/OracleSourceBuilder.java | 244 +++++++++++++++++++++
.../sort/cdc/oracle/table/OracleTableSource.java | 135 ++++++++++--
.../cdc/oracle/table/OracleTableSourceFactory.java | 153 ++++++++-----
.../sort/parser/OracleExtractSqlParseTest.java | 6 +-
licenses/inlong-sort-connectors/LICENSE | 9 +-
pom.xml | 7 +
10 files changed, 918 insertions(+), 78 deletions(-)
diff --git a/inlong-sort/sort-connectors/cdc-base/pom.xml
b/inlong-sort/sort-connectors/cdc-base/pom.xml
index 44c3ea768..db7c4d42c 100644
--- a/inlong-sort/sort-connectors/cdc-base/pom.xml
+++ b/inlong-sort/sort-connectors/cdc-base/pom.xml
@@ -42,6 +42,12 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.ververica</groupId>
+ <artifactId>flink-cdc-base</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
</dependencies>
</project>
diff --git
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
new file mode 100644
index 000000000..d9c87a2e9
--- /dev/null
+++
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.base.source;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import com.ververica.cdc.connectors.base.config.SourceConfig;
+import com.ververica.cdc.connectors.base.dialect.DataSourceDialect;
+import com.ververica.cdc.connectors.base.options.StartupMode;
+import com.ververica.cdc.connectors.base.source.assigner.HybridSplitAssigner;
+import com.ververica.cdc.connectors.base.source.assigner.SplitAssigner;
+import com.ververica.cdc.connectors.base.source.assigner.StreamSplitAssigner;
+import
com.ververica.cdc.connectors.base.source.assigner.state.HybridPendingSplitsState;
+import
com.ververica.cdc.connectors.base.source.assigner.state.PendingSplitsState;
+import
com.ververica.cdc.connectors.base.source.assigner.state.PendingSplitsStateSerializer;
+import
com.ververica.cdc.connectors.base.source.assigner.state.StreamPendingSplitsState;
+import
com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator;
+import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import
com.ververica.cdc.connectors.base.source.meta.split.SourceSplitSerializer;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitState;
+import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
+import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceReader;
+import
com.ververica.cdc.connectors.base.source.reader.IncrementalSourceSplitReader;
+import io.debezium.relational.TableId;
+
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
+import
org.apache.inlong.sort.cdc.base.source.reader.IncrementalSourceRecordEmitter;
+
+/**
+ * The basic source of Incremental Snapshot framework for datasource, it is
based on FLIP-27 and
+ * Watermark Signal Algorithm which supports parallel reading snapshot of
table and then continue to
+ * capture data change by streaming reading.
+ * Copy from com.ververica:flink-cdc-base:2.3.0
+ */
+@Experimental
+public class IncrementalSource<T, C extends SourceConfig>
+ implements
+ Source<T, SourceSplitBase, PendingSplitsState>,
+ ResultTypeQueryable<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ protected final SourceConfig.Factory<C> configFactory;
+ protected final DataSourceDialect<C> dataSourceDialect;
+ protected final OffsetFactory offsetFactory;
+ protected final DebeziumDeserializationSchema<T> deserializationSchema;
+ protected final SourceSplitSerializer sourceSplitSerializer;
+
+ public IncrementalSource(
+ SourceConfig.Factory<C> configFactory,
+ DebeziumDeserializationSchema<T> deserializationSchema,
+ OffsetFactory offsetFactory,
+ DataSourceDialect<C> dataSourceDialect) {
+ this.configFactory = configFactory;
+ this.deserializationSchema = deserializationSchema;
+ this.offsetFactory = offsetFactory;
+ this.dataSourceDialect = dataSourceDialect;
+ this.sourceSplitSerializer =
+ new SourceSplitSerializer() {
+
+ @Override
+ public OffsetFactory getOffsetFactory() {
+ return offsetFactory;
+ }
+ };
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.CONTINUOUS_UNBOUNDED;
+ }
+
+ @Override
+ public IncrementalSourceReader<T, C> createReader(SourceReaderContext
readerContext)
+ throws Exception {
+ // create source config for the given subtask (e.g. unique server id)
+ C sourceConfig =
configFactory.create(readerContext.getIndexOfSubtask());
+ FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>>
elementsQueue =
+ new FutureCompletingBlockingQueue<>();
+
+ // Forward compatible with flink 1.13
+ final Method metricGroupMethod =
readerContext.getClass().getMethod("metricGroup");
+ metricGroupMethod.setAccessible(true);
+ final MetricGroup metricGroup = (MetricGroup)
metricGroupMethod.invoke(readerContext);
+ final SourceReaderMetrics sourceReaderMetrics = new
SourceReaderMetrics(metricGroup);
+
+ sourceReaderMetrics.registerMetrics();
+ Supplier<IncrementalSourceSplitReader<C>> splitReaderSupplier =
+ () -> new IncrementalSourceSplitReader<>(
+ readerContext.getIndexOfSubtask(), dataSourceDialect,
sourceConfig);
+ return new IncrementalSourceReader<>(
+ elementsQueue,
+ splitReaderSupplier,
+ createRecordEmitter(sourceConfig, sourceReaderMetrics),
+ readerContext.getConfiguration(),
+ readerContext,
+ sourceConfig,
+ sourceSplitSerializer,
+ dataSourceDialect);
+ }
+
+ @Override
+ public SplitEnumerator<SourceSplitBase, PendingSplitsState>
createEnumerator(
+ SplitEnumeratorContext<SourceSplitBase> enumContext) {
+ C sourceConfig = configFactory.create(0);
+ final SplitAssigner splitAssigner;
+ if (sourceConfig.getStartupOptions().startupMode ==
StartupMode.INITIAL) {
+ try {
+ final List<TableId> remainingTables =
+
dataSourceDialect.discoverDataCollections(sourceConfig);
+ boolean isTableIdCaseSensitive =
+
dataSourceDialect.isDataCollectionIdCaseSensitive(sourceConfig);
+ splitAssigner =
+ new HybridSplitAssigner<>(
+ sourceConfig,
+ enumContext.currentParallelism(),
+ remainingTables,
+ isTableIdCaseSensitive,
+ dataSourceDialect,
+ offsetFactory);
+ } catch (Exception e) {
+ throw new FlinkRuntimeException(
+ "Failed to discover captured tables for enumerator",
e);
+ }
+ } else {
+ splitAssigner = new StreamSplitAssigner(sourceConfig,
dataSourceDialect, offsetFactory);
+ }
+
+ return new IncrementalSourceEnumerator(enumContext, sourceConfig,
splitAssigner);
+ }
+
+ @Override
+ public SplitEnumerator<SourceSplitBase, PendingSplitsState>
restoreEnumerator(
+ SplitEnumeratorContext<SourceSplitBase> enumContext,
PendingSplitsState checkpoint) {
+ C sourceConfig = configFactory.create(0);
+
+ final SplitAssigner splitAssigner;
+ if (checkpoint instanceof HybridPendingSplitsState) {
+ splitAssigner =
+ new HybridSplitAssigner<>(
+ sourceConfig,
+ enumContext.currentParallelism(),
+ (HybridPendingSplitsState) checkpoint,
+ dataSourceDialect,
+ offsetFactory);
+ } else if (checkpoint instanceof StreamPendingSplitsState) {
+ splitAssigner =
+ new StreamSplitAssigner(
+ sourceConfig,
+ (StreamPendingSplitsState) checkpoint,
+ dataSourceDialect,
+ offsetFactory);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported restored PendingSplitsState: " + checkpoint);
+ }
+ return new IncrementalSourceEnumerator(enumContext, sourceConfig,
splitAssigner);
+ }
+
+ @Override
+ public SimpleVersionedSerializer<SourceSplitBase> getSplitSerializer() {
+ return sourceSplitSerializer;
+ }
+
+ @Override
+ public SimpleVersionedSerializer<PendingSplitsState>
getEnumeratorCheckpointSerializer() {
+ SourceSplitSerializer sourceSplitSerializer = (SourceSplitSerializer)
getSplitSerializer();
+ return new PendingSplitsStateSerializer(sourceSplitSerializer);
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return deserializationSchema.getProducedType();
+ }
+
+ protected RecordEmitter<SourceRecords, T, SourceSplitState>
createRecordEmitter(
+ SourceConfig sourceConfig, SourceReaderMetrics
sourceReaderMetrics) {
+ return new IncrementalSourceRecordEmitter<>(
+ deserializationSchema,
+ sourceReaderMetrics,
+ sourceConfig.isIncludeSchemaChanges(),
+ offsetFactory);
+ }
+}
diff --git
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/jdbc/JdbcIncrementalSource.java
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/jdbc/JdbcIncrementalSource.java
new file mode 100644
index 000000000..b059c2fb8
--- /dev/null
+++
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/jdbc/JdbcIncrementalSource.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.base.source.jdbc;
+
+import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
+import com.ververica.cdc.connectors.base.config.JdbcSourceConfigFactory;
+import com.ververica.cdc.connectors.base.dialect.JdbcDataSourceDialect;
+import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
+import org.apache.inlong.sort.cdc.base.source.IncrementalSource;
+
/**
 * The basic source of Incremental Snapshot framework for JDBC datasource, it is based on FLIP-27
 * and Watermark Signal Algorithm which supports parallel reading snapshot of table and then
 * continue to capture data change by streaming reading.
 * Copy from com.ververica:flink-cdc-base:2.3.0
 */
public class JdbcIncrementalSource<T> extends IncrementalSource<T, JdbcSourceConfig> {

    /**
     * Creates a JDBC incremental source; all behavior is delegated to {@link IncrementalSource},
     * this subclass only narrows the constructor parameters to their JDBC-specific types.
     *
     * @param configFactory factory producing the per-subtask JDBC source configuration
     * @param deserializationSchema deserializer turning change records into T
     * @param offsetFactory factory for the database-specific offset implementation
     * @param dataSourceDialect JDBC dialect used for discovery and split reading
     */
    public JdbcIncrementalSource(
            JdbcSourceConfigFactory configFactory,
            DebeziumDeserializationSchema<T> deserializationSchema,
            OffsetFactory offsetFactory,
            JdbcDataSourceDialect dataSourceDialect) {
        super(configFactory, deserializationSchema, offsetFactory, dataSourceDialect);
    }
}
diff --git
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
new file mode 100644
index 000000000..ec61afcb1
--- /dev/null
+++
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.base.source.reader;
+
+import static
com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isHighWatermarkEvent;
+import static
com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isWatermarkEvent;
+import static
com.ververica.cdc.connectors.base.utils.SourceRecordUtils.getFetchTimestamp;
+import static
com.ververica.cdc.connectors.base.utils.SourceRecordUtils.getHistoryRecord;
+import static
com.ververica.cdc.connectors.base.utils.SourceRecordUtils.getMessageTimestamp;
+import static
com.ververica.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
+import static
com.ververica.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;
+
+import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
+import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitState;
+import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
+import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceReader;
+import io.debezium.document.Array;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.TableChanges;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.util.Collector;
+import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
+import
org.apache.inlong.sort.cdc.base.debezium.history.FlinkJsonTableChangeSerializer;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
 * The {@link RecordEmitter} implementation for {@link IncrementalSourceReader}.
 *
 * <p>The {@link RecordEmitter} buffers the snapshot records of split and call the stream reader to
 * emit records rather than emit the records directly.
 * Copy from com.ververica:flink-cdc-base:2.3.0
 */
public class IncrementalSourceRecordEmitter<T>
        implements
            RecordEmitter<SourceRecords, T, SourceSplitState> {

    private static final Logger LOG = LoggerFactory.getLogger(IncrementalSourceRecordEmitter.class);
    private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER =
            new FlinkJsonTableChangeSerializer();

    /** Converts raw SourceRecords into the output type T. */
    protected final DebeziumDeserializationSchema<T> debeziumDeserializationSchema;
    /** Sink for process-time / fetch-delay / emit-delay metrics (see {@link #reportMetrics}). */
    protected final SourceReaderMetrics sourceReaderMetrics;
    /** Whether schema-change events are forwarded downstream in addition to being recorded. */
    protected final boolean includeSchemaChanges;
    /** Reused adapter bridging the deserializer's Collector to Flink's SourceOutput. */
    protected final OutputCollector<T> outputCollector;
    /** Builds database-specific offsets from raw source-offset maps. */
    protected final OffsetFactory offsetFactory;

    public IncrementalSourceRecordEmitter(
            DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
            SourceReaderMetrics sourceReaderMetrics,
            boolean includeSchemaChanges,
            OffsetFactory offsetFactory) {
        this.debeziumDeserializationSchema = debeziumDeserializationSchema;
        this.sourceReaderMetrics = sourceReaderMetrics;
        this.includeSchemaChanges = includeSchemaChanges;
        this.outputCollector = new OutputCollector<>();
        this.offsetFactory = offsetFactory;
    }

    /** Drains the batch and dispatches every record through {@link #processElement}. */
    @Override
    public void emitRecord(
            SourceRecords sourceRecords, SourceOutput<T> output, SourceSplitState splitState)
            throws Exception {
        final Iterator<SourceRecord> elementIterator = sourceRecords.iterator();
        while (elementIterator.hasNext()) {
            processElement(elementIterator.next(), output, splitState);
        }
    }

    /**
     * Dispatches one record by kind, in order: watermark events update snapshot-split state,
     * schema-change events update stream-split schema (and are optionally emitted), data-change
     * records advance the stream offset and are emitted; anything else is logged and skipped.
     */
    protected void processElement(
            SourceRecord element, SourceOutput<T> output, SourceSplitState splitState)
            throws Exception {
        if (isWatermarkEvent(element)) {
            Offset watermark = getWatermark(element);
            // Only the high watermark of a snapshot split is remembered; low watermarks and
            // watermarks seen on stream splits are intentionally dropped.
            if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
                splitState.asSnapshotSplitState().setHighWatermark(watermark);
            }
        } else if (isSchemaChangeEvent(element) && splitState.isStreamSplitState()) {
            HistoryRecord historyRecord = getHistoryRecord(element);
            Array tableChanges =
                    historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
            TableChanges changes = TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
            // Record the new schema per table so later records are decoded correctly.
            for (TableChanges.TableChange tableChange : changes) {
                splitState.asStreamSplitState().recordSchema(tableChange.getId(), tableChange);
            }
            if (includeSchemaChanges) {
                emitElement(element, output);
            }
        } else if (isDataChangeRecord(element)) {
            if (splitState.isStreamSplitState()) {
                // Advance the restart position so a recovery resumes after this record.
                Offset position = getOffsetPosition(element);
                splitState.asStreamSplitState().setStartingOffset(position);
            }
            reportMetrics(element);
            emitElement(element, output);
        } else {
            // unknown element
            LOG.info("Meet unknown element {}, just skip.", element);
        }
    }

    private Offset getWatermark(SourceRecord watermarkEvent) {
        return getOffsetPosition(watermarkEvent.sourceOffset());
    }

    public Offset getOffsetPosition(SourceRecord dataRecord) {
        return getOffsetPosition(dataRecord.sourceOffset());
    }

    /**
     * Converts a raw source-offset map into an {@link Offset}; values are stringified because the
     * offset factory consumes a {@code Map<String, String>}, null values are preserved.
     */
    public Offset getOffsetPosition(Map<String, ?> offset) {
        Map<String, String> offsetStrMap = new HashMap<>();
        for (Map.Entry<String, ?> entry : offset.entrySet()) {
            offsetStrMap.put(
                    entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString());
        }
        return offsetFactory.newOffset(offsetStrMap);
    }

    /** Deserializes one record into the shared collector, which forwards to {@code output}. */
    protected void emitElement(SourceRecord element, SourceOutput<T> output) throws Exception {
        outputCollector.output = output;
        debeziumDeserializationSchema.deserialize(element, outputCollector);
    }

    /** Records process time and, when a message timestamp is present, fetch and emit delays. */
    protected void reportMetrics(SourceRecord element) {
        long now = System.currentTimeMillis();
        // record the latest process time
        sourceReaderMetrics.recordProcessTime(now);
        Long messageTimestamp = getMessageTimestamp(element);

        if (messageTimestamp != null && messageTimestamp > 0L) {
            // report fetch delay
            Long fetchTimestamp = getFetchTimestamp(element);
            if (fetchTimestamp != null) {
                sourceReaderMetrics.recordFetchDelay(fetchTimestamp - messageTimestamp);
            }
            // report emit delay
            sourceReaderMetrics.recordEmitDelay(now - messageTimestamp);
        }
    }

    /** Adapter exposing a {@link SourceOutput} through the {@link Collector} interface. */
    private static class OutputCollector<T> implements Collector<T> {

        // Set by emitElement() before each deserialize call; not thread-safe by itself.
        private SourceOutput<T> output;

        @Override
        public void collect(T record) {
            output.collect(record);
        }

        @Override
        public void close() {
            // do nothing
        }
    }
}
diff --git
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/OracleSourceBuilder.java
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/OracleSourceBuilder.java
new file mode 100644
index 000000000..87cced987
--- /dev/null
+++
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/OracleSourceBuilder.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.oracle.source;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import com.ververica.cdc.connectors.base.options.StartupOptions;
+import com.ververica.cdc.connectors.oracle.source.OracleDialect;
+import
com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfigFactory;
+import
com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffsetFactory;
+import java.time.Duration;
+import java.util.Properties;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
+import org.apache.inlong.sort.cdc.base.source.jdbc.JdbcIncrementalSource;
+
/**
 * The builder class for {@link OracleIncrementalSource} to make it easier for the users to
 * construct a {@link OracleIncrementalSource}.
 *
 * <p>Check the Java docs of each individual method to learn more about the settings to build a
 * {@link OracleIncrementalSource}.
 * Copy from com.ververica:flink-connector-oracle-cdc:2.3.0
 */
@Internal
public class OracleSourceBuilder<T> {

    // All setters delegate to this factory; build() freezes it into the source.
    private final OracleSourceConfigFactory configFactory = new OracleSourceConfigFactory();
    private RedoLogOffsetFactory offsetFactory;
    private OracleDialect dialect;
    private DebeziumDeserializationSchema<T> deserializer;

    /** Hostname of the Oracle database server to connect to. */
    public OracleSourceBuilder<T> hostname(String hostname) {
        this.configFactory.hostname(hostname);
        return this;
    }

    /** Url to use when connecting to the Oracle database server. */
    public OracleSourceBuilder<T> url(@Nullable String url) {
        this.configFactory.url(url);
        return this;
    }

    /** Integer port number of the Oracle database server. */
    public OracleSourceBuilder<T> port(int port) {
        this.configFactory.port(port);
        return this;
    }

    /**
     * A required list of regular expressions that match database names to be monitored; any
     * database name not included in the whitelist will be excluded from monitoring.
     */
    public OracleSourceBuilder<T> databaseList(String... databaseList) {
        this.configFactory.databaseList(databaseList);
        return this;
    }

    /**
     * An optional list of regular expressions that match schema names to be monitored; any schema
     * name not included in the whitelist will be excluded from monitoring. By default all
     * non-system schemas will be monitored.
     */
    public OracleSourceBuilder<T> schemaList(String... schemaList) {
        this.configFactory.schemaList(schemaList);
        return this;
    }

    /**
     * A required list of regular expressions that match fully-qualified table identifiers for
     * tables to be monitored; any table not included in the list will be excluded from monitoring.
     * Each identifier is of the form {@code <databaseName>.<tableName>}.
     * NOTE(review): wording inherited from the generic builder — for Oracle the identifier is
     * presumably {@code <schemaName>.<tableName>}; confirm against the dialect.
     */
    public OracleSourceBuilder<T> tableList(String... tableList) {
        this.configFactory.tableList(tableList);
        return this;
    }

    /** Name of the Oracle database to use when connecting to the Oracle database server. */
    public OracleSourceBuilder<T> username(String username) {
        this.configFactory.username(username);
        return this;
    }

    /** Password to use when connecting to the Oracle database server. */
    public OracleSourceBuilder<T> password(String password) {
        this.configFactory.password(password);
        return this;
    }

    /**
     * The session time zone in database server, e.g. "America/Los_Angeles". It controls how the
     * TIMESTAMP type in Oracle converted to STRING. See more
     * https://debezium.io/documentation/reference/1.5/connectors/Oracle.html#Oracle-temporal-types
     */
    public OracleSourceBuilder<T> serverTimeZone(String timeZone) {
        this.configFactory.serverTimeZone(timeZone);
        return this;
    }

    /**
     * The split size (number of rows) of table snapshot, captured tables are split into multiple
     * splits when read the snapshot of table.
     */
    public OracleSourceBuilder<T> splitSize(int splitSize) {
        this.configFactory.splitSize(splitSize);
        return this;
    }

    /**
     * The group size of split meta, if the meta size exceeds the group size, the meta will be
     * divided into multiple groups.
     */
    public OracleSourceBuilder<T> splitMetaGroupSize(int splitMetaGroupSize) {
        this.configFactory.splitMetaGroupSize(splitMetaGroupSize);
        return this;
    }

    /**
     * The upper bound of split key evenly distribution factor, the factor is used to determine
     * whether the table is evenly distribution or not.
     */
    public OracleSourceBuilder<T> distributionFactorUpper(double distributionFactorUpper) {
        this.configFactory.distributionFactorUpper(distributionFactorUpper);
        return this;
    }

    /**
     * The lower bound of split key evenly distribution factor, the factor is used to determine
     * whether the table is evenly distribution or not.
     */
    public OracleSourceBuilder<T> distributionFactorLower(double distributionFactorLower) {
        this.configFactory.distributionFactorLower(distributionFactorLower);
        return this;
    }

    /** The maximum fetch size for per poll when read table snapshot. */
    public OracleSourceBuilder<T> fetchSize(int fetchSize) {
        this.configFactory.fetchSize(fetchSize);
        return this;
    }

    /**
     * The maximum time that the connector should wait after trying to connect to the Oracle
     * database server before timing out.
     */
    public OracleSourceBuilder<T> connectTimeout(Duration connectTimeout) {
        this.configFactory.connectTimeout(connectTimeout);
        return this;
    }

    /** The max retry times to get connection. */
    public OracleSourceBuilder<T> connectMaxRetries(int connectMaxRetries) {
        this.configFactory.connectMaxRetries(connectMaxRetries);
        return this;
    }

    /** The connection pool size. */
    public OracleSourceBuilder<T> connectionPoolSize(int connectionPoolSize) {
        this.configFactory.connectionPoolSize(connectionPoolSize);
        return this;
    }

    /** Whether the {@link OracleIncrementalSource} should output the schema changes or not. */
    public OracleSourceBuilder<T> includeSchemaChanges(boolean includeSchemaChanges) {
        this.configFactory.includeSchemaChanges(includeSchemaChanges);
        return this;
    }

    /** Specifies the startup options. */
    public OracleSourceBuilder<T> startupOptions(StartupOptions startupOptions) {
        this.configFactory.startupOptions(startupOptions);
        return this;
    }

    /**
     * The chunk key of table snapshot, captured tables are split into multiple chunks by the chunk
     * key column when read the snapshot of table.
     */
    public OracleSourceBuilder<T> chunkKeyColumn(String chunkKeyColumn) {
        this.configFactory.chunkKeyColumn(chunkKeyColumn);
        return this;
    }

    /** The Debezium Oracle connector properties. For example, "snapshot.mode". */
    public OracleSourceBuilder<T> debeziumProperties(Properties properties) {
        this.configFactory.debeziumProperties(properties);
        return this;
    }

    /**
     * The deserializer used to convert from consumed {@link
     * org.apache.kafka.connect.source.SourceRecord}.
     */
    public OracleSourceBuilder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
        this.deserializer = deserializer;
        return this;
    }

    /**
     * Build the {@link OracleIncrementalSource}.
     *
     * <p>A fresh {@link RedoLogOffsetFactory} and {@link OracleDialect} are created per build;
     * the deserializer is the only setting validated (must be non-null).
     *
     * @return a OracleParallelSource with the settings made for this builder.
     * @throws NullPointerException if no deserializer was configured
     */
    public OracleIncrementalSource<T> build() {
        this.offsetFactory = new RedoLogOffsetFactory();
        this.dialect = new OracleDialect(configFactory);
        return new OracleIncrementalSource<T>(
                configFactory, checkNotNull(deserializer), offsetFactory, dialect);
    }

    /** The {@link JdbcIncrementalSource} implementation for Oracle. */
    public static class OracleIncrementalSource<T> extends JdbcIncrementalSource<T> {

        /**
         * Creates the Oracle source; delegates entirely to {@link JdbcIncrementalSource},
         * narrowing the parameter types to their Oracle-specific implementations.
         */
        public OracleIncrementalSource(
                OracleSourceConfigFactory configFactory,
                DebeziumDeserializationSchema<T> deserializationSchema,
                RedoLogOffsetFactory offsetFactory,
                OracleDialect dataSourceDialect) {
            super(configFactory, deserializationSchema, offsetFactory, dataSourceDialect);
        }

        /** Entry point: returns a new {@link OracleSourceBuilder} for fluent configuration. */
        public static <T> OracleSourceBuilder<T> builder() {
            return new OracleSourceBuilder<>();
        }

    }
}
diff --git
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java
index 943f15abc..01f0b395e 100644
---
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java
+++
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.cdc.oracle.table;
import static org.apache.flink.util.Preconditions.checkNotNull;
import com.ververica.cdc.connectors.base.options.StartupOptions;
+import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -27,12 +28,14 @@ import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import javax.annotation.Nullable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.SourceProvider;
import
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
@@ -40,9 +43,11 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
import org.apache.inlong.sort.cdc.base.debezium.table.MetadataConverter;
+import org.apache.inlong.sort.cdc.base.source.jdbc.JdbcIncrementalSource;
import org.apache.inlong.sort.cdc.oracle.debezium.DebeziumSourceFunction;
import
org.apache.inlong.sort.cdc.oracle.debezium.table.RowDataDebeziumDeserializeSchema;
import org.apache.inlong.sort.cdc.oracle.OracleSource;
+import org.apache.inlong.sort.cdc.oracle.source.OracleSourceBuilder;
/**
* A {@link DynamicTableSource} that describes how to create an Oracle binlog
from a logical
@@ -63,6 +68,16 @@ public class OracleTableSource implements ScanTableSource,
SupportsReadingMetada
private final boolean sourceMultipleEnable;
private final String inlongMetric;
private final String inlongAudit;
+ private final boolean enableParallelRead;
+ private final int splitSize;
+ private final int splitMetaGroupSize;
+ private final int fetchSize;
+ private final Duration connectTimeout;
+ private final int connectionPoolSize;
+ private final int connectMaxRetries;
+ private final double distributionFactorUpper;
+ private final double distributionFactorLower;
+ private final String chunkKeyColumn;
//
--------------------------------------------------------------------------------------------
// Mutable attributes
@@ -87,7 +102,17 @@ public class OracleTableSource implements ScanTableSource,
SupportsReadingMetada
StartupOptions startupOptions,
boolean sourceMultipleEnable,
String inlongMetric,
- String inlongAudit) {
+ String inlongAudit,
+ boolean enableParallelRead,
+ int splitSize,
+ int splitMetaGroupSize,
+ int fetchSize,
+ Duration connectTimeout,
+ int connectMaxRetries,
+ int connectionPoolSize,
+ double distributionFactorUpper,
+ double distributionFactorLower,
+ @Nullable String chunkKeyColumn) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@@ -103,6 +128,16 @@ public class OracleTableSource implements ScanTableSource,
SupportsReadingMetada
this.sourceMultipleEnable = sourceMultipleEnable;
this.inlongMetric = inlongMetric;
this.inlongAudit = inlongAudit;
+ this.enableParallelRead = enableParallelRead;
+ this.splitSize = splitSize;
+ this.splitMetaGroupSize = splitMetaGroupSize;
+ this.fetchSize = fetchSize;
+ this.connectTimeout = connectTimeout;
+ this.connectMaxRetries = connectMaxRetries;
+ this.connectionPoolSize = connectionPoolSize;
+ this.distributionFactorUpper = distributionFactorUpper;
+ this.distributionFactorLower = distributionFactorLower;
+ this.chunkKeyColumn = chunkKeyColumn;
}
@Override
@@ -131,24 +166,50 @@ public class OracleTableSource implements
ScanTableSource, SupportsReadingMetada
OracleDeserializationConverterFactory.instance())
.setSourceMultipleEnable(sourceMultipleEnable)
.build();
- OracleSource.Builder<RowData> builder =
- OracleSource.<RowData>builder()
- .hostname(hostname)
- .port(port)
- .database(database)
- .tableList(tableName)
- .schemaList(schemaName)
- .username(username)
- .password(password)
- .debeziumProperties(dbzProperties)
- .startupOptions(startupOptions)
- .deserializer(deserializer)
- .inlongMetric(inlongMetric)
- .inlongAudit(inlongAudit)
- .sourceMultipleEnable(sourceMultipleEnable);
- DebeziumSourceFunction<RowData> sourceFunction = builder.build();
-
- return SourceFunctionProvider.of(sourceFunction, false);
+ if (enableParallelRead) {
+ JdbcIncrementalSource<RowData> oracleChangeEventSource =
+
OracleSourceBuilder.OracleIncrementalSource.<RowData>builder()
+ .hostname(hostname)
+ .port(port)
+ .databaseList(database)
+ .schemaList(schemaName)
+ .tableList(tableName)
+ .username(username)
+ .password(password)
+ .startupOptions(startupOptions)
+ .deserializer(deserializer)
+ .debeziumProperties(dbzProperties)
+ .splitSize(splitSize)
+ .splitMetaGroupSize(splitMetaGroupSize)
+ .fetchSize(fetchSize)
+ .connectTimeout(connectTimeout)
+ .connectionPoolSize(connectionPoolSize)
+ .connectMaxRetries(connectMaxRetries)
+ .distributionFactorUpper(distributionFactorUpper)
+ .distributionFactorLower(distributionFactorLower)
+ .build();
+
+ return SourceProvider.of(oracleChangeEventSource);
+ } else {
+ OracleSource.Builder<RowData> builder =
+ OracleSource.<RowData>builder()
+ .hostname(hostname)
+ .port(port)
+ .database(database)
+ .tableList(tableName)
+ .schemaList(schemaName)
+ .username(username)
+ .password(password)
+ .debeziumProperties(dbzProperties)
+ .startupOptions(startupOptions)
+ .deserializer(deserializer)
+ .inlongMetric(inlongMetric)
+ .inlongAudit(inlongAudit)
+ .sourceMultipleEnable(sourceMultipleEnable);
+ DebeziumSourceFunction<RowData> sourceFunction = builder.build();
+
+ return SourceFunctionProvider.of(sourceFunction, false);
+ }
}
private MetadataConverter[] getMetadataConverters() {
@@ -182,7 +243,17 @@ public class OracleTableSource implements ScanTableSource,
SupportsReadingMetada
startupOptions,
sourceMultipleEnable,
inlongMetric,
- inlongAudit);
+ inlongAudit,
+ enableParallelRead,
+ splitSize,
+ splitMetaGroupSize,
+ fetchSize,
+ connectTimeout,
+ connectMaxRetries,
+ connectionPoolSize,
+ distributionFactorUpper,
+ distributionFactorLower,
+ chunkKeyColumn);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
@@ -210,7 +281,17 @@ public class OracleTableSource implements ScanTableSource,
SupportsReadingMetada
&& Objects.equals(producedDataType, that.producedDataType)
&& Objects.equals(metadataKeys, that.metadataKeys)
&& Objects.equals(inlongMetric, that.inlongMetric)
- && Objects.equals(inlongAudit, that.inlongAudit);
+ && Objects.equals(inlongAudit, that.inlongAudit)
+ && Objects.equals(enableParallelRead, that.enableParallelRead)
+ && Objects.equals(splitSize, that.splitSize)
+ && Objects.equals(splitMetaGroupSize, that.splitMetaGroupSize)
+ && Objects.equals(fetchSize, that.fetchSize)
+ && Objects.equals(connectTimeout, that.connectTimeout)
+ && Objects.equals(connectMaxRetries, that.connectMaxRetries)
+ && Objects.equals(connectionPoolSize, that.connectionPoolSize)
+ && Objects.equals(distributionFactorUpper,
that.distributionFactorUpper)
+ && Objects.equals(distributionFactorLower,
that.distributionFactorLower)
+ && Objects.equals(chunkKeyColumn, that.chunkKeyColumn);
}
@Override
@@ -229,7 +310,17 @@ public class OracleTableSource implements ScanTableSource,
SupportsReadingMetada
producedDataType,
metadataKeys,
inlongMetric,
- inlongAudit);
+ inlongAudit,
+ enableParallelRead,
+ splitSize,
+ splitMetaGroupSize,
+ fetchSize,
+ connectTimeout,
+ connectMaxRetries,
+ connectionPoolSize,
+ distributionFactorUpper,
+ distributionFactorLower,
+ chunkKeyColumn);
}
@Override
diff --git
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
index a6535dc8f..a91976b7c 100644
---
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
+++
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
@@ -18,8 +18,8 @@
package org.apache.inlong.sort.cdc.oracle.table;
import com.ververica.cdc.connectors.base.options.StartupOptions;
+import java.time.Duration;
import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ResolvedSchema;
@@ -31,7 +31,28 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.inlong.sort.cdc.base.debezium.table.DebeziumOptions;
+import static
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
+import static
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
+import static
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
+import static
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.DATABASE_NAME;
+import static
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.HOSTNAME;
+import static
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.PASSWORD;
+import static
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
+import static
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SCHEMA_NAME;
+import static
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SERVER_TIME_ZONE;
+import static
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.TABLE_NAME;
+import static
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.USERNAME;
+import static
com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
+import static
com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static
com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
+import static
com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
+import static
com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_STARTUP_MODE;
+import static
com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static
com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
+import static
com.ververica.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
+import static
com.ververica.cdc.connectors.oracle.source.config.OracleSourceOptions.PORT;
import static
com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
+import static org.apache.flink.util.Preconditions.checkState;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static org.apache.inlong.sort.base.Constants.SOURCE_MULTIPLE_ENABLE;
@@ -43,58 +64,6 @@ public class OracleTableSourceFactory implements
DynamicTableSourceFactory {
private static final String IDENTIFIER = "oracle-cdc-inlong";
- private static final ConfigOption<String> HOSTNAME =
- ConfigOptions.key("hostname")
- .stringType()
- .noDefaultValue()
- .withDescription("IP address or hostname of the Oracle
database server.");
-
- private static final ConfigOption<Integer> PORT =
- ConfigOptions.key("port")
- .intType()
- .defaultValue(1521)
- .withDescription("Integer port number of the Oracle
database server.");
-
- private static final ConfigOption<String> USERNAME =
- ConfigOptions.key("username")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "Name of the Oracle database to use when
connecting to the Oracle database server.");
-
- private static final ConfigOption<String> PASSWORD =
- ConfigOptions.key("password")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "Password to use when connecting to the oracle
database server.");
-
- private static final ConfigOption<String> DATABASE_NAME =
- ConfigOptions.key("database-name")
- .stringType()
- .noDefaultValue()
- .withDescription("Database name of the Oracle server to
monitor.");
-
- private static final ConfigOption<String> SCHEMA_NAME =
- ConfigOptions.key("schema-name")
- .stringType()
- .noDefaultValue()
- .withDescription("Schema name of the Oracle database to
monitor.");
-
- private static final ConfigOption<String> TABLE_NAME =
- ConfigOptions.key("table-name")
- .stringType()
- .noDefaultValue()
- .withDescription("Table name of the Oracle database to
monitor.");
-
- public static final ConfigOption<String> SCAN_STARTUP_MODE =
- ConfigOptions.key("scan.startup.mode")
- .stringType()
- .defaultValue("initial")
- .withDescription(
- "Optional startup mode for Oracle CDC consumer,
valid enumerations are "
- + "\"initial\", \"latest-offset\"");
-
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper =
@@ -114,6 +83,28 @@ public class OracleTableSourceFactory implements
DynamicTableSourceFactory {
ResolvedSchema physicalSchema =
context.getCatalogTable().getResolvedSchema();
String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
String inlongAudit = config.get(INLONG_AUDIT);
+ boolean enableParallelRead =
config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
+ int splitSize = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
+ int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE);
+ int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE);
+ Duration connectTimeout = config.get(CONNECT_TIMEOUT);
+ int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
+ int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
+ double distributionFactorUpper =
config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
+ double distributionFactorLower =
config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
+ String chunkKeyColumn =
+
config.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN).orElse(null);
+ String serverTimezone = config.get(SERVER_TIME_ZONE);
+
+ if (enableParallelRead) {
+ validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
splitSize, 1);
+ validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
+ validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize,
1);
+ validateIntegerOption(CONNECTION_POOL_SIZE, connectionPoolSize, 1);
+ validateIntegerOption(CONNECT_MAX_RETRIES, connectMaxRetries, 0);
+ validateDistributionFactorUpper(distributionFactorUpper);
+ validateDistributionFactorLower(distributionFactorLower);
+ }
return new OracleTableSource(
physicalSchema,
port,
@@ -127,7 +118,17 @@ public class OracleTableSourceFactory implements
DynamicTableSourceFactory {
startupOptions,
sourceMultipleEnable,
inlongMetric,
- inlongAudit);
+ inlongAudit,
+ enableParallelRead,
+ splitSize,
+ splitMetaGroupSize,
+ fetchSize,
+ connectTimeout,
+ connectMaxRetries,
+ connectionPoolSize,
+ distributionFactorUpper,
+ distributionFactorLower,
+ chunkKeyColumn);
}
@Override
@@ -155,6 +156,16 @@ public class OracleTableSourceFactory implements
DynamicTableSourceFactory {
options.add(INLONG_METRIC);
options.add(INLONG_AUDIT);
options.add(SOURCE_MULTIPLE_ENABLE);
+ options.add(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
+ options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
+ options.add(CHUNK_META_GROUP_SIZE);
+ options.add(SCAN_SNAPSHOT_FETCH_SIZE);
+ options.add(CONNECT_TIMEOUT);
+ options.add(CONNECT_MAX_RETRIES);
+ options.add(CONNECTION_POOL_SIZE);
+ options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
+ options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
+ options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
return options;
}
@@ -181,4 +192,38 @@ public class OracleTableSourceFactory implements
DynamicTableSourceFactory {
modeString));
}
}
+
+ /** Checks the value of given integer option is valid. */
+ private void validateIntegerOption(
+ ConfigOption<Integer> option, int optionValue, int exclusiveMin) {
+ checkState(
+ optionValue > exclusiveMin,
+ String.format(
+                    "The value of option '%s' must be larger than %d, but is
%d",
+ option.key(), exclusiveMin, optionValue));
+ }
+
+ /** Checks the value of given evenly distribution factor upper bound is
valid. */
+ private void validateDistributionFactorUpper(double
distributionFactorUpper) {
+ checkState(
+ doubleCompare(distributionFactorUpper, 1.0d) >= 0,
+ String.format(
"The value of option '%s' must be larger than or equal to
%s, but is %s",
+ SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.key(),
+ 1.0d,
+ distributionFactorUpper));
+ }
+
+ /** Checks the value of given evenly distribution factor lower bound is
valid. */
+ private void validateDistributionFactorLower(double
distributionFactorLower) {
+ checkState(
+ doubleCompare(distributionFactorLower, 0.0d) >= 0
+ && doubleCompare(distributionFactorLower, 1.0d) <= 0,
+ String.format(
"The value of option '%s' must be between %s and %s
inclusively, but is %s",
+ SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.key(),
+ 0.0d,
+ 1.0d,
+ distributionFactorLower));
+ }
}
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleExtractSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleExtractSqlParseTest.java
index 3b0489fe6..335afcdd5 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleExtractSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleExtractSqlParseTest.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sort.parser;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -66,8 +68,10 @@ public class OracleExtractSqlParseTest extends
AbstractTestBase {
new MetaFieldInfo("table_name", MetaField.TABLE_NAME),
new MetaFieldInfo("op_ts", MetaField.OP_TS),
new MetaFieldInfo("schema_name", MetaField.SCHEMA_NAME));
+ Map<String, String> properties = new HashMap<>();
+ properties.put("scan.incremental.snapshot.enabled", "true");
return new OracleExtractNode("1", "oracle_input", fields,
- null, null, "ID", "localhost",
+ null, properties, "ID", "localhost",
"flinkuser", "flinkpw", "xE",
"flinkuser", "table", 1521, null);
}
diff --git a/licenses/inlong-sort-connectors/LICENSE
b/licenses/inlong-sort-connectors/LICENSE
index eb955300f..93056ce8b 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -459,7 +459,7 @@
Source : flink-cdc-connectors 2.2.1 (Please note that the software have been
modified.)
License :
https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
-
+
1.3.2
inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/HiveValidator.java
inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java
inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/HiveTableMetaStoreFactory.java
@@ -598,6 +598,13 @@
Source : com.starrocks:flink-connector-starrocks:1.2.3_flink-1.13_2.11
(Please note that the software have been modified.)
License : https://www.apache.org/licenses/LICENSE-2.0.txt
+1.3.14
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/jdbc/JdbcIncrementalSource.java
+
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
+
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
+
inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/OracleSourceBuilder.java
+
+ Source : flink-cdc-connectors 2.3.0 (Please note that the software have been
modified.)
+ License :
https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
=======================================================================
Apache InLong Subcomponents:
diff --git a/pom.xml b/pom.xml
index dc9e84135..ab6a8f77f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -165,6 +165,7 @@
<flink.connector.oracle.cdc.version>2.3.0</flink.connector.oracle.cdc.version>
<flink.connector.doris.version>1.0.3</flink.connector.doris.version>
<flink.connector.redis>1.1.0</flink.connector.redis>
+ <flink.cdc.base.version>2.3.0</flink.cdc.base.version>
<curator.version>2.12.0</curator.version>
@@ -1047,6 +1048,12 @@
<version>${flink.connector.sqlserver.cdc.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.ververica</groupId>
+ <artifactId>flink-cdc-base</artifactId>
+ <version>${flink.cdc.base.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${flink.scala.binary.version}</artifactId>