This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a7c761f5f [INLONG-7410][Sort] Support open incremental snapshot in 
oracle cdc connector (#7416)
a7c761f5f is described below

commit a7c761f5f50b8923ad3cdac60829a73864a08aeb
Author: emhui <[email protected]>
AuthorDate: Sun Feb 26 10:22:56 2023 +0800

    [INLONG-7410][Sort] Support open incremental snapshot in oracle cdc 
connector (#7416)
---
 inlong-sort/sort-connectors/cdc-base/pom.xml       |   6 +
 .../sort/cdc/base/source/IncrementalSource.java    | 217 ++++++++++++++++++
 .../base/source/jdbc/JdbcIncrementalSource.java    |  42 ++++
 .../reader/IncrementalSourceRecordEmitter.java     | 177 +++++++++++++++
 .../cdc/oracle/source/OracleSourceBuilder.java     | 244 +++++++++++++++++++++
 .../sort/cdc/oracle/table/OracleTableSource.java   | 135 ++++++++++--
 .../cdc/oracle/table/OracleTableSourceFactory.java | 153 ++++++++-----
 .../sort/parser/OracleExtractSqlParseTest.java     |   6 +-
 licenses/inlong-sort-connectors/LICENSE            |   9 +-
 pom.xml                                            |   7 +
 10 files changed, 918 insertions(+), 78 deletions(-)

diff --git a/inlong-sort/sort-connectors/cdc-base/pom.xml 
b/inlong-sort/sort-connectors/cdc-base/pom.xml
index 44c3ea768..db7c4d42c 100644
--- a/inlong-sort/sort-connectors/cdc-base/pom.xml
+++ b/inlong-sort/sort-connectors/cdc-base/pom.xml
@@ -42,6 +42,12 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-cdc-base</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
     </dependencies>
 
 </project>
diff --git 
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
 
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
new file mode 100644
index 000000000..d9c87a2e9
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.base.source;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import com.ververica.cdc.connectors.base.config.SourceConfig;
+import com.ververica.cdc.connectors.base.dialect.DataSourceDialect;
+import com.ververica.cdc.connectors.base.options.StartupMode;
+import com.ververica.cdc.connectors.base.source.assigner.HybridSplitAssigner;
+import com.ververica.cdc.connectors.base.source.assigner.SplitAssigner;
+import com.ververica.cdc.connectors.base.source.assigner.StreamSplitAssigner;
+import 
com.ververica.cdc.connectors.base.source.assigner.state.HybridPendingSplitsState;
+import 
com.ververica.cdc.connectors.base.source.assigner.state.PendingSplitsState;
+import 
com.ververica.cdc.connectors.base.source.assigner.state.PendingSplitsStateSerializer;
+import 
com.ververica.cdc.connectors.base.source.assigner.state.StreamPendingSplitsState;
+import 
com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator;
+import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import 
com.ververica.cdc.connectors.base.source.meta.split.SourceSplitSerializer;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitState;
+import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
+import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceReader;
+import 
com.ververica.cdc.connectors.base.source.reader.IncrementalSourceSplitReader;
+import io.debezium.relational.TableId;
+
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
+import 
org.apache.inlong.sort.cdc.base.source.reader.IncrementalSourceRecordEmitter;
+
+/**
+ * The basic source of Incremental Snapshot framework for datasource, it is 
based on FLIP-27 and
+ * Watermark Signal Algorithm which supports parallel reading snapshot of 
table and then continue to
+ * capture data change by streaming reading.
+ * Copy from com.ververica:flink-cdc-base:2.3.0
+ */
+@Experimental
+public class IncrementalSource<T, C extends SourceConfig>
+        implements
+            Source<T, SourceSplitBase, PendingSplitsState>,
+            ResultTypeQueryable<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    protected final SourceConfig.Factory<C> configFactory;
+    protected final DataSourceDialect<C> dataSourceDialect;
+    protected final OffsetFactory offsetFactory;
+    protected final DebeziumDeserializationSchema<T> deserializationSchema;
+    protected final SourceSplitSerializer sourceSplitSerializer;
+
+    public IncrementalSource(
+            SourceConfig.Factory<C> configFactory,
+            DebeziumDeserializationSchema<T> deserializationSchema,
+            OffsetFactory offsetFactory,
+            DataSourceDialect<C> dataSourceDialect) {
+        this.configFactory = configFactory;
+        this.deserializationSchema = deserializationSchema;
+        this.offsetFactory = offsetFactory;
+        this.dataSourceDialect = dataSourceDialect;
+        this.sourceSplitSerializer =
+                new SourceSplitSerializer() {
+
+                    @Override
+                    public OffsetFactory getOffsetFactory() {
+                        return offsetFactory;
+                    }
+                };
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public IncrementalSourceReader<T, C> createReader(SourceReaderContext 
readerContext)
+            throws Exception {
+        // create source config for the given subtask (e.g. unique server id)
+        C sourceConfig = 
configFactory.create(readerContext.getIndexOfSubtask());
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> 
elementsQueue =
+                new FutureCompletingBlockingQueue<>();
+
+        // Forward compatible with flink 1.13
+        final Method metricGroupMethod = 
readerContext.getClass().getMethod("metricGroup");
+        metricGroupMethod.setAccessible(true);
+        final MetricGroup metricGroup = (MetricGroup) 
metricGroupMethod.invoke(readerContext);
+        final SourceReaderMetrics sourceReaderMetrics = new 
SourceReaderMetrics(metricGroup);
+
+        sourceReaderMetrics.registerMetrics();
+        Supplier<IncrementalSourceSplitReader<C>> splitReaderSupplier =
+                () -> new IncrementalSourceSplitReader<>(
+                        readerContext.getIndexOfSubtask(), dataSourceDialect, 
sourceConfig);
+        return new IncrementalSourceReader<>(
+                elementsQueue,
+                splitReaderSupplier,
+                createRecordEmitter(sourceConfig, sourceReaderMetrics),
+                readerContext.getConfiguration(),
+                readerContext,
+                sourceConfig,
+                sourceSplitSerializer,
+                dataSourceDialect);
+    }
+
+    @Override
+    public SplitEnumerator<SourceSplitBase, PendingSplitsState> 
createEnumerator(
+            SplitEnumeratorContext<SourceSplitBase> enumContext) {
+        C sourceConfig = configFactory.create(0);
+        final SplitAssigner splitAssigner;
+        if (sourceConfig.getStartupOptions().startupMode == 
StartupMode.INITIAL) {
+            try {
+                final List<TableId> remainingTables =
+                        
dataSourceDialect.discoverDataCollections(sourceConfig);
+                boolean isTableIdCaseSensitive =
+                        
dataSourceDialect.isDataCollectionIdCaseSensitive(sourceConfig);
+                splitAssigner =
+                        new HybridSplitAssigner<>(
+                                sourceConfig,
+                                enumContext.currentParallelism(),
+                                remainingTables,
+                                isTableIdCaseSensitive,
+                                dataSourceDialect,
+                                offsetFactory);
+            } catch (Exception e) {
+                throw new FlinkRuntimeException(
+                        "Failed to discover captured tables for enumerator", 
e);
+            }
+        } else {
+            splitAssigner = new StreamSplitAssigner(sourceConfig, 
dataSourceDialect, offsetFactory);
+        }
+
+        return new IncrementalSourceEnumerator(enumContext, sourceConfig, 
splitAssigner);
+    }
+
+    @Override
+    public SplitEnumerator<SourceSplitBase, PendingSplitsState> 
restoreEnumerator(
+            SplitEnumeratorContext<SourceSplitBase> enumContext, 
PendingSplitsState checkpoint) {
+        C sourceConfig = configFactory.create(0);
+
+        final SplitAssigner splitAssigner;
+        if (checkpoint instanceof HybridPendingSplitsState) {
+            splitAssigner =
+                    new HybridSplitAssigner<>(
+                            sourceConfig,
+                            enumContext.currentParallelism(),
+                            (HybridPendingSplitsState) checkpoint,
+                            dataSourceDialect,
+                            offsetFactory);
+        } else if (checkpoint instanceof StreamPendingSplitsState) {
+            splitAssigner =
+                    new StreamSplitAssigner(
+                            sourceConfig,
+                            (StreamPendingSplitsState) checkpoint,
+                            dataSourceDialect,
+                            offsetFactory);
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported restored PendingSplitsState: " + checkpoint);
+        }
+        return new IncrementalSourceEnumerator(enumContext, sourceConfig, 
splitAssigner);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<SourceSplitBase> getSplitSerializer() {
+        return sourceSplitSerializer;
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PendingSplitsState> 
getEnumeratorCheckpointSerializer() {
+        SourceSplitSerializer sourceSplitSerializer = (SourceSplitSerializer) 
getSplitSerializer();
+        return new PendingSplitsStateSerializer(sourceSplitSerializer);
+    }
+
+    @Override
+    public TypeInformation<T> getProducedType() {
+        return deserializationSchema.getProducedType();
+    }
+
+    protected RecordEmitter<SourceRecords, T, SourceSplitState> 
createRecordEmitter(
+            SourceConfig sourceConfig, SourceReaderMetrics 
sourceReaderMetrics) {
+        return new IncrementalSourceRecordEmitter<>(
+                deserializationSchema,
+                sourceReaderMetrics,
+                sourceConfig.isIncludeSchemaChanges(),
+                offsetFactory);
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/jdbc/JdbcIncrementalSource.java
 
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/jdbc/JdbcIncrementalSource.java
new file mode 100644
index 000000000..b059c2fb8
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/jdbc/JdbcIncrementalSource.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.base.source.jdbc;
+
+import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
+import com.ververica.cdc.connectors.base.config.JdbcSourceConfigFactory;
+import com.ververica.cdc.connectors.base.dialect.JdbcDataSourceDialect;
+import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
+import org.apache.inlong.sort.cdc.base.source.IncrementalSource;
+
+/**
+ * The basic source of Incremental Snapshot framework for JDBC datasource, it 
is based on FLIP-27
+ * and Watermark Signal Algorithm which supports parallel reading snapshot of 
table and then
+ * continue to capture data change by streaming reading.
+ * Copy from com.ververica:flink-cdc-base:2.3.0
+ */
+public class JdbcIncrementalSource<T> extends IncrementalSource<T, 
JdbcSourceConfig> {
+
+    public JdbcIncrementalSource(
+            JdbcSourceConfigFactory configFactory,
+            DebeziumDeserializationSchema<T> deserializationSchema,
+            OffsetFactory offsetFactory,
+            JdbcDataSourceDialect dataSourceDialect) {
+        super(configFactory, deserializationSchema, offsetFactory, 
dataSourceDialect);
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
 
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
new file mode 100644
index 000000000..ec61afcb1
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.base.source.reader;
+
+import static 
com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isHighWatermarkEvent;
+import static 
com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isWatermarkEvent;
+import static 
com.ververica.cdc.connectors.base.utils.SourceRecordUtils.getFetchTimestamp;
+import static 
com.ververica.cdc.connectors.base.utils.SourceRecordUtils.getHistoryRecord;
+import static 
com.ververica.cdc.connectors.base.utils.SourceRecordUtils.getMessageTimestamp;
+import static 
com.ververica.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
+import static 
com.ververica.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;
+
+import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
+import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitState;
+import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
+import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceReader;
+import io.debezium.document.Array;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.TableChanges;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.util.Collector;
+import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
+import 
org.apache.inlong.sort.cdc.base.debezium.history.FlinkJsonTableChangeSerializer;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link RecordEmitter} implementation for {@link 
IncrementalSourceReader}.
+ *
+ * <p>The {@link RecordEmitter} buffers the snapshot records of split and calls 
the stream reader to
+ * emit records rather than emit the records directly.
+ * Copy from com.ververica:flink-cdc-base:2.3.0
+ */
+public class IncrementalSourceRecordEmitter<T>
+        implements
+            RecordEmitter<SourceRecords, T, SourceSplitState> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(IncrementalSourceRecordEmitter.class);
+    private static final FlinkJsonTableChangeSerializer 
TABLE_CHANGE_SERIALIZER =
+            new FlinkJsonTableChangeSerializer();
+
+    protected final DebeziumDeserializationSchema<T> 
debeziumDeserializationSchema;
+    protected final SourceReaderMetrics sourceReaderMetrics;
+    protected final boolean includeSchemaChanges;
+    protected final OutputCollector<T> outputCollector;
+    protected final OffsetFactory offsetFactory;
+
+    public IncrementalSourceRecordEmitter(
+            DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
+            SourceReaderMetrics sourceReaderMetrics,
+            boolean includeSchemaChanges,
+            OffsetFactory offsetFactory) {
+        this.debeziumDeserializationSchema = debeziumDeserializationSchema;
+        this.sourceReaderMetrics = sourceReaderMetrics;
+        this.includeSchemaChanges = includeSchemaChanges;
+        this.outputCollector = new OutputCollector<>();
+        this.offsetFactory = offsetFactory;
+    }
+
+    @Override
+    public void emitRecord(
+            SourceRecords sourceRecords, SourceOutput<T> output, 
SourceSplitState splitState)
+            throws Exception {
+        final Iterator<SourceRecord> elementIterator = 
sourceRecords.iterator();
+        while (elementIterator.hasNext()) {
+            processElement(elementIterator.next(), output, splitState);
+        }
+    }
+
+    protected void processElement(
+            SourceRecord element, SourceOutput<T> output, SourceSplitState 
splitState)
+            throws Exception {
+        if (isWatermarkEvent(element)) {
+            Offset watermark = getWatermark(element);
+            if (isHighWatermarkEvent(element) && 
splitState.isSnapshotSplitState()) {
+                splitState.asSnapshotSplitState().setHighWatermark(watermark);
+            }
+        } else if (isSchemaChangeEvent(element) && 
splitState.isStreamSplitState()) {
+            HistoryRecord historyRecord = getHistoryRecord(element);
+            Array tableChanges =
+                    
historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
+            TableChanges changes = 
TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
+            for (TableChanges.TableChange tableChange : changes) {
+                
splitState.asStreamSplitState().recordSchema(tableChange.getId(), tableChange);
+            }
+            if (includeSchemaChanges) {
+                emitElement(element, output);
+            }
+        } else if (isDataChangeRecord(element)) {
+            if (splitState.isStreamSplitState()) {
+                Offset position = getOffsetPosition(element);
+                splitState.asStreamSplitState().setStartingOffset(position);
+            }
+            reportMetrics(element);
+            emitElement(element, output);
+        } else {
+            // unknown element
+            LOG.info("Meet unknown element {}, just skip.", element);
+        }
+    }
+
+    private Offset getWatermark(SourceRecord watermarkEvent) {
+        return getOffsetPosition(watermarkEvent.sourceOffset());
+    }
+
+    public Offset getOffsetPosition(SourceRecord dataRecord) {
+        return getOffsetPosition(dataRecord.sourceOffset());
+    }
+
+    public Offset getOffsetPosition(Map<String, ?> offset) {
+        Map<String, String> offsetStrMap = new HashMap<>();
+        for (Map.Entry<String, ?> entry : offset.entrySet()) {
+            offsetStrMap.put(
+                    entry.getKey(), entry.getValue() == null ? null : 
entry.getValue().toString());
+        }
+        return offsetFactory.newOffset(offsetStrMap);
+    }
+
+    protected void emitElement(SourceRecord element, SourceOutput<T> output) 
throws Exception {
+        outputCollector.output = output;
+        debeziumDeserializationSchema.deserialize(element, outputCollector);
+    }
+
+    protected void reportMetrics(SourceRecord element) {
+        long now = System.currentTimeMillis();
+        // record the latest process time
+        sourceReaderMetrics.recordProcessTime(now);
+        Long messageTimestamp = getMessageTimestamp(element);
+
+        if (messageTimestamp != null && messageTimestamp > 0L) {
+            // report fetch delay
+            Long fetchTimestamp = getFetchTimestamp(element);
+            if (fetchTimestamp != null) {
+                sourceReaderMetrics.recordFetchDelay(fetchTimestamp - 
messageTimestamp);
+            }
+            // report emit delay
+            sourceReaderMetrics.recordEmitDelay(now - messageTimestamp);
+        }
+    }
+
+    private static class OutputCollector<T> implements Collector<T> {
+
+        private SourceOutput<T> output;
+
+        @Override
+        public void collect(T record) {
+            output.collect(record);
+        }
+
+        @Override
+        public void close() {
+            // do nothing
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/OracleSourceBuilder.java
 
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/OracleSourceBuilder.java
new file mode 100644
index 000000000..87cced987
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/OracleSourceBuilder.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.oracle.source;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import com.ververica.cdc.connectors.base.options.StartupOptions;
+import com.ververica.cdc.connectors.oracle.source.OracleDialect;
+import 
com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfigFactory;
+import 
com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffsetFactory;
+import java.time.Duration;
+import java.util.Properties;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
+import org.apache.inlong.sort.cdc.base.source.jdbc.JdbcIncrementalSource;
+
+/**
+ * The builder class for {@link OracleIncrementalSource} to make it easier for 
the users to
+ * construct a {@link OracleIncrementalSource}.
+ *
+ * <p>Check the Java docs of each individual method to learn more about the 
settings to build a
+ * {@link OracleIncrementalSource}.
+ * Copy from com.ververica:flink-connector-oracle-cdc:2.3.0
+ */
+@Internal
+public class OracleSourceBuilder<T> {
+
+    private final OracleSourceConfigFactory configFactory = new 
OracleSourceConfigFactory();
+    private RedoLogOffsetFactory offsetFactory;
+    private OracleDialect dialect;
+    private DebeziumDeserializationSchema<T> deserializer;
+
+    public OracleSourceBuilder<T> hostname(String hostname) {
+        this.configFactory.hostname(hostname);
+        return this;
+    }
+
+    /** Url to use when connecting to the Oracle database server. */
+    public OracleSourceBuilder<T> url(@Nullable String url) {
+        this.configFactory.url(url);
+        return this;
+    }
+
+    /** Integer port number of the Oracle database server. */
+    public OracleSourceBuilder<T> port(int port) {
+        this.configFactory.port(port);
+        return this;
+    }
+
+    /**
+     * A required list of regular expressions that match database names to be 
monitored; any
+     * database name not included in the whitelist will be excluded from 
monitoring.
+     */
+    public OracleSourceBuilder<T> databaseList(String... databaseList) {
+        this.configFactory.databaseList(databaseList);
+        return this;
+    }
+
+    /**
+     * An optional list of regular expressions that match schema names to be 
monitored; any schema
+     * name not included in the whitelist will be excluded from monitoring. By 
default all
+     * non-system schemas will be monitored.
+     */
+    public OracleSourceBuilder<T> schemaList(String... schemaList) {
+        this.configFactory.schemaList(schemaList);
+        return this;
+    }
+
+    /**
+     * A required list of regular expressions that match fully-qualified 
table identifiers for
+     * tables to be monitored; any table not included in the list will be 
excluded from monitoring.
+     * Each identifier is of the form {@code <databaseName>.<tableName>}.
+     */
+    public OracleSourceBuilder<T> tableList(String... tableList) {
+        this.configFactory.tableList(tableList);
+        return this;
+    }
+
+    /** Name of the Oracle user to use when connecting to the Oracle 
database server. */
+    public OracleSourceBuilder<T> username(String username) {
+        this.configFactory.username(username);
+        return this;
+    }
+
+    /** Password to use when connecting to the Oracle database server. */
+    public OracleSourceBuilder<T> password(String password) {
+        this.configFactory.password(password);
+        return this;
+    }
+
+    /**
+     * The session time zone in database server, e.g. "America/Los_Angeles". 
It controls how the
+     * TIMESTAMP type in Oracle is converted to STRING. See more
+     * 
https://debezium.io/documentation/reference/1.5/connectors/Oracle.html#Oracle-temporal-types
+     */
+    public OracleSourceBuilder<T> serverTimeZone(String timeZone) {
+        this.configFactory.serverTimeZone(timeZone);
+        return this;
+    }
+
+    /**
+     * The split size (number of rows) of table snapshot, captured tables are 
split into multiple
+     * splits when read the snapshot of table.
+     */
+    public OracleSourceBuilder<T> splitSize(int splitSize) {
+        this.configFactory.splitSize(splitSize);
+        return this;
+    }
+
+    /**
+     * The group size of split meta, if the meta size exceeds the group size, 
the meta will
+     * be divided into multiple groups.
+     */
+    public OracleSourceBuilder<T> splitMetaGroupSize(int splitMetaGroupSize) {
+        this.configFactory.splitMetaGroupSize(splitMetaGroupSize);
+        return this;
+    }
+
+    /**
+     * The upper bound of split key evenly distribution factor, the factor is 
used to determine
+     * whether the table is evenly distributed or not.
+     */
+    public OracleSourceBuilder<T> distributionFactorUpper(double 
distributionFactorUpper) {
+        this.configFactory.distributionFactorUpper(distributionFactorUpper);
+        return this;
+    }
+
+    /**
+     * The lower bound of split key evenly distribution factor, the factor is 
used to determine
+     * whether the table is evenly distributed or not.
+     */
+    public OracleSourceBuilder<T> distributionFactorLower(double 
distributionFactorLower) {
+        this.configFactory.distributionFactorLower(distributionFactorLower);
+        return this;
+    }
+
+    /** The maximum fetch size for per poll when read table snapshot. */
+    public OracleSourceBuilder<T> fetchSize(int fetchSize) {
+        this.configFactory.fetchSize(fetchSize);
+        return this;
+    }
+
+    /**
+     * The maximum time that the connector should wait after trying to connect 
to the Oracle
+     * database server before timing out.
+     */
+    public OracleSourceBuilder<T> connectTimeout(Duration connectTimeout) {
+        this.configFactory.connectTimeout(connectTimeout);
+        return this;
+    }
+
+    /** The max retry times to get connection. */
+    public OracleSourceBuilder<T> connectMaxRetries(int connectMaxRetries) {
+        this.configFactory.connectMaxRetries(connectMaxRetries);
+        return this;
+    }
+
+    /** The connection pool size. */
+    public OracleSourceBuilder<T> connectionPoolSize(int connectionPoolSize) {
+        this.configFactory.connectionPoolSize(connectionPoolSize);
+        return this;
+    }
+
+    /** Whether the {@link OracleIncrementalSource} should output the schema 
changes or not. */
+    public OracleSourceBuilder<T> includeSchemaChanges(boolean 
includeSchemaChanges) {
+        this.configFactory.includeSchemaChanges(includeSchemaChanges);
+        return this;
+    }
+
+    /** Specifies the startup options. */
+    public OracleSourceBuilder<T> startupOptions(StartupOptions 
startupOptions) {
+        this.configFactory.startupOptions(startupOptions);
+        return this;
+    }
+
+    /**
+     * The chunk key of table snapshot, captured tables are split into 
multiple chunks by the chunk
+     * key column when read the snapshot of table.
+     */
+    public OracleSourceBuilder<T> chunkKeyColumn(String chunkKeyColumn) {
+        this.configFactory.chunkKeyColumn(chunkKeyColumn);
+        return this;
+    }
+
+    /** The Debezium Oracle connector properties. For example, 
"snapshot.mode". */
+    public OracleSourceBuilder<T> debeziumProperties(Properties properties) {
+        this.configFactory.debeziumProperties(properties);
+        return this;
+    }
+
+    /**
+     * The deserializer used to convert from consumed {@link
+     * org.apache.kafka.connect.source.SourceRecord}.
+     */
+    public OracleSourceBuilder<T> 
deserializer(DebeziumDeserializationSchema<T> deserializer) {
+        this.deserializer = deserializer;
+        return this;
+    }
+
+    /**
+     * Build the {@link OracleIncrementalSource}.
+     *
+     * @return an OracleIncrementalSource with the settings made for this builder.
+     */
+    public OracleIncrementalSource<T> build() {
+        this.offsetFactory = new RedoLogOffsetFactory();
+        this.dialect = new OracleDialect(configFactory);
+        return new OracleIncrementalSource<T>(
+                configFactory, checkNotNull(deserializer), offsetFactory, 
dialect);
+    }
+
+    /** The {@link JdbcIncrementalSource} implementation for Oracle. */
+    public static class OracleIncrementalSource<T> extends 
JdbcIncrementalSource<T> {
+
+        public OracleIncrementalSource(
+                OracleSourceConfigFactory configFactory,
+                DebeziumDeserializationSchema<T> deserializationSchema,
+                RedoLogOffsetFactory offsetFactory,
+                OracleDialect dataSourceDialect) {
+            super(configFactory, deserializationSchema, offsetFactory, 
dataSourceDialect);
+        }
+
+        public static <T> OracleSourceBuilder<T> builder() {
+            return new OracleSourceBuilder<>();
+        }
+
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java
 
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java
index 943f15abc..01f0b395e 100644
--- 
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java
+++ 
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.cdc.oracle.table;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 import com.ververica.cdc.connectors.base.options.StartupOptions;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -27,12 +28,14 @@ import java.util.Objects;
 import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import javax.annotation.Nullable;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.SourceProvider;
 import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
@@ -40,9 +43,11 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
 import org.apache.inlong.sort.cdc.base.debezium.table.MetadataConverter;
+import org.apache.inlong.sort.cdc.base.source.jdbc.JdbcIncrementalSource;
 import org.apache.inlong.sort.cdc.oracle.debezium.DebeziumSourceFunction;
 import 
org.apache.inlong.sort.cdc.oracle.debezium.table.RowDataDebeziumDeserializeSchema;
 import org.apache.inlong.sort.cdc.oracle.OracleSource;
+import org.apache.inlong.sort.cdc.oracle.source.OracleSourceBuilder;
 
 /**
  * A {@link DynamicTableSource} that describes how to create a Oracle binlog 
from a logical
@@ -63,6 +68,16 @@ public class OracleTableSource implements ScanTableSource, 
SupportsReadingMetada
     private final boolean sourceMultipleEnable;
     private final String inlongMetric;
     private final String inlongAudit;
+    private final boolean enableParallelRead;
+    private final int splitSize;
+    private final int splitMetaGroupSize;
+    private final int fetchSize;
+    private final Duration connectTimeout;
+    private final int connectionPoolSize;
+    private final int connectMaxRetries;
+    private final double distributionFactorUpper;
+    private final double distributionFactorLower;
+    private final String chunkKeyColumn;
 
     // 
--------------------------------------------------------------------------------------------
     // Mutable attributes
@@ -87,7 +102,17 @@ public class OracleTableSource implements ScanTableSource, 
SupportsReadingMetada
             StartupOptions startupOptions,
             boolean sourceMultipleEnable,
             String inlongMetric,
-            String inlongAudit) {
+            String inlongAudit,
+            boolean enableParallelRead,
+            int splitSize,
+            int splitMetaGroupSize,
+            int fetchSize,
+            Duration connectTimeout,
+            int connectMaxRetries,
+            int connectionPoolSize,
+            double distributionFactorUpper,
+            double distributionFactorLower,
+            @Nullable String chunkKeyColumn) {
         this.physicalSchema = physicalSchema;
         this.port = port;
         this.hostname = checkNotNull(hostname);
@@ -103,6 +128,16 @@ public class OracleTableSource implements ScanTableSource, 
SupportsReadingMetada
         this.sourceMultipleEnable = sourceMultipleEnable;
         this.inlongMetric = inlongMetric;
         this.inlongAudit = inlongAudit;
+        this.enableParallelRead = enableParallelRead;
+        this.splitSize = splitSize;
+        this.splitMetaGroupSize = splitMetaGroupSize;
+        this.fetchSize = fetchSize;
+        this.connectTimeout = connectTimeout;
+        this.connectMaxRetries = connectMaxRetries;
+        this.connectionPoolSize = connectionPoolSize;
+        this.distributionFactorUpper = distributionFactorUpper;
+        this.distributionFactorLower = distributionFactorLower;
+        this.chunkKeyColumn = chunkKeyColumn;
     }
 
     @Override
@@ -131,24 +166,50 @@ public class OracleTableSource implements 
ScanTableSource, SupportsReadingMetada
                                 
OracleDeserializationConverterFactory.instance())
                         .setSourceMultipleEnable(sourceMultipleEnable)
                         .build();
-        OracleSource.Builder<RowData> builder =
-                OracleSource.<RowData>builder()
-                        .hostname(hostname)
-                        .port(port)
-                        .database(database)
-                        .tableList(tableName)
-                        .schemaList(schemaName)
-                        .username(username)
-                        .password(password)
-                        .debeziumProperties(dbzProperties)
-                        .startupOptions(startupOptions)
-                        .deserializer(deserializer)
-                        .inlongMetric(inlongMetric)
-                        .inlongAudit(inlongAudit)
-                        .sourceMultipleEnable(sourceMultipleEnable);
-        DebeziumSourceFunction<RowData> sourceFunction = builder.build();
-
-        return SourceFunctionProvider.of(sourceFunction, false);
+        if (enableParallelRead) {
+            JdbcIncrementalSource<RowData> oracleChangeEventSource =
+                    
OracleSourceBuilder.OracleIncrementalSource.<RowData>builder()
+                            .hostname(hostname)
+                            .port(port)
+                            .databaseList(database)
+                            .schemaList(schemaName)
+                            .tableList(tableName)
+                            .username(username)
+                            .password(password)
+                            .startupOptions(startupOptions)
+                            .deserializer(deserializer)
+                            .debeziumProperties(dbzProperties)
+                            .splitSize(splitSize)
+                            .splitMetaGroupSize(splitMetaGroupSize)
+                            .fetchSize(fetchSize)
+                            .connectTimeout(connectTimeout)
+                            .connectionPoolSize(connectionPoolSize)
+                            .connectMaxRetries(connectMaxRetries)
+                            .distributionFactorUpper(distributionFactorUpper)
+                            .distributionFactorLower(distributionFactorLower)
+                            .build();
+
+            return SourceProvider.of(oracleChangeEventSource);
+        } else {
+            OracleSource.Builder<RowData> builder =
+                    OracleSource.<RowData>builder()
+                            .hostname(hostname)
+                            .port(port)
+                            .database(database)
+                            .tableList(tableName)
+                            .schemaList(schemaName)
+                            .username(username)
+                            .password(password)
+                            .debeziumProperties(dbzProperties)
+                            .startupOptions(startupOptions)
+                            .deserializer(deserializer)
+                            .inlongMetric(inlongMetric)
+                            .inlongAudit(inlongAudit)
+                            .sourceMultipleEnable(sourceMultipleEnable);
+            DebeziumSourceFunction<RowData> sourceFunction = builder.build();
+
+            return SourceFunctionProvider.of(sourceFunction, false);
+        }
     }
 
     private MetadataConverter[] getMetadataConverters() {
@@ -182,7 +243,17 @@ public class OracleTableSource implements ScanTableSource, 
SupportsReadingMetada
                         startupOptions,
                         sourceMultipleEnable,
                         inlongMetric,
-                        inlongAudit);
+                        inlongAudit,
+                        enableParallelRead,
+                        splitSize,
+                        splitMetaGroupSize,
+                        fetchSize,
+                        connectTimeout,
+                        connectMaxRetries,
+                        connectionPoolSize,
+                        distributionFactorUpper,
+                        distributionFactorLower,
+                        chunkKeyColumn);
         source.metadataKeys = metadataKeys;
         source.producedDataType = producedDataType;
         return source;
@@ -210,7 +281,17 @@ public class OracleTableSource implements ScanTableSource, 
SupportsReadingMetada
                 && Objects.equals(producedDataType, that.producedDataType)
                 && Objects.equals(metadataKeys, that.metadataKeys)
                 && Objects.equals(inlongMetric, that.inlongMetric)
-                && Objects.equals(inlongAudit, that.inlongAudit);
+                && Objects.equals(inlongAudit, that.inlongAudit)
+                && Objects.equals(enableParallelRead, that.enableParallelRead)
+                && Objects.equals(splitSize, that.splitSize)
+                && Objects.equals(splitMetaGroupSize, that.splitMetaGroupSize)
+                && Objects.equals(fetchSize, that.fetchSize)
+                && Objects.equals(connectTimeout, that.connectTimeout)
+                && Objects.equals(connectMaxRetries, that.connectMaxRetries)
+                && Objects.equals(connectionPoolSize, that.connectionPoolSize)
+                && Objects.equals(distributionFactorUpper, 
that.distributionFactorUpper)
+                && Objects.equals(distributionFactorLower, 
that.distributionFactorLower)
+                && Objects.equals(chunkKeyColumn, that.chunkKeyColumn);
     }
 
     @Override
@@ -229,7 +310,17 @@ public class OracleTableSource implements ScanTableSource, 
SupportsReadingMetada
                 producedDataType,
                 metadataKeys,
                 inlongMetric,
-                inlongAudit);
+                inlongAudit,
+                enableParallelRead,
+                splitSize,
+                splitMetaGroupSize,
+                fetchSize,
+                connectTimeout,
+                connectMaxRetries,
+                connectionPoolSize,
+                distributionFactorUpper,
+                distributionFactorLower,
+                chunkKeyColumn);
     }
 
     @Override
diff --git 
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
 
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
index a6535dc8f..a91976b7c 100644
--- 
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
+++ 
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
@@ -18,8 +18,8 @@
 package org.apache.inlong.sort.cdc.oracle.table;
 
 import com.ververica.cdc.connectors.base.options.StartupOptions;
+import java.time.Duration;
 import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ResolvedSchema;
@@ -31,7 +31,28 @@ import java.util.HashSet;
 import java.util.Set;
 import org.apache.inlong.sort.cdc.base.debezium.table.DebeziumOptions;
 
+import static 
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
+import static 
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
+import static 
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
+import static 
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.DATABASE_NAME;
+import static 
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.HOSTNAME;
+import static 
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.PASSWORD;
+import static 
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
+import static 
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SCHEMA_NAME;
+import static 
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SERVER_TIME_ZONE;
+import static 
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.TABLE_NAME;
+import static 
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.USERNAME;
+import static 
com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
+import static 
com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static 
com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
+import static 
com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
+import static 
com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_STARTUP_MODE;
+import static 
com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static 
com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
+import static 
com.ververica.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
+import static 
com.ververica.cdc.connectors.oracle.source.config.OracleSourceOptions.PORT;
 import static 
com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
+import static org.apache.flink.util.Preconditions.checkState;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static org.apache.inlong.sort.base.Constants.SOURCE_MULTIPLE_ENABLE;
@@ -43,58 +64,6 @@ public class OracleTableSourceFactory implements 
DynamicTableSourceFactory {
 
     private static final String IDENTIFIER = "oracle-cdc-inlong";
 
-    private static final ConfigOption<String> HOSTNAME =
-            ConfigOptions.key("hostname")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("IP address or hostname of the Oracle 
database server.");
-
-    private static final ConfigOption<Integer> PORT =
-            ConfigOptions.key("port")
-                    .intType()
-                    .defaultValue(1521)
-                    .withDescription("Integer port number of the Oracle 
database server.");
-
-    private static final ConfigOption<String> USERNAME =
-            ConfigOptions.key("username")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Name of the Oracle database to use when 
connecting to the Oracle database server.");
-
-    private static final ConfigOption<String> PASSWORD =
-            ConfigOptions.key("password")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Password to use when connecting to the oracle 
database server.");
-
-    private static final ConfigOption<String> DATABASE_NAME =
-            ConfigOptions.key("database-name")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Database name of the Oracle server to 
monitor.");
-
-    private static final ConfigOption<String> SCHEMA_NAME =
-            ConfigOptions.key("schema-name")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Schema name of the Oracle database to 
monitor.");
-
-    private static final ConfigOption<String> TABLE_NAME =
-            ConfigOptions.key("table-name")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Table name of the Oracle database to 
monitor.");
-
-    public static final ConfigOption<String> SCAN_STARTUP_MODE =
-            ConfigOptions.key("scan.startup.mode")
-                    .stringType()
-                    .defaultValue("initial")
-                    .withDescription(
-                            "Optional startup mode for Oracle CDC consumer, 
valid enumerations are "
-                                    + "\"initial\", \"latest-offset\"");
-
     @Override
     public DynamicTableSource createDynamicTableSource(Context context) {
         final FactoryUtil.TableFactoryHelper helper =
@@ -114,6 +83,28 @@ public class OracleTableSourceFactory implements 
DynamicTableSourceFactory {
         ResolvedSchema physicalSchema = 
context.getCatalogTable().getResolvedSchema();
         String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
         String inlongAudit = config.get(INLONG_AUDIT);
+        boolean enableParallelRead = 
config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
+        int splitSize = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
+        int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE);
+        int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE);
+        Duration connectTimeout = config.get(CONNECT_TIMEOUT);
+        int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
+        int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
+        double distributionFactorUpper = 
config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
+        double distributionFactorLower = 
config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
+        String chunkKeyColumn =
+                
config.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN).orElse(null);
+        String serverTimezone = config.get(SERVER_TIME_ZONE);
+
+        if (enableParallelRead) {
+            validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 
splitSize, 1);
+            validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
+            validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 
1);
+            validateIntegerOption(CONNECTION_POOL_SIZE, connectionPoolSize, 1);
+            validateIntegerOption(CONNECT_MAX_RETRIES, connectMaxRetries, 0);
+            validateDistributionFactorUpper(distributionFactorUpper);
+            validateDistributionFactorLower(distributionFactorLower);
+        }
         return new OracleTableSource(
                 physicalSchema,
                 port,
@@ -127,7 +118,17 @@ public class OracleTableSourceFactory implements 
DynamicTableSourceFactory {
                 startupOptions,
                 sourceMultipleEnable,
                 inlongMetric,
-                inlongAudit);
+                inlongAudit,
+                enableParallelRead,
+                splitSize,
+                splitMetaGroupSize,
+                fetchSize,
+                connectTimeout,
+                connectMaxRetries,
+                connectionPoolSize,
+                distributionFactorUpper,
+                distributionFactorLower,
+                chunkKeyColumn);
     }
 
     @Override
@@ -155,6 +156,16 @@ public class OracleTableSourceFactory implements 
DynamicTableSourceFactory {
         options.add(INLONG_METRIC);
         options.add(INLONG_AUDIT);
         options.add(SOURCE_MULTIPLE_ENABLE);
+        options.add(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
+        options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
+        options.add(CHUNK_META_GROUP_SIZE);
+        options.add(SCAN_SNAPSHOT_FETCH_SIZE);
+        options.add(CONNECT_TIMEOUT);
+        options.add(CONNECT_MAX_RETRIES);
+        options.add(CONNECTION_POOL_SIZE);
+        options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
+        options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
+        options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
         return options;
     }
 
@@ -181,4 +192,38 @@ public class OracleTableSourceFactory implements 
DynamicTableSourceFactory {
                                 modeString));
         }
     }
+
+    /** Checks via {@code checkState} that {@code optionValue} is strictly greater than {@code exclusiveMin} (the bound itself is rejected); the message names the option key. */
+    private void validateIntegerOption(
+            ConfigOption<Integer> option, int optionValue, int exclusiveMin) {
+        checkState(
+                optionValue > exclusiveMin,
+                String.format(
+                        "The value of option '%s' must larger than %d, but is 
%d",
+                        option.key(), exclusiveMin, optionValue));
+    }
+
+    /** Checks via {@code checkState} that the even-distribution factor upper bound compares as 
at least 1.0 under {@code doubleCompare}; the message names the option key. */
+    private void validateDistributionFactorUpper(double 
distributionFactorUpper) {
+        checkState(
+                doubleCompare(distributionFactorUpper, 1.0d) >= 0,
+                String.format(
+                        "The value of option '%s' must larger than or equals 
%s, but is %s",
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.key(),
+                        1.0d,
+                        distributionFactorUpper));
+    }
+
+    /** Checks via {@code checkState} that the even-distribution factor lower bound lies in 
the closed range 0.0 to 1.0 under {@code doubleCompare}; the message names the option key. */
+    private void validateDistributionFactorLower(double 
distributionFactorLower) {
+        checkState(
+                doubleCompare(distributionFactorLower, 0.0d) >= 0
+                        && doubleCompare(distributionFactorLower, 1.0d) <= 0,
+                String.format(
+                        "The value of option '%s' must between %s and %s 
inclusively, but is %s",
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.key(),
+                        0.0d,
+                        1.0d,
+                        distributionFactorLower));
+    }
 }
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleExtractSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleExtractSqlParseTest.java
index 3b0489fe6..335afcdd5 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleExtractSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleExtractSqlParseTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.parser;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -66,8 +68,10 @@ public class OracleExtractSqlParseTest extends 
AbstractTestBase {
                 new MetaFieldInfo("table_name", MetaField.TABLE_NAME),
                 new MetaFieldInfo("op_ts", MetaField.OP_TS),
                 new MetaFieldInfo("schema_name", MetaField.SCHEMA_NAME));
+        Map<String, String> properties = new HashMap<>();
+        properties.put("scan.incremental.snapshot.enabled", "true");
         return new OracleExtractNode("1", "oracle_input", fields,
-                null, null, "ID", "localhost",
+                null, properties, "ID", "localhost",
                 "flinkuser", "flinkpw", "xE",
                 "flinkuser", "table", 1521, null);
     }
diff --git a/licenses/inlong-sort-connectors/LICENSE 
b/licenses/inlong-sort-connectors/LICENSE
index eb955300f..93056ce8b 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -459,7 +459,7 @@
 
  Source  : flink-cdc-connectors 2.2.1 (Please note that the software have been 
modified.)
  License : 
https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
- 
+
 1.3.2 
inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/HiveValidator.java
       
inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java
       
inlong-sort/sort-connectors/sort-hive/src/main/java/org/apache/inlong/sort/hive/HiveTableMetaStoreFactory.java
@@ -598,6 +598,13 @@
   Source  : com.starrocks:flink-connector-starrocks:1.2.3_flink-1.13_2.11 
(Please note that the software have been modified.)
   License : https://www.apache.org/licenses/LICENSE-2.0.txt
 
+1.3.14 
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/jdbc/JdbcIncrementalSource.java
+       
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
+       
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
+       
inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/OracleSourceBuilder.java
+
+ Source  : flink-cdc-connectors 2.3.0 (Please note that the software have been 
modified.)
+ License : 
https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
 
 =======================================================================
 Apache InLong Subcomponents:
diff --git a/pom.xml b/pom.xml
index dc9e84135..ab6a8f77f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -165,6 +165,7 @@
         
<flink.connector.oracle.cdc.version>2.3.0</flink.connector.oracle.cdc.version>
         <flink.connector.doris.version>1.0.3</flink.connector.doris.version>
         <flink.connector.redis>1.1.0</flink.connector.redis>
+        <flink.cdc.base.version>2.3.0</flink.cdc.base.version>
 
         <curator.version>2.12.0</curator.version>
 
@@ -1047,6 +1048,12 @@
                 <version>${flink.connector.sqlserver.cdc.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>com.ververica</groupId>
+                <artifactId>flink-cdc-base</artifactId>
+                <version>${flink.cdc.base.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 
<artifactId>flink-connector-jdbc_${flink.scala.binary.version}</artifactId>

Reply via email to