Mrart commented on code in PR #3995:
URL: https://github.com/apache/flink-cdc/pull/3995#discussion_r2596778551


##########
docs/content/docs/connectors/pipeline-connectors/oracle.md:
##########
@@ -0,0 +1,462 @@
+---
+title: "ORACLE"
+weight: 2
+type: docs
+aliases:
+- /connectors/pipeline-connectors/oracle
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Oracle Connector
+
+The Oracle connector allows reading snapshot data and incremental data from the Oracle database and provides end-to-end full-database data synchronization capabilities.
+This document describes how to set up the Oracle connector.
+
+
+## Example
+
+An example of a pipeline that reads data from Oracle and sinks to Doris is defined as follows:
+
+```yaml
+source:
+   type: oracle
+   name: Oracle Source
+   hostname: 127.0.0.1
+   port: 1521
+   username: debezium
+   password: password
+   database: ORCLDB
+   tables: testdb.\.*, testdb.user_table_[0-9]+, [app|web].order_\.*
+
+sink:
+  type: doris
+  name: Doris Sink
+  fenodes: 127.0.0.1:8030
+  username: root
+  password: password
+
+pipeline:
+   name: Oracle to Doris Pipeline
+   parallelism: 4
+```
+
+## Connector Options
+
+<div class="highlight">
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 10%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 7%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 65%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>hostname</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>IP address or hostname of the Oracle database server.</td>
+    </tr>
+    <tr>
+      <td>port</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">1521</td>
+      <td>Integer</td>
+      <td>Integer port number of the Oracle database server.</td>
+    </tr>
+    <tr>
+      <td>username</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Name of the Oracle database user to use when connecting to the Oracle database server.</td>
+    </tr>
+    <tr>
+      <td>password</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Password to use when connecting to the Oracle database server.</td>
+    </tr>
+    <tr>
+      <td>tables</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Table name of the Oracle database to monitor. The table name also supports regular expressions to monitor multiple tables that match the expression.<br>
+          Note that the dot (.) is treated as the delimiter between database and table names.
+          If you need a dot (.) in a regular expression to match any character, you must escape the dot with a backslash.<br>
+          e.g. db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*</td>
+    </tr>
+    <tr>
+      <td>schema-change.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">true</td>
+      <td>Boolean</td>
+      <td>Whether to send schema change events, so that downstream sinks can 
respond to schema changes and achieve table structure synchronization.</td>
+    </tr>
+    <tr>
+      <td>scan.incremental.snapshot.chunk.size</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">8096</td>
+      <td>Integer</td>
+      <td>The chunk size (number of rows) of a table snapshot. Captured tables are split into multiple chunks when reading the table snapshot.</td>
+    </tr>
+    <tr>
+      <td>scan.snapshot.fetch.size</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">1024</td>
+      <td>Integer</td>
+      <td>The maximum fetch size per poll when reading a table snapshot.</td>
+    </tr>
+    <tr>
+      <td>scan.startup.mode</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">initial</td>
+      <td>String</td>
+      <td>Optional startup mode for the Oracle CDC consumer. Valid enumerations are "initial" and "latest-offset".</td>
+    </tr>
+    <tr>
+      <td>debezium.*</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Pass-through Debezium properties to the Debezium Embedded Engine, which is used to capture data changes from the Oracle server.
+          For example: <code>'debezium.snapshot.mode' = 'never'</code>.
+          See more about the <a href="https://debezium.io/documentation/reference/1.9/connectors/oracle.html#oracle-connector-properties">Debezium Oracle Connector properties</a></td>
+    </tr>
+    <tr>
+      <td>scan.incremental.close-idle-reader.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Whether to close idle readers at the end of the snapshot phase.<br>
+          Flink 1.14 or later is required when 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is set to true.<br>
+          In Flink 1.15 and later, the default value of 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is already true,
+          so it does not need to be configured explicitly.</td>
+    </tr>
+    <tr>
+      <td>metadata.list</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>
+        List of readable metadata from SourceRecord to be passed downstream and usable in the transform module, separated by `,`. Available readable metadata: op_ts.
+      </td>
+    </tr>
+    <tr>
+      <td>jdbc.url</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The JDBC URL for Oracle. If configured, this URL is used preferentially; otherwise "jdbc:oracle:thin:@localhost:1521:orcl" is used. Oracle 19c, however, uses a URL of the form "jdbc:oracle:thin:@//localhost:1521/pdb1", so this property is provided to adapt to different Oracle versions.</td>

Review Comment:
    If the hostname and port parameters are configured, is the URL concatenated from hostname, port, and database name in SID format by default?
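
    For reference, the two URL styles being contrasted look like this (host and database names are hypothetical; the SID form is what a plain hostname/port/database concatenation would produce, while Oracle 19c PDBs need the service-name form):

    ```yaml
    # SID format: jdbc:oracle:thin:@<hostname>:<port>:<SID>
    jdbc.url: jdbc:oracle:thin:@127.0.0.1:1521:ORCLDB
    # Service-name format: jdbc:oracle:thin:@//<hostname>:<port>/<service-name>
    jdbc.url: jdbc:oracle:thin:@//127.0.0.1:1521/pdb1
    ```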



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleDataSource.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source;
+
+import org.apache.flink.cdc.common.annotation.VisibleForTesting;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.source.DataSource;
+import org.apache.flink.cdc.common.source.EventSourceProvider;
+import org.apache.flink.cdc.common.source.FlinkSourceFunctionProvider;
+import org.apache.flink.cdc.common.source.FlinkSourceProvider;
+import org.apache.flink.cdc.common.source.MetadataAccessor;
+import org.apache.flink.cdc.connectors.oracle.source.config.OracleSourceConfig;
+import org.apache.flink.cdc.connectors.oracle.source.config.OracleSourceConfigFactory;
+import org.apache.flink.cdc.connectors.oracle.source.meta.offset.RedoLogOffsetFactory;
+import org.apache.flink.cdc.connectors.oracle.source.reader.OracleSourceReader;
+import org.apache.flink.cdc.connectors.oracle.source.reader.OracleTableSourceReader;
+import org.apache.flink.cdc.connectors.oracle.table.OracleReadableMetaData;
+import org.apache.flink.cdc.debezium.DebeziumSourceFunction;
+import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A {@link DynamicTableSource} that describes how to create an Oracle redo log source from a logical
+ * description.
+ */
+public class OracleDataSource implements DataSource, SupportsReadingMetadata {
+
+    private final OracleSourceConfig sourceConfig;
+    private final Configuration config;
+    private final OracleSourceConfigFactory configFactory;
+
+    /** Data type that describes the final output of the source. */
+    protected DataType producedDataType;
+
+    /** Metadata that is appended at the end of a physical source row. */
+    protected List<String> metadataKeys;
+
+    private final List<OracleReadableMetaData> readableMetadataList;
+
+    public OracleDataSource(
+            OracleSourceConfigFactory configFactory,
+            Configuration config,
+            List<OracleReadableMetaData> readableMetadataList) {
+        this.sourceConfig = configFactory.create(0);
+        this.config = config;
+        this.metadataKeys = Collections.emptyList();
+        this.readableMetadataList = readableMetadataList;
+        this.configFactory = configFactory;
+    }
+
+    @Override
+    public EventSourceProvider getEventSourceProvider() {
+        boolean enableParallelRead =
+                config.get(OracleDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
+
+        OracleDialect oracleDialect = new OracleDialect();
+        OracleEventDeserializer deserializer =
+                new OracleEventDeserializer(
+                        DebeziumChangelogMode.ALL,
+                        config.get(OracleDataSourceOptions.SCHEMA_CHANGE_ENABLED),
+                        readableMetadataList);
+
+        RedoLogOffsetFactory offsetFactory = new RedoLogOffsetFactory();
+        if (enableParallelRead) {

Review Comment:
   Is it possible to use the incremental snapshot source directly instead of calling the old interface? Then the 'scan.incremental.snapshot.enabled' parameter would not be needed. @LYanquan  do you agree?
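
   A minimal sketch of what this could look like, assuming the incremental snapshot source built in the `enableParallelRead` branch can be constructed unconditionally; `Event` here is the CDC `org.apache.flink.cdc.common.event.Event` type, and the `OracleTableSourceReader` constructor signature is hypothetical since it is not visible in this hunk:

   ```java
   @Override
   public EventSourceProvider getEventSourceProvider() {
       OracleEventDeserializer deserializer =
               new OracleEventDeserializer(
                       DebeziumChangelogMode.ALL,
                       config.get(OracleDataSourceOptions.SCHEMA_CHANGE_ENABLED),
                       readableMetadataList);
       // Always build the incremental snapshot source; the
       // scan.incremental.snapshot.enabled option and the legacy
       // SourceFunction fallback then disappear.
       // NOTE: this constructor call is hypothetical -- the real signature
       // of OracleTableSourceReader is not shown in this hunk.
       OracleTableSourceReader<Event> source =
               new OracleTableSourceReader<>(
                       sourceConfig, deserializer, new OracleDialect(), new RedoLogOffsetFactory());
       return FlinkSourceProvider.of(source);
   }
   ```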



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInference.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference;
+
+import io.debezium.data.geometry.Geometry;
+import org.apache.kafka.connect.data.Schema;
+
+/** {@link DataType} inference for oracle debezium {@link Schema}. */
+@Internal
+public class OracleSchemaDataTypeInference extends DebeziumSchemaDataTypeInference {
+
+    private static final long serialVersionUID = 1L;
+
+    protected DataType inferStruct(Object value, Schema schema) {
+        // the Geometry datatype in oracle will be converted to
+        // a String with Json format
+        if (Geometry.LOGICAL_NAME.equals(schema.name())) {

Review Comment:
   This class also does not need to inherit and re-implement the parent implementation.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInference.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference;
+
+import io.debezium.data.geometry.Geometry;
+import org.apache.kafka.connect.data.Schema;
+
+/** {@link DataType} inference for oracle debezium {@link Schema}. */
+@Internal
+public class OracleSchemaDataTypeInference extends DebeziumSchemaDataTypeInference {
+
+    private static final long serialVersionUID = 1L;
+
+    protected DataType inferStruct(Object value, Schema schema) {
+        // the Geometry datatype in oracle will be converted to
+        // a String with Json format
+        if (Geometry.LOGICAL_NAME.equals(schema.name())) {

Review Comment:
   Oracle does not seem to have any geometry-related field types.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleEventDeserializer.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.connectors.oracle.source.parser.OracleAntlrDdlParser;
+import org.apache.flink.cdc.connectors.oracle.table.OracleReadableMetaData;
+import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema;
+import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
+import org.apache.flink.table.data.TimestampData;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.debezium.data.Envelope;
+import io.debezium.data.geometry.Geometry;
+import io.debezium.relational.Tables;
+import io.debezium.relational.history.HistoryRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.io.WKBReader;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.getHistoryRecord;
+
+/** Event deserializer for {@link OracleDataSource}. */
+@Internal
+public class OracleEventDeserializer<Event> extends DebeziumEventDeserializationSchema {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final String SCHEMA_CHANGE_EVENT_KEY_NAME =
+            "io.debezium.connector.oracle.SchemaChangeKey";
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private final boolean includeSchemaChanges;
+
+    private transient Tables tables;
+
+    private transient OracleAntlrDdlParser customParser;
+
+    List<OracleReadableMetaData> readableMetadataList;
+
+    public OracleEventDeserializer(
+            DebeziumChangelogMode changelogMode,
+            boolean includeSchemaChanges,
+            List<OracleReadableMetaData> readableMetadataList) {
+        super(new OracleSchemaDataTypeInference(), changelogMode);
+        this.includeSchemaChanges = includeSchemaChanges;
+        this.readableMetadataList = readableMetadataList;
+    }
+
+    @Override
+    protected List<SchemaChangeEvent> deserializeSchemaChangeRecord(SourceRecord record) {
+        if (includeSchemaChanges) {
+            try {
+                HistoryRecord historyRecord = getHistoryRecord(record);
+
+                String databaseName =
+                        historyRecord.document().getString(HistoryRecord.Fields.DATABASE_NAME);
+                String schemaName =
+                        historyRecord.document().getString(HistoryRecord.Fields.SCHEMA_NAME);
+                if (customParser == null) {
+                    customParser = new OracleAntlrDdlParser(databaseName, schemaName);
+                    tables = new Tables();
+                }
+                String ddl =
+                        historyRecord.document().getString(HistoryRecord.Fields.DDL_STATEMENTS);
+                customParser.setCurrentDatabase(databaseName);
+                customParser.parse(ddl, tables);
+                return customParser.getAndClearParsedEvents();
+            } catch (Exception e) {
+                throw new IllegalStateException("Failed to parse the schema change : " + record, e);
+            }
+        }
+        return Collections.emptyList();
+    }
+
+    @Override
+    protected boolean isDataChangeRecord(SourceRecord record) {
+        Schema valueSchema = record.valueSchema();
+        Struct value = (Struct) record.value();
+        return value != null
+                && valueSchema != null
+                && valueSchema.field(Envelope.FieldName.OPERATION) != null
+                && value.getString(Envelope.FieldName.OPERATION) != null;
+    }
+
+    @Override
+    protected boolean isSchemaChangeRecord(SourceRecord record) {
+        Schema keySchema = record.keySchema();
+        return keySchema != null && SCHEMA_CHANGE_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name());
+    }
+
+    @Override
+    protected TableId getTableId(SourceRecord record) {
+        String[] parts = record.topic().split("\\.");
+        return TableId.tableId(parts[1], parts[2]);
+    }
+
+    @Override
+    protected Map<String, String> getMetadata(SourceRecord record) {
+        Map<String, String> map = new HashMap<>();
+        readableMetadataList.forEach(
+                (oracleReadableMetaData -> {
+                    Object metadata = oracleReadableMetaData.getConverter().read(record);
+                    if (oracleReadableMetaData.equals(OracleReadableMetaData.OP_TS)) {
+                        map.put(
+                                oracleReadableMetaData.getKey(),
+                                String.valueOf(((TimestampData) metadata).getMillisecond()));
+                    } else {
+                        map.put(oracleReadableMetaData.getKey(), String.valueOf(metadata));
+                    }
+                }));
+        return map;
+    }
+
+    @Override
+    protected Object convertToString(Object dbzObj, Schema schema) {
+        // the Geometry datatype in oracle will be converted to
+        // a String with Json format
+        if (Geometry.LOGICAL_NAME.equals(schema.name())) {

Review Comment:
   Oracle does not seem to have any geometry-related field types.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/OraclePipelineRecordEmitter.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source.reader;
+
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
+import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
+import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
+import org.apache.flink.cdc.connectors.oracle.source.config.OracleSourceConfig;
+import org.apache.flink.cdc.connectors.oracle.utils.OracleSchemaUtils;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.TableId;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+/** The {@link RecordEmitter} implementation for Oracle pipeline connector. */
+public class OraclePipelineRecordEmitter<T> extends IncrementalSourceRecordEmitter<T> {
+    private static final long serialVersionUID = 1L;
+    private List<String> tableList;
+    private List<CreateTableEvent> createTableEventCache = null;
+    private boolean alreadySendCreateTableForBinlogSplit = false;
+
+    public OraclePipelineRecordEmitter(
+            DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
+            SourceReaderMetrics sourceReaderMetrics,
+            boolean includeSchemaChanges,
+            OffsetFactory offsetFactory,
+            OracleSourceConfig sourceConfig) {
+        super(
+                debeziumDeserializationSchema,
+                sourceReaderMetrics,
+                includeSchemaChanges,
+                offsetFactory);
+        this.tableList = sourceConfig.getTableList();
+        this.createTableEventCache = new ArrayList<>();
+        try (JdbcConnection jdbc = OracleSchemaUtils.createOracleConnection(sourceConfig)) {
+
+            List<TableId> capturedTableIds = new ArrayList<>();
+            for (String table : tableList) {
+                TableId capturedTableId = TableId.parse(table.toUpperCase(Locale.ROOT));
+                capturedTableIds.add(capturedTableId);
+            }
+            for (TableId tableId : capturedTableIds) {
+                Schema schema = OracleSchemaUtils.getSchema(jdbc, tableId);
+                createTableEventCache.add(
+                        new CreateTableEvent(
+                                
org.apache.flink.cdc.common.event.TableId.tableId(
+                                        tableId.catalog(), tableId.table()),
+                                schema));
+            }

Review Comment:
   Does Oracle need to support batch mode? If it does, the snapshot startup parameter should be read here.
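
   A rough sketch of that check (both `getStartupOptions()` on OracleSourceConfig and the `StartupMode` comparison are assumptions modeled on other flink-cdc source configs):

   ```java
   // Assumption: OracleSourceConfig exposes the configured startup options.
   StartupOptions startupOptions = sourceConfig.getStartupOptions();
   if (startupOptions.startupMode == StartupMode.SNAPSHOT) {
       // Batch (snapshot-only) mode: no stream split will follow, so the
       // cached CreateTableEvents must be emitted during the snapshot phase.
   }
   ```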



##########
docs/content/docs/connectors/pipeline-connectors/oracle.md:
##########
@@ -0,0 +1,462 @@
+---
+title: "ORACLE"
+weight: 2
+type: docs
+aliases:
+- /connectors/pipeline-connectors/oracle
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Oracle Connector
+
+The Oracle connector allows reading snapshot data and incremental data from the Oracle database and provides end-to-end full-database data synchronization capabilities.
+This document describes how to set up the Oracle connector.
+
+
+## Example
+
+An example of a pipeline that reads data from Oracle and sinks to Doris is defined as follows:
+
+```yaml
+source:
+   type: oracle
+   name: Oracle Source
+   hostname: 127.0.0.1
+   port: 1521
+   username: debezium
+   password: password
+   database: ORCLDB
+   tables: testdb.\.*, testdb.user_table_[0-9]+, [app|web].order_\.*
+
+sink:
+  type: doris
+  name: Doris Sink
+  fenodes: 127.0.0.1:8030
+  username: root
+  password: password
+
+pipeline:
+   name: Oracle to Doris Pipeline
+   parallelism: 4
+```
+
+## Connector Options
+
+<div class="highlight">
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 10%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 7%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 65%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>hostname</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>IP address or hostname of the Oracle database server.</td>
+    </tr>
+    <tr>
+      <td>port</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">1521</td>
+      <td>Integer</td>
+      <td>Integer port number of the Oracle database server.</td>
+    </tr>
+    <tr>
+      <td>username</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Name of the Oracle database user to use when connecting to the Oracle database server.</td>
+    </tr>
+    <tr>
+      <td>password</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Password to use when connecting to the Oracle database server.</td>
+    </tr>
+    <tr>
+      <td>tables</td>

Review Comment:
   Why does `tables` use schema.table plus a separate database option instead of database.schema.table? Can a pipeline connect to only one database?
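
   To illustrate the question (values are hypothetical), the current form fixes the database via a separate option, while a three-part pattern would let one pipeline span databases:

   ```yaml
   # Current form: database is a separate option, tables are schema.table
   database: ORCLDB
   tables: testdb.user_table_[0-9]+

   # Suggested alternative: database.schema.table in the pattern itself
   tables: ORCLDB.testdb.user_table_[0-9]+
   ```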



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/dto/ColumnInfo.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.dto;
+
+import java.io.Serializable;
+
+/** DTO related to table schema. */
+public class ColumnInfo implements Serializable {

Review Comment:
   This class is not referenced anywhere, so why keep it in the code?



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtils.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.utils;
+
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import io.debezium.relational.Column;
+import oracle.jdbc.OracleTypes;
+
+import java.sql.Types;
+
+/** Utilities for converting from oracle types to {@link DataType}s. */
+public class OracleTypeUtils {

Review Comment:
   Do we need to support geometry field types?



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/OracleSourceReader.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source.reader;
+
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.oracle.OracleValidator;
+import org.apache.flink.cdc.connectors.oracle.source.config.OracleSourceConfig;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.DebeziumSourceFunction;
+import org.apache.flink.cdc.debezium.internal.DebeziumOffset;
+
+import io.debezium.connector.oracle.OracleConnector;
+
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A builder to build a SourceFunction which can read the snapshot and then continue to consume the LogMiner redo log.
+ */
+public class OracleSourceReader<T> {

Review Comment:
   We no longer need to implement the SourceFunction-based source.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/OraclePipelineRecordEmitter.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source.reader;
+
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
+import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
+import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
+import org.apache.flink.cdc.connectors.oracle.source.config.OracleSourceConfig;
+import org.apache.flink.cdc.connectors.oracle.utils.OracleSchemaUtils;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.TableId;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+/** The {@link RecordEmitter} implementation for Oracle pipeline connector. */
+public class OraclePipelineRecordEmitter<T> extends IncrementalSourceRecordEmitter<T> {
+    private static final long serialVersionUID = 1L;
+    private List<String> tableList;
+    private List<CreateTableEvent> createTableEventCache = null;
+    private boolean alreadySendCreateTableForBinlogSplit = false;
+
+    public OraclePipelineRecordEmitter(
+            DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
+            SourceReaderMetrics sourceReaderMetrics,
+            boolean includeSchemaChanges,
+            OffsetFactory offsetFactory,
+            OracleSourceConfig sourceConfig) {
+        super(
+                debeziumDeserializationSchema,
+                sourceReaderMetrics,
+                includeSchemaChanges,
+                offsetFactory);
+        this.tableList = sourceConfig.getTableList();
+        this.createTableEventCache = new ArrayList<>();
+        try (JdbcConnection jdbc = OracleSchemaUtils.createOracleConnection(sourceConfig)) {
+
+            List<TableId> capturedTableIds = new ArrayList<>();
+            for (String table : tableList) {
+                TableId capturedTableId = TableId.parse(table.toUpperCase(Locale.ROOT));
+                capturedTableIds.add(capturedTableId);
+            }
+            for (TableId tableId : capturedTableIds) {
+                Schema schema = OracleSchemaUtils.getSchema(jdbc, tableId);
+                createTableEventCache.add(
+                        new CreateTableEvent(
+                                org.apache.flink.cdc.common.event.TableId.tableId(
+                                        tableId.catalog(), tableId.table()),
+                                schema));
+            }
+        } catch (SQLException e) {
+            throw new RuntimeException("Cannot start emitter to fetch table schema.", e);
+        }
+    }
+
+    @Override
+    protected void processElement(
+            SourceRecord element, SourceOutput<T> output, SourceSplitState splitState)
+            throws Exception {

Review Comment:
   Please read the PostgreSQL pipeline emitter code carefully before implementing this feature.
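
   For reference, a minimal sketch of the pattern the PostgreSQL pipeline emitter appears to use (an assumption about its behavior; `isStreamSplitState()` is likewise assumed to exist on `SourceSplitState`): emit the cached CreateTableEvents exactly once before the first record of the stream split.

   ```java
   @Override
   protected void processElement(
           SourceRecord element, SourceOutput<T> output, SourceSplitState splitState)
           throws Exception {
       if (!alreadySendCreateTableForBinlogSplit && splitState.isStreamSplitState()) {
           // Send the cached schemas once so downstream operators know the
           // table structures before any change events arrive.
           for (CreateTableEvent event : createTableEventCache) {
               output.collect((T) event);
           }
           alreadySendCreateTableForBinlogSplit = true;
       }
       super.processElement(element, output, splitState);
   }
   ```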



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

