loserwang1024 commented on code in PR #3968:
URL: https://github.com/apache/flink-cdc/pull/3968#discussion_r2154088782


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+
+import java.time.Duration;
+
+/** Configurations for {@link PostgresDataSource}. */
+@PublicEvolving
+public class PostgresDataSourceOptions {
+
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the PostgreSQL database server.");
+    public static final ConfigOption<Integer> PG_PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(5432)
+                    .withDescription("Integer port number of the PostgreSQL database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the PostgreSQL database to use when connecting to the PostgreSQL database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the PostgreSQL database server.");
+
+    public static final ConfigOption<String> TABLES =
+            ConfigOptions.key("tables")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the PostgreSQL tables to monitor. Regular expressions are supported. "
+                                    + "It is important to note that the dot (.) is treated as a delimiter for database and table names. "
+                                    + "If there is a need to use a dot (.) in a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with a backslash."
+                                    + "eg. db0.\\.*, db1.user_table_[0-9]+, db[1-2].[app|web]_order_\\.*");
+
+    public static final ConfigOption<String> DECODING_PLUGIN_NAME =
+            ConfigOptions.key("decoding.plugin.name")

Review Comment:
   Remove the wal2json options from the documentation; even the latest Debezium documentation no longer supports them.
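   For example, the description could be trimmed roughly like this (just a sketch, assuming decoderbufs and pgoutput remain the documented plug-ins):
   ```java
   // Hypothetical revision of the option, with the wal2json variants dropped:
   public static final ConfigOption<String> DECODING_PLUGIN_NAME =
           ConfigOptions.key("decoding.plugin.name")
                   .stringType()
                   .defaultValue("pgoutput")
                   .withDescription(
                           "The name of the Postgres logical decoding plug-in installed on the server.\n"
                                   + "Supported values are decoderbufs and pgoutput.");
   ```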



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.utils;
+
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.ZonedTimestampType;
+import org.apache.flink.table.types.logical.DecimalType;
+
+import io.debezium.relational.Column;
+
+/** A utility class for converting Postgres types to Flink types. */
+public class PostgresTypeUtils {
+    private static final String PG_SMALLSERIAL = "smallserial";
+    private static final String PG_SERIAL = "serial";
+    private static final String PG_BIGSERIAL = "bigserial";
+    private static final String PG_BYTEA = "bytea";
+    private static final String PG_BYTEA_ARRAY = "_bytea";
+    private static final String PG_SMALLINT = "int2";
+    private static final String PG_SMALLINT_ARRAY = "_int2";
+    private static final String PG_INTEGER = "int4";
+    private static final String PG_INTEGER_ARRAY = "_int4";
+    private static final String PG_BIGINT = "int8";
+    private static final String PG_BIGINT_ARRAY = "_int8";
+    private static final String PG_REAL = "float4";
+    private static final String PG_REAL_ARRAY = "_float4";
+    private static final String PG_DOUBLE_PRECISION = "float8";
+    private static final String PG_DOUBLE_PRECISION_ARRAY = "_float8";
+    private static final String PG_NUMERIC = "numeric";
+    private static final String PG_NUMERIC_ARRAY = "_numeric";
+    private static final String PG_BOOLEAN = "bool";
+    private static final String PG_BOOLEAN_ARRAY = "_bool";
+    private static final String PG_TIMESTAMP = "timestamp";
+    private static final String PG_TIMESTAMP_ARRAY = "_timestamp";
+    private static final String PG_TIMESTAMPTZ = "timestamptz";
+    private static final String PG_TIMESTAMPTZ_ARRAY = "_timestamptz";
+    private static final String PG_DATE = "date";
+    private static final String PG_DATE_ARRAY = "_date";
+    private static final String PG_TIME = "time";
+    private static final String PG_TIME_ARRAY = "_time";
+    private static final String PG_TEXT = "text";
+    private static final String PG_TEXT_ARRAY = "_text";
+    private static final String PG_CHAR = "bpchar";
+    private static final String PG_CHAR_ARRAY = "_bpchar";
+    private static final String PG_CHARACTER = "character";
+    private static final String PG_CHARACTER_ARRAY = "_character";
+    private static final String PG_CHARACTER_VARYING = "varchar";
+    private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
+    private static final String PG_UUID = "uuid";
+    private static final String PG_GEOMETRY = "geometry";
+    private static final String PG_GEOGRAPHY = "geography";
+
+    /** Returns a corresponding Flink data type from a debezium {@link Column}. */
+    public static DataType fromDbzColumn(Column column) {
+        DataType dataType = convertFromColumn(column);
+        if (column.isOptional()) {
+            return dataType;
+        } else {
+            return dataType.notNull();
+        }
+    }
+
+    /**
+     * Returns a corresponding Flink data type from a debezium {@link Column} with nullable always
+     * be true.
+     */
+    private static DataType convertFromColumn(Column column) {
+        String typeName = column.typeName();
+
+        int precision = column.length();
+        int scale = column.scale().orElse(0);
+
+        switch (typeName) {
+            case PG_BOOLEAN:
+                return DataTypes.BOOLEAN();

Review Comment:
   I am a little concerned about this. PostgreSQL's WAL does not contain DDL statements, so the original PostgreSQL type is not available for this mapping during streaming; we can only infer the types from Debezium's records. When we later introduce type derivation, this may lead to fragmentation.
   Could you please check how Debezium performs this mapping? Perhaps we can use the same mapping approach (this might involve converting fine-grained types into coarser ones).



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+
+import java.time.Duration;
+
+/** Configurations for {@link PostgresDataSource}. */
+@PublicEvolving
+public class PostgresDataSourceOptions {
+
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the PostgreSQL database server.");
+    public static final ConfigOption<Integer> PG_PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(5432)
+                    .withDescription("Integer port number of the PostgreSQL database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the PostgreSQL database to use when connecting to the PostgreSQL database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the PostgreSQL database server.");
+
+    public static final ConfigOption<String> TABLES =
+            ConfigOptions.key("tables")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the PostgreSQL tables to monitor. Regular expressions are supported. "
+                                    + "It is important to note that the dot (.) is treated as a delimiter for database and table names. "
+                                    + "If there is a need to use a dot (.) in a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with a backslash."
+                                    + "eg. db0.\\.*, db1.user_table_[0-9]+, db[1-2].[app|web]_order_\\.*");
+
+    public static final ConfigOption<String> DECODING_PLUGIN_NAME =
+            ConfigOptions.key("decoding.plugin.name")
+                    .stringType()
+                    .defaultValue("pgoutput")
+                    .withDescription(
+                            "The name of the Postgres logical decoding plug-in installed on the server.\n"
+                                    + "Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming,\n"
+                                    + "wal2json_rds_streaming and pgoutput.");
+
+    public static final ConfigOption<String> SLOT_NAME =
+            ConfigOptions.key("slot.name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The name of the PostgreSQL logical decoding slot that was created for streaming changes "
+                                    + "from a particular plug-in for a particular database/schema. The server uses this slot "
+                                    + "to stream events to the connector that you are configuring.");
+
+    public static final ConfigOption<String> SERVER_TIME_ZONE =
+            ConfigOptions.key("server-time-zone")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The session time zone in database server. If not set, then "
+                                    + "ZoneId.systemDefault() is used to determine the server time zone.");
+
+    public static final ConfigOption<String> SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.key-column")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table."
+                                    + "By default, the chunk key is the first column of the primary key and the chunk key is the RowId in oracle."
+                                    + "This column must be a column of the primary key.");
+
+    public static final ConfigOption<Integer> SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.size")
+                    .intType()
+                    .defaultValue(8096)
+                    .withDescription(
+                            "The chunk size (number of rows) of table snapshot, captured tables are split into multiple chunks when read the snapshot of table.");
+
+    public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
+            ConfigOptions.key("scan.snapshot.fetch.size")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription(
+                            "The maximum fetch size for per poll when read table snapshot.");
+
+    public static final ConfigOption<Duration> CONNECT_TIMEOUT =
+            ConfigOptions.key("connect.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "The maximum time that the connector should wait after trying to connect to the PostgreSQL database server before timing out.");
+
+    public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
+            ConfigOptions.key("connection.pool.size")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription("The connection pool size.");
+
+    public static final ConfigOption<Integer> CONNECT_MAX_RETRIES =
+            ConfigOptions.key("connect.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "The max retry times that the connector should retry to build PostgreSQL database server connection.");
+
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .defaultValue("initial")
+                    .withDescription(
+                            "Optional startup mode for PostgreSQL CDC consumer, valid enumerations are "
+                                    + "\"initial\", \"latest-offset\",\"snapshot\" or \"committed-offset\"");
+
+    public static final ConfigOption<Duration> HEARTBEAT_INTERVAL =
+            ConfigOptions.key("heartbeat.interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "Optional interval of sending heartbeat event for tracing the latest available wal offsets");
+
+    public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
+            ConfigOptions.key("chunk-key.even-distribution.factor.upper-bound")
+                    .doubleType()
+                    .defaultValue(1000.0d)
+                    .withFallbackKeys("split-key.even-distribution.factor.upper-bound")
+                    .withDescription(
+                            "The upper bound of chunk key distribution factor. The distribution factor is used to determine whether the"
+                                    + " table is evenly distribution or not."
+                                    + " The table chunks would use evenly calculation optimization when the data distribution is even,"
+                                    + " and the query for splitting would happen when it is uneven."
+                                    + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
+
+    public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND =
+            ConfigOptions.key("chunk-key.even-distribution.factor.lower-bound")
+                    .doubleType()
+                    .defaultValue(0.05d)
+                    .withFallbackKeys("split-key.even-distribution.factor.lower-bound")
+                    .withDescription(
+                            "The lower bound of chunk key distribution factor. The distribution factor is used to determine whether the"
+                                    + " table is evenly distribution or not."
+                                    + " The table chunks would use evenly calculation optimization when the data distribution is even,"
+                                    + " and the query for splitting would happen when it is uneven."
+                                    + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
+
+    public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP =

Review Comment:
   In the YAML pipeline, why not set this default to true?
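   Just a sketch of what I mean (the key name and description strings are assumed here, not taken from this PR):
   ```java
   // Hypothetical: same option, but defaulting to true for the pipeline connector.
   public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP =
           ConfigOptions.key("scan.incremental.snapshot.backfill.skip")
                   .booleanType()
                   .defaultValue(true)
                   .withDescription(
                           "Whether to skip the backfill phase when reading the table snapshot. "
                                   + "Note that skipping backfill only provides at-least-once semantics.");
   ```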



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
