Mrart commented on code in PR #3995: URL: https://github.com/apache/flink-cdc/pull/3995#discussion_r2596778551
########## docs/content/docs/connectors/pipeline-connectors/oracle.md: ########## @@ -0,0 +1,462 @@ +--- +title: "ORACLE" +weight: 2 +type: docs +aliases: +- /connectors/pipeline-connectors/oracle +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Oracle Connector + +Oracle connector allows reading snapshot data and incremental data from Oracle database and provides end-to-end full-database data synchronization capabilities. +This document describes how to setup the Oracle connector. + + +## Example + +An example of the pipeline for reading data from Oracle and sink to Doris can be defined as follows: + +```yaml +source: + type: oracle + name: Oracle Source + hostname: 127.0.0.1 + port: 1521 + username: debezium + password: password + database: ORCLDB + tables: testdb.\.*, testdb.user_table_[0-9]+, [app|web].order_\.* + +sink: + type: doris + name: Doris Sink + fenodes: 127.0.0.1:8030 + username: root + password: password + +pipeline: + name: Oracle to Doris Pipeline + parallelism: 4 +``` + +## Connector Options + +<div class="highlight"> +<table class="colwidths-auto docutils"> + <thead> + <tr> + <th class="text-left" style="width: 10%">Option</th> + <th class="text-left" style="width: 8%">Required</th> + <th class="text-left" style="width: 7%">Default</th> + <th class="text-left" style="width: 10%">Type</th> + <th class="text-left" style="width: 65%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td>hostname</td> + <td>required</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td> IP address or hostname of the Oracle database server.</td> + </tr> + <tr> + <td>port</td> + <td>required</td> + <td style="word-wrap: break-word;">1521</td> + <td>Integer</td> + <td>Integer port number of the Oracle database server.</td> + </tr> + <tr> + <td>username</td> + <td>required</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Name of the Oracle database to use when connecting to the Oracle database server.</td> + </tr> + <tr> + <td>password</td> + <td>required</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Password to use when connecting to the Oracle database server.</td> + </tr> + <tr> + <td>tables</td> + <td>required</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Table name of the Oracle database to monitor. The table-name also supports regular expressions to monitor multiple tables that satisfy the regular expressions. <br> + It is important to note that the dot (.) is treated as a delimiter for database and table names. + If there is a need to use a dot (.) in a regular expression to match any character, it is necessary to escape the dot with a backslash.<br> + eg. 
db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*</td> + </tr> + <tr> + <td>schema-change.enabled</td> + <td>optional</td> + <td style="word-wrap: break-word;">true</td> + <td>Boolean</td> + <td>Whether to send schema change events, so that downstream sinks can respond to schema changes and achieve table structure synchronization.</td> + </tr> + <tr> + <td>scan.incremental.snapshot.chunk.size</td> + <td>optional</td> + <td style="word-wrap: break-word;">8096</td> + <td>Integer</td> + <td>The chunk size (number of rows) of table snapshot, captured tables are split into multiple chunks when read the snapshot of table.</td> + </tr> + <tr> + <td>scan.snapshot.fetch.size</td> + <td>optional</td> + <td style="word-wrap: break-word;">1024</td> + <td>Integer</td> + <td>The maximum fetch size for per poll when read table snapshot.</td> + </tr> + <tr> + <td>scan.startup.mode</td> + <td>optional</td> + <td style="word-wrap: break-word;">initial</td> + <td>String</td> + <td>Optional startup mode for Oracle CDC consumer, valid enumerations are "initial","latest-offset".</td> + </tr> + <tr> + <td>debezium.*</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Pass-through Debezium's properties to Debezium Embedded Engine which is used to capture data changes from Oracle server. + For example: <code>'debezium.snapshot.mode' = 'never'</code>. + See more about the <a href="https://debezium.io/documentation/reference/1.9/connectors/oracle.html#oracle-connector-properties">Debezium's Oracle Connector properties</a></td> + </tr> + <tr> + <td>scan.incremental.close-idle-reader.enabled</td> + <td>optional</td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Whether to close idle readers at the end of the snapshot phase. <br> + The flink version is required to be greater than or equal to 1.14 when 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is set to true.<br> + If the flink version is greater than or equal to 1.15, the default value of 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' has been changed to true, + so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'</td> + </tr> + <tr> + <td>metadata.list</td> + <td>optional</td> + <td style="word-wrap: break-word;">false</td> + <td>String</td> + <td> + List of readable metadata from SourceRecord to be passed to downstream and could be used in transform module, split by `,`. Available readable metadata are: op_ts. + </td> + <tr> + <td>jdbc.url</td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td> The url for oracle jdbc ,the url will be used preferentially,if no url is configured, then use "jdbc:oracle:thin:@localhost:1521:orcl",but oracle 19c url is "jdbc:oracle:thin:@//localhost:1521/pdb1",so the url property is option to adapt to different versions of Oracle.</td> Review Comment: If the hostname and port parameter is configured, the URL is concatenated by hostname port database-name in SID format by default? ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleDataSource.java: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oracle.source; + +import org.apache.flink.cdc.common.annotation.VisibleForTesting; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.source.DataSource; +import org.apache.flink.cdc.common.source.EventSourceProvider; +import org.apache.flink.cdc.common.source.FlinkSourceFunctionProvider; +import org.apache.flink.cdc.common.source.FlinkSourceProvider; +import org.apache.flink.cdc.common.source.MetadataAccessor; +import org.apache.flink.cdc.connectors.oracle.source.config.OracleSourceConfig; +import org.apache.flink.cdc.connectors.oracle.source.config.OracleSourceConfigFactory; +import org.apache.flink.cdc.connectors.oracle.source.meta.offset.RedoLogOffsetFactory; +import org.apache.flink.cdc.connectors.oracle.source.reader.OracleSourceReader; +import org.apache.flink.cdc.connectors.oracle.source.reader.OracleTableSourceReader; +import org.apache.flink.cdc.connectors.oracle.table.OracleReadableMetaData; +import org.apache.flink.cdc.debezium.DebeziumSourceFunction; +import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; +import org.apache.flink.table.types.DataType; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A {@link DynamicTableSource} that describes how to create a Oracle redo log from a logical + * description. + */ +public class OracleDataSource implements DataSource, SupportsReadingMetadata { + + private final OracleSourceConfig sourceConfig; + private final Configuration config; + private final OracleSourceConfigFactory configFactory; + + /** Data type that describes the final output of the source. */ + protected DataType producedDataType; + + /** Metadata that is appended at the end of a physical source row. 
*/ + protected List<String> metadataKeys; + + private final List<OracleReadableMetaData> readableMetadataList; + + public OracleDataSource( + OracleSourceConfigFactory configFactory, + Configuration config, + List<OracleReadableMetaData> readableMetadataList) { + this.sourceConfig = configFactory.create(0); + this.config = config; + this.metadataKeys = Collections.emptyList(); + this.readableMetadataList = readableMetadataList; + this.configFactory = configFactory; + } + + @Override + public EventSourceProvider getEventSourceProvider() { + boolean enableParallelRead = + config.get(OracleDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED); + + OracleDialect oracleDialect = new OracleDialect(); + OracleEventDeserializer deserializer = + new OracleEventDeserializer( + DebeziumChangelogMode.ALL, + config.get(OracleDataSourceOptions.SCHEMA_CHANGE_ENABLED), + readableMetadataList); + + RedoLogOffsetFactory offsetFactory = new RedoLogOffsetFactory(); + if (enableParallelRead) { Review Comment: Is it possible to use the incremental snapshot method directly instead of calling the old interface? So there is no need to 'scan. Incremental. The snapshot. Enabled' this parameter. @LYanquan do you agree? ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInference.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oracle.source; + +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference; + +import io.debezium.data.geometry.Geometry; +import org.apache.kafka.connect.data.Schema; + +/** {@link DataType} inference for oracle debezium {@link Schema}. */ +@Internal +public class OracleSchemaDataTypeInference extends DebeziumSchemaDataTypeInference { + + private static final long serialVersionUID = 1L; + + protected DataType inferStruct(Object value, Schema schema) { + // the Geometry datatype in oracle will be converted to + // a String with Json format + if (Geometry.LOGICAL_NAME.equals(schema.name())) { Review Comment: This class also does not need to re-inherit the implementation. ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInference.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oracle.source; + +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference; + +import io.debezium.data.geometry.Geometry; +import org.apache.kafka.connect.data.Schema; + +/** {@link DataType} inference for oracle debezium {@link Schema}. */ +@Internal +public class OracleSchemaDataTypeInference extends DebeziumSchemaDataTypeInference { + + private static final long serialVersionUID = 1L; + + protected DataType inferStruct(Object value, Schema schema) { + // the Geometry datatype in oracle will be converted to + // a String with Json format + if (Geometry.LOGICAL_NAME.equals(schema.name())) { Review Comment: oracle seems to have no geo related field types ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleEventDeserializer.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.oracle.source; + +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.connectors.oracle.source.parser.OracleAntlrDdlParser; +import org.apache.flink.cdc.connectors.oracle.table.OracleReadableMetaData; +import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema; +import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode; +import org.apache.flink.table.data.TimestampData; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.debezium.data.Envelope; +import io.debezium.data.geometry.Geometry; +import io.debezium.relational.Tables; +import io.debezium.relational.history.HistoryRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.io.WKBReader; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.getHistoryRecord; + +/** Event deserializer for {@link OracleDataSource}. */ +@Internal +public class OracleEventDeserializer<Event> extends DebeziumEventDeserializationSchema { + + private static final long serialVersionUID = 1L; + + public static final String SCHEMA_CHANGE_EVENT_KEY_NAME = + "io.debezium.connector.oracle.SchemaChangeKey"; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final boolean includeSchemaChanges; + + private transient Tables tables; + + private transient OracleAntlrDdlParser customParser; + + List<OracleReadableMetaData> readableMetadataList; + + public OracleEventDeserializer( + DebeziumChangelogMode changelogMode, + boolean includeSchemaChanges, + List<OracleReadableMetaData> readableMetadataList) { + super(new OracleSchemaDataTypeInference(), changelogMode); + this.includeSchemaChanges = includeSchemaChanges; + this.readableMetadataList = readableMetadataList; + } + + @Override + protected List<SchemaChangeEvent> deserializeSchemaChangeRecord(SourceRecord record) { + if (includeSchemaChanges) { + try { + HistoryRecord historyRecord = getHistoryRecord(record); + + String databaseName = + historyRecord.document().getString(HistoryRecord.Fields.DATABASE_NAME); + String schemaName = + historyRecord.document().getString(HistoryRecord.Fields.SCHEMA_NAME); + if (customParser == null) { + customParser = new OracleAntlrDdlParser(databaseName, schemaName); + tables = new Tables(); + } + String ddl = + historyRecord.document().getString(HistoryRecord.Fields.DDL_STATEMENTS); + customParser.setCurrentDatabase(databaseName); + customParser.parse(ddl, tables); + return customParser.getAndClearParsedEvents(); + } catch (Exception e) { + throw new IllegalStateException("Failed to parse the schema change : " + record, e); + } + } + return Collections.emptyList(); + } + + @Override + protected boolean isDataChangeRecord(SourceRecord record) { + Schema valueSchema = record.valueSchema(); + Struct value = (Struct) record.value(); + return value != null + && valueSchema != null + && valueSchema.field(Envelope.FieldName.OPERATION) != null + && value.getString(Envelope.FieldName.OPERATION) != null; + } + + 
@Override + protected boolean isSchemaChangeRecord(SourceRecord record) { + Schema keySchema = record.keySchema(); + return keySchema != null && SCHEMA_CHANGE_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name()); + } + + @Override + protected TableId getTableId(SourceRecord record) { + String[] parts = record.topic().split("\\."); + return TableId.tableId(parts[1], parts[2]); + } + + @Override + protected Map<String, String> getMetadata(SourceRecord record) { + Map<String, String> map = new HashMap<>(); + readableMetadataList.forEach( + (oracleReadableMetaData -> { + Object metadata = oracleReadableMetaData.getConverter().read(record); + if (oracleReadableMetaData.equals(OracleReadableMetaData.OP_TS)) { + map.put( + oracleReadableMetaData.getKey(), + String.valueOf(((TimestampData) metadata).getMillisecond())); + } else { + map.put(oracleReadableMetaData.getKey(), String.valueOf(metadata)); + } + })); + return map; + } + + @Override + protected Object convertToString(Object dbzObj, Schema schema) { + // the Geometry datatype in oracle will be converted to + // a String with Json format + if (Geometry.LOGICAL_NAME.equals(schema.name())) { Review Comment: oracle seems to have no geo related field types ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/OraclePipelineRecordEmitter.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oracle.source.reader; + +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory; +import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState; +import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics; +import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter; +import org.apache.flink.cdc.connectors.oracle.source.config.OracleSourceConfig; +import org.apache.flink.cdc.connectors.oracle.utils.OracleSchemaUtils; +import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; +import org.apache.flink.connector.base.source.reader.RecordEmitter; + +import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.TableId; +import org.apache.kafka.connect.source.SourceRecord; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; + +/** The {@link RecordEmitter} implementation for Oracle pipeline connector. 
*/ +public class OraclePipelineRecordEmitter<T> extends IncrementalSourceRecordEmitter<T> { + private static final long serialVersionUID = 1L; + private List<String> tableList; + private List<CreateTableEvent> createTableEventCache = null; + private boolean alreadySendCreateTableForBinlogSplit = false; + + public OraclePipelineRecordEmitter( + DebeziumDeserializationSchema<T> debeziumDeserializationSchema, + SourceReaderMetrics sourceReaderMetrics, + boolean includeSchemaChanges, + OffsetFactory offsetFactory, + OracleSourceConfig sourceConfig) { + super( + debeziumDeserializationSchema, + sourceReaderMetrics, + includeSchemaChanges, + offsetFactory); + this.tableList = sourceConfig.getTableList(); + this.createTableEventCache = new ArrayList<>(); + try (JdbcConnection jdbc = OracleSchemaUtils.createOracleConnection(sourceConfig)) { + + List<TableId> capturedTableIds = new ArrayList<>(); + for (String table : tableList) { + TableId capturedTableId = TableId.parse(table.toUpperCase(Locale.ROOT)); + capturedTableIds.add(capturedTableId); + } + for (TableId tableId : capturedTableIds) { + Schema schema = OracleSchemaUtils.getSchema(jdbc, tableId); + createTableEventCache.add( + new CreateTableEvent( + org.apache.flink.cdc.common.event.TableId.tableId( + tableId.catalog(), tableId.table()), + schema)); + } Review Comment: Does oracle need to support batch mode? If you need to support batch mode, you should get the snapshot parameter here. ########## docs/content/docs/connectors/pipeline-connectors/oracle.md: ########## @@ -0,0 +1,462 @@ +--- +title: "ORACLE" +weight: 2 +type: docs +aliases: +- /connectors/pipeline-connectors/oracle +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Oracle Connector + +Oracle connector allows reading snapshot data and incremental data from Oracle database and provides end-to-end full-database data synchronization capabilities. +This document describes how to setup the Oracle connector. 
+ + +## Example + +An example of the pipeline for reading data from Oracle and sink to Doris can be defined as follows: + +```yaml +source: + type: oracle + name: Oracle Source + hostname: 127.0.0.1 + port: 1521 + username: debezium + password: password + database: ORCLDB + tables: testdb.\.*, testdb.user_table_[0-9]+, [app|web].order_\.* + +sink: + type: doris + name: Doris Sink + fenodes: 127.0.0.1:8030 + username: root + password: password + +pipeline: + name: Oracle to Doris Pipeline + parallelism: 4 +``` + +## Connector Options + +<div class="highlight"> +<table class="colwidths-auto docutils"> + <thead> + <tr> + <th class="text-left" style="width: 10%">Option</th> + <th class="text-left" style="width: 8%">Required</th> + <th class="text-left" style="width: 7%">Default</th> + <th class="text-left" style="width: 10%">Type</th> + <th class="text-left" style="width: 65%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td>hostname</td> + <td>required</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td> IP address or hostname of the Oracle database server.</td> + </tr> + <tr> + <td>port</td> + <td>required</td> + <td style="word-wrap: break-word;">1521</td> + <td>Integer</td> + <td>Integer port number of the Oracle database server.</td> + </tr> + <tr> + <td>username</td> + <td>required</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Name of the Oracle database to use when connecting to the Oracle database server.</td> + </tr> + <tr> + <td>password</td> + <td>required</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Password to use when connecting to the Oracle database server.</td> + </tr> + <tr> + <td>tables</td> Review Comment: Why do tables want schema.table +database, not database.schema.table? Can a pipeline connect to only one database? ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/dto/ColumnInfo.java: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oracle.dto; + +import java.io.Serializable; + +/** dto related to Table schema. */ +public class ColumnInfo implements Serializable { Review Comment: This class is not referenced, so why should it be left in the code? ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtils.java: ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oracle.utils; + +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; + +import io.debezium.relational.Column; +import oracle.jdbc.OracleTypes; + +import java.sql.Types; + +/** Utilities for converting from oracle types to {@link DataType}s. */ +public class OracleTypeUtils { Review Comment: need supported geo field type? ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/OracleSourceReader.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oracle.source.reader; + +import org.apache.flink.cdc.connectors.base.options.StartupOptions; +import org.apache.flink.cdc.connectors.oracle.OracleValidator; +import org.apache.flink.cdc.connectors.oracle.source.config.OracleSourceConfig; +import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; +import org.apache.flink.cdc.debezium.DebeziumSourceFunction; +import org.apache.flink.cdc.debezium.internal.DebeziumOffset; + +import io.debezium.connector.oracle.OracleConnector; + +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A builder to build a SourceFunction which can read snapshot and continue to consume log miner. + */ +public class OracleSourceReader<T> { Review Comment: We don't need to implement the function source anymore. ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/OraclePipelineRecordEmitter.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oracle.source.reader; + +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory; +import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState; +import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics; +import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter; +import org.apache.flink.cdc.connectors.oracle.source.config.OracleSourceConfig; +import org.apache.flink.cdc.connectors.oracle.utils.OracleSchemaUtils; +import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; +import org.apache.flink.connector.base.source.reader.RecordEmitter; + +import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.TableId; +import org.apache.kafka.connect.source.SourceRecord; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; + +/** The {@link RecordEmitter} implementation for Oracle pipeline connector. */ +public class OraclePipelineRecordEmitter<T> extends IncrementalSourceRecordEmitter<T> { + private static final long serialVersionUID = 1L; + private List<String> tableList; + private List<CreateTableEvent> createTableEventCache = null; + private boolean alreadySendCreateTableForBinlogSplit = false; + + public OraclePipelineRecordEmitter( + DebeziumDeserializationSchema<T> debeziumDeserializationSchema, + SourceReaderMetrics sourceReaderMetrics, + boolean includeSchemaChanges, + OffsetFactory offsetFactory, + OracleSourceConfig sourceConfig) { + super( + debeziumDeserializationSchema, + sourceReaderMetrics, + includeSchemaChanges, + offsetFactory); + this.tableList = sourceConfig.getTableList(); + this.createTableEventCache = new ArrayList<>(); + try (JdbcConnection jdbc = OracleSchemaUtils.createOracleConnection(sourceConfig)) { + + List<TableId> capturedTableIds = new ArrayList<>(); + for (String table : tableList) { + TableId capturedTableId = TableId.parse(table.toUpperCase(Locale.ROOT)); + capturedTableIds.add(capturedTableId); + } + for (TableId tableId : capturedTableIds) { + Schema schema = OracleSchemaUtils.getSchema(jdbc, tableId); + createTableEventCache.add( + new CreateTableEvent( + org.apache.flink.cdc.common.event.TableId.tableId( + tableId.catalog(), tableId.table()), + schema)); + } + } catch (SQLException e) { + throw new RuntimeException("Cannot start emitter to fetch table schema.", e); + } + } + + @Override + protected void processElement( + SourceRecord element, SourceOutput<T> output, SourceSplitState splitState) + throws Exception { Review Comment: Read the pg emitter code carefully before implementing this feature. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
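
For context on the jdbc.url review comment above: a minimal sketch, assuming the connector falls back to assembling a thin-driver URL from hostname/port/database when no explicit url is configured. The helper names and the SID-vs-service-name fallback are illustrative assumptions, not the connector's actual code; whether the default concatenation uses the SID form is exactly what the reviewer asks to have clarified.

```java
/** Illustrative only: the two Oracle thin-driver URL forms contrasted in the review. */
public final class OracleUrlExample {

    // SID form, e.g. jdbc:oracle:thin:@localhost:1521:orcl
    static String sidUrl(String hostname, int port, String sid) {
        return String.format("jdbc:oracle:thin:@%s:%d:%s", hostname, port, sid);
    }

    // Service-name form (typical for 19c PDBs), e.g. jdbc:oracle:thin:@//localhost:1521/pdb1
    static String serviceNameUrl(String hostname, int port, String serviceName) {
        return String.format("jdbc:oracle:thin:@//%s:%d/%s", hostname, port, serviceName);
    }

    // Hypothetical fallback: prefer an explicit jdbc.url, otherwise concatenate
    // hostname/port/database -- the default behaviour the reviewer asks about.
    static String resolveUrl(String configuredUrl, String hostname, int port, String database) {
        return configuredUrl != null ? configuredUrl : sidUrl(hostname, port, database);
    }
}
```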

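On the final comment about studying the PostgreSQL emitter before implementing processElement: a sketch of the pattern other pipeline connectors use, where the cached CreateTableEvent list built in the constructor shown in the diff is replayed once before the first record of the redo-log (stream) split. The isStreamSplitState() call and the unchecked cast to the output type are assumptions about the incremental-source base classes, not the PR's final implementation.

```java
// Sketch only: completes the processElement override from the diff above under
// the stated assumptions about the base-class API.
@Override
protected void processElement(
        SourceRecord element, SourceOutput<T> output, SourceSplitState splitState)
        throws Exception {
    // Before the first change record of the stream (redo-log) split, replay the
    // cached CreateTableEvents so downstream operators know every table schema.
    if (!alreadySendCreateTableForBinlogSplit && splitState.isStreamSplitState()) {
        for (CreateTableEvent event : createTableEventCache) {
            // T is the pipeline Event type for this connector, hence the cast.
            output.collect((T) event);
        }
        alreadySendCreateTableForBinlogSplit = true;
    }
    // Delegate the actual change record to the incremental-source emitter.
    super.processElement(element, output, splitState);
}
```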