lvyanquan commented on code in PR #3995:
URL: https://github.com/apache/flink-cdc/pull/3995#discussion_r2646806469
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/pom.xml:
##########
@@ -0,0 +1,266 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2023 Ververica Inc.

Review Comment:
   Should use the ASF license header.

##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory:
##########
@@ -0,0 +1,15 @@
+# Copyright 2023 Ververica Inc.

Review Comment:
   We should use the ASF license header.
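   For reference, this is the standard ASF header already used by the Java files in this PR, in XML-comment form for the pom (no assumptions beyond the comment formatting):

   ```xml
   <!--
   Licensed to the Apache Software Foundation (ASF) under one or more
   contributor license agreements.  See the NOTICE file distributed with
   this work for additional information regarding copyright ownership.
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
   -->
   ```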
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleDataSourceOptions.java:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.common.configuration.Configuration;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Configurations for {@link OracleDataSource}. */
+@PublicEvolving
+public class OracleDataSourceOptions {
+
+    public static final ConfigOption<String> JDBC_URL =
+            ConfigOptions.key("jdbc.url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The url for oracle jdbc ,the url will be used preferentially,if no url is configured, then use \"jdbc:oracle:thin:@localhost:1521:orcl\",but oracle 19c url is \"jdbc:oracle:thin:@//localhost:1521/pdb1\",so the url property is option to adapt to different versions of Oracle");
+
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the oracle database server.");
+
+    public static final ConfigOption<Integer> PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(3306)
+                    .withDescription("Integer port number of the oracle database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the oracle database to use when connecting to the oracle database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the oracle database server.");
+
+    public static final ConfigOption<String> DATABASE =
+            ConfigOptions.key("database")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the oracle database server.");
+    public static final ConfigOption<String> TABLES =
+            ConfigOptions.key("tables")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the oracle tables to monitor. Regular expressions are supported. "
+                                    + "It is important to note that the dot (.) is treated as a delimiter for database and table names. "
+                                    + "If there is a need to use a dot (.) in a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with a backslash."
+                                    + "eg. test.user_table_[0-9]+, test[0-9].[app|web]_order_\\.*");
+
+    public static final ConfigOption<String> SERVER_TIME_ZONE =
+            ConfigOptions.key("server-time-zone")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The session time zone in database server. If not set, then "
+                                    + "ZoneId.systemDefault() is used to determine the server time zone.");
+
+    public static final ConfigOption<Integer> SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.size")
+                    .intType()
+                    .defaultValue(8096)
+                    .withDescription(
+                            "The chunk size (number of rows) of table snapshot, captured tables are split into multiple chunks when read the snapshot of table.");
+
+    public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
+            ConfigOptions.key("scan.snapshot.fetch.size")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription(
+                            "The maximum fetch size for per poll when read table snapshot.");
+
+    public static final ConfigOption<Duration> CONNECT_TIMEOUT =
+            ConfigOptions.key("connect.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "The maximum time that the connector should wait after trying to connect to the oracle database server before timing out.");
+
+    public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
+            ConfigOptions.key("connection.pool.size")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription("The connection pool size.");
+
+    public static final ConfigOption<Integer> CONNECT_MAX_RETRIES =
+            ConfigOptions.key("connect.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "The max retry times that the connector should retry to build oracle database server connection.");
+
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .defaultValue("initial")
+                    .withDescription(
+                            "Optional startup mode for oracle CDC consumer, valid enumerations are "
+                                    + "\"initial\", \"latest-offset\", \"snapshot\"");
+
+    public static final ConfigOption<Duration> HEARTBEAT_INTERVAL =
+            ConfigOptions.key("heartbeat.interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "Optional interval of sending heartbeat event for tracing the latest available binlog offsets");

Review Comment:
   binlog? Oracle uses redo logs, not binlogs; this description looks copied from the MySQL connector.

##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/resources/log4j2-test.properties:
##########
@@ -0,0 +1,26 @@
+################################################################################
+# Copyright 2023 Ververica Inc.

Review Comment:
   Ditto. Maybe you could modify all files at once.

##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java:
##########
@@ -371,7 +372,7 @@ protected Object convertToLocalTimeZoneTimestamp(Object dbzObj, Schema schema) {
         if (dbzObj instanceof String) {
             String str = (String) dbzObj;
             // TIMESTAMP_LTZ type is encoded in string type
-            Instant instant = Instant.parse(str);
+            Instant instant = ZonedTimestamp.FORMATTER.parse(str, Instant::from);

Review Comment:
   Could you add some explanation about why this change is necessary?
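   For context, a plausible reason (my reading; the author should confirm): Debezium's `ZonedTimestamp` values are ISO-8601 strings that may carry a numeric zone offset such as `+08:00`, while `Instant.parse` uses the strict `ISO_INSTANT` format that only accepts a literal `Z`. A minimal standalone sketch, assuming `ZonedTimestamp.FORMATTER` is `ISO_OFFSET_DATE_TIME` as in current Debezium:

   ```java
   import java.time.Instant;
   import java.time.format.DateTimeFormatter;

   public class ZonedTimestampParsing {
       public static void main(String[] args) {
           // An Oracle TIMESTAMP WITH TIME ZONE value as Debezium's ZonedTimestamp
           // would emit it; note the numeric offset instead of a literal 'Z':
           String str = "2024-03-01T12:00:00.000000+08:00";

           // Instant.parse(str) uses DateTimeFormatter.ISO_INSTANT and throws
           // DateTimeParseException for any offset other than 'Z'.

           // An offset-aware formatter accepts both forms and resolves to the
           // correct point on the timeline:
           Instant instant = DateTimeFormatter.ISO_OFFSET_DATE_TIME.parse(str, Instant::from);
           System.out.println(instant); // 2024-03-01T04:00:00Z
       }
   }
   ```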
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleDataSourceOptions.java:
##########
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .defaultValue("initial")
+                    .withDescription(
+                            "Optional startup mode for oracle CDC consumer, valid enumerations are "
+                                    + "\"initial\", \"latest-offset\", \"snapshot\"");

Review Comment:
   `snapshot` mode was not added to the documentation.
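   For the documentation, a possible snippet (option key and values as defined above; the per-mode wording follows the shared flink-cdc-base `StartupOptions` semantics and should be double-checked):

   ```yaml
   source:
     type: oracle
     # scan.startup.mode, per OracleDataSourceOptions:
     #   initial       - (default) snapshot the tables first, then read redo-log changes
     #   latest-offset - skip the snapshot and read changes from the current position
     #   snapshot      - read a bounded snapshot only, without the change stream
     scan.startup.mode: snapshot
   ```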
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java:
##########
@@ -0,0 +1,1487 @@
+/** IT tests for {@link OracleDataSource}. */
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class OraclePipelineITCase extends OracleSourceTestBase {

Review Comment:
   It's necessary to add tests covering the scenario of restoring from a savepoint with each of the different startup modes.
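   A rough sketch of the shape such a test could take, using Flink's public savepoint APIs. `submitPipelineJob` is a hypothetical helper standing in for the Oracle source wiring already present in this IT case, and the assertions are elided:

   ```java
   // These would join the class's existing imports:
   import org.apache.flink.api.common.JobID;
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.core.execution.SavepointFormatType;
   import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
   import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.test.util.MiniClusterWithClientResource;

   import java.nio.file.Files;

   @Test
   void testRestoreFromSavepointWithStartupModes() throws Exception {
       MiniClusterWithClientResource cluster =
               new MiniClusterWithClientResource(
                       new MiniClusterResourceConfiguration.Builder()
                               .setNumberTaskManagers(1)
                               .setNumberSlotsPerTaskManager(2)
                               .build());
       cluster.before();
       try {
           String savepointDir = Files.createTempDirectory("oracle-cdc-sp").toUri().toString();

           // 1. Submit the pipeline with scan.startup.mode=initial and wait until
           //    snapshot records arrive (submitPipelineJob is a hypothetical helper).
           JobID jobId = submitPipelineJob(cluster, "initial");

           // 2. Stop the job with a savepoint.
           String savepointPath =
                   cluster.getClusterClient()
                           .stopWithSavepoint(
                                   jobId, false, savepointDir, SavepointFormatType.CANONICAL)
                           .get();

           // 3. Restore from the savepoint and verify no events are lost or
           //    duplicated; repeat for "latest-offset" and "snapshot".
           Configuration conf = new Configuration();
           conf.set(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
           StreamExecutionEnvironment restoredEnv =
                   StreamExecutionEnvironment.getExecutionEnvironment(conf);
           // ... rebuild the same Oracle source on restoredEnv and assert on output
       } finally {
           cluster.after();
       }
   }
   ```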
##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java:
##########
@@ -0,0 +1,238 @@
+/** End-to-end tests for Oracle cdc pipeline job. */
+public class OracleE2eITCase extends PipelineTestEnvironment {
@@ ... @@
+    @Test
+    void testSyncWholeDatabase() throws Exception {
+        String pipelineJob =
+                String.format(
+                        "source:\n"
+                                + "  type: oracle\n"
+                                + "  hostname: %s\n"
+                                + "  port: %d\n"
+                                + "  username: %s\n"
+                                + "  password: %s\n"
+                                + "  tables: %s.PRODUCTS\n"

Review Comment:
   There is only one table here; you could add a few more tables.
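   For example (table names here are illustrative; comma-separated lists and regular expressions are supported per the `tables` option description):

   ```yaml
   source:
     type: oracle
     # Comma-separated list; regular expressions are supported, and '.' is the
     # schema/table delimiter (escape literal dots with a backslash).
     tables: DEBEZIUM.PRODUCTS,DEBEZIUM.ORDERS,DEBEZIUM.CATEGORY_[0-9]+
   ```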
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java:
##########
@@ -185,6 +185,9 @@ protected DataType inferBytes(Object value, Schema schema) {
         if (precision > DecimalType.MAX_PRECISION) {
             return DataTypes.STRING();
         }
+        if (scale < 0 || scale > 36) {
+            return DataTypes.STRING();

Review Comment:
   Please explain why this change is necessary.
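   For reviewers' context, a self-contained illustration of why such scales occur at all: Oracle's NUMBER allows negative scales, which Debezium passes through as the `BigDecimal` scale, while Flink's `DecimalType` only allows `0 <= scale <= precision <= 38`. (Why the upper cutoff is 36 rather than 38 is exactly what the comment above asks the author to explain.)

   ```java
   import java.math.BigDecimal;
   import java.math.RoundingMode;

   public class NegativeScaleDemo {
       public static void main(String[] args) {
           // Oracle NUMBER(5,-2) rounds values to hundreds; Debezium would ship
           // such a column as a BigDecimal whose scale is negative:
           BigDecimal fromOracle = new BigDecimal("12345").setScale(-2, RoundingMode.HALF_UP);
           System.out.println(fromOracle);         // 1.23E+4 (unscaled=123, scale=-2)
           System.out.println(fromOracle.scale()); // -2
           // A negative (or oversized) scale cannot be represented by Flink's
           // DecimalType, so falling back to STRING avoids a validation failure.
       }
   }
   ```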
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleDataSourceOptions.java:
##########
+    @Experimental
+    public static final ConfigOption<Integer> CHUNK_META_GROUP_SIZE =
+            ConfigOptions.key("chunk-meta.group.size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The group size of chunk meta, if the meta size exceeds the group size, the meta will be divided into multiple groups.");
+
+    @Experimental
+    public static final ConfigOption<Double> CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
+            ConfigOptions.key("chunk-key.even-distribution.factor.upper-bound")
+                    .doubleType()
+                    .defaultValue(1000.0d)
+                    .withFallbackKeys("split-key.even-distribution.factor.upper-bound")
+                    .withDescription(
+                            "The upper bound of chunk key distribution factor. The distribution factor is used to determine whether the"
+                                    + " table is evenly distribution or not."
+                                    + " The table chunks would use evenly calculation optimization when the data distribution is even,"
+                                    + " and the query oracle for splitting would happen when it is uneven."
+                                    + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
+
+    @Experimental
+    public static final ConfigOption<Double> CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND =
+            ConfigOptions.key("chunk-key.even-distribution.factor.lower-bound")
+                    .doubleType()
+                    .defaultValue(0.05d)
+                    .withFallbackKeys("split-key.even-distribution.factor.lower-bound")
+                    .withDescription(
+                            "The lower bound of chunk key distribution factor. The distribution factor is used to determine whether the"
+                                    + " table is evenly distribution or not."
+                                    + " The table chunks would use evenly calculation optimization when the data distribution is even,"
+                                    + " and the query oracle for splitting would happen when it is uneven."
+                                    + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
+
+    @Experimental
+    public static final ConfigOption<Boolean> SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED =
+            ConfigOptions.key("scan.incremental.close-idle-reader.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to close idle readers at the end of the snapshot phase. This feature depends on "
+                                    + "FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be "
+                                    + "greater than or equal to 1.14 when enabling this feature.");
+
+    @Experimental
+    public static final ConfigOption<Boolean> SCHEMA_CHANGE_ENABLED =
+            ConfigOptions.key("schema-change.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether send schema change events, by default is true. If set to false, the schema changes will not be sent.");
+
+    @Experimental
+    public static final ConfigOption<String> DATABASE_CONNECTION_ADAPTER =
+            ConfigOptions.key("debezium.database.connection.adapter")
+                    .stringType()
+                    .defaultValue("logminer")
+                    .withDescription("Database connection adapter.");
+
+    @Experimental
+    public static final ConfigOption<String> LOG_MINING_STRATEGY =
+            ConfigOptions.key("debezium.log.mining.strategy")
+                    .stringType()
+                    .defaultValue("online_catalog")
+                    .withDescription("A strategy in log data analysis or mining.");

Review Comment:
   Should introduce all available values.
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleDataSourceOptions.java:
##########
+    @Experimental
+    public static final ConfigOption<String> DATABASE_CONNECTION_ADAPTER =
+            ConfigOptions.key("debezium.database.connection.adapter")
+                    .stringType()
+                    .defaultValue("logminer")
+                    .withDescription("Database connection adapter.");

Review Comment:
   Should introduce all available values.
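   Likewise for the adapter, a suggested description (the Debezium Oracle connector ships `logminer` and `xstream` adapters; `xstream` requires an Oracle GoldenGate license):

   ```java
   @Experimental
   public static final ConfigOption<String> DATABASE_CONNECTION_ADAPTER =
           ConfigOptions.key("debezium.database.connection.adapter")
                   .stringType()
                   .defaultValue("logminer")
                   .withDescription(
                           "The Debezium Oracle connection adapter. Valid values are "
                                   + "\"logminer\" (default) and \"xstream\" (requires a "
                                   + "license for the Oracle XStream API).");
   ```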
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/resources/log4j2-test.properties:
##########
@@ -0,0 +1,26 @@
+################################################################################
+# Copyright 2023 Ververica Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level=INFO

Review Comment:
   It's better to set this to `ERROR` to reduce log output on the CI machines.
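   Concretely, the suggested change (same keys as the file above):

   ```properties
   # Set root logger level to ERROR to keep CI build logs small;
   # set manually to INFO/DEBUG for local debugging.
   rootLogger.level=ERROR
   ```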
+ + " The table chunks would use evenly calculation optimization when the data distribution is even," + + " and the query oracle for splitting would happen when it is uneven." + + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount."); + + @Experimental + public static final ConfigOption<Boolean> SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED = + ConfigOptions.key("scan.incremental.close-idle-reader.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to close idle readers at the end of the snapshot phase. This feature depends on " + + "FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be " + + "greater than or equal to 1.14 when enabling this feature."); + + @Experimental + public static final ConfigOption<Boolean> SCHEMA_CHANGE_ENABLED = + ConfigOptions.key("schema-change.enabled") + .booleanType() + .defaultValue(true) + .withDescription( + "Whether send schema change events, by default is true. If set to false, the schema changes will not be sent."); + + @Experimental + public static final ConfigOption<String> DATABASE_CONNECTION_ADAPTER = + ConfigOptions.key("debezium.database.connection.adapter") + .stringType() + .defaultValue("logminer") + .withDescription("Database connection adapter."); Review Comment: Should introduce all available values. ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/resources/log4j2-test.properties: ########## @@ -0,0 +1,26 @@ +################################################################################ +# Copyright 2023 Ververica Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level=INFO Review Comment: It's better to set this to `ERROR` to reducer log in CI machine. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
