lvyanquan commented on code in PR #3968:
URL: https://github.com/apache/flink-cdc/pull/3968#discussion_r2148916956


##########
docs/content.zh/docs/connectors/pipeline-connectors/postgres.md:
##########
@@ -0,0 +1,393 @@
+---
+title: "Postgres"
+weight: 2
+type: docs
+aliases:
+- /connectors/pipeline-connectors/Postgres
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Postgres Connector
+
+Postgres CDC Pipeline 连接器允许从 Postgres 数据库读取快照数据和增量数据,并提供端到端的整库数据同步能力。 
本文描述了如何设置 Postgres CDC Pipeline 连接器。
+
+## 示例
+
+从 Postgres 读取数据同步到 Doris 的 Pipeline 可以定义如下:
+
+```yaml
+source:
+   type: posgtres
+   name: Postgres Source
+   hostname: 127.0.0.1
+   port: 3306
+   username: admin
+   password: pass
+   tables: adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, 
[app|web].schema_\.*.order_\.*
+   decoding.plugin.name:  pgoutput
+   slot.name: pgtest
+
+sink:
+  type: doris
+  name: Doris Sink
+  fenodes: 127.0.0.1:8030
+  username: root
+  password: pass
+
+pipeline:
+   name: Postgres to Doris Pipeline
+   parallelism: 4
+```
+
+## 连接器配置项
+
+<div class="highlight">
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 10%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 7%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 65%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>hostname</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td> Postgres 数据库服务器的 IP 地址或主机名。</td>
+    </tr>
+    <tr>
+      <td>port</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">3306</td>
+      <td>Integer</td>
+      <td>Postgres 数据库服务器的整数端口号。</td>
+    </tr>
+    <tr>
+      <td>username</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>连接到 Postgres 数据库服务器时要使用的 Postgres 用户的名称。</td>
+    </tr>
+    <tr>
+      <td>password</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>连接 Postgres 数据库服务器时使用的密码。</td>
+    </tr>
+    <tr>
+      <td>tables</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>需要监视的 Postgres 数据库的表名。表名支持正则表达式,以监视满足正则表达式的多个表。<br>
+          需要注意的是,点号(.)被视为数据库和表名的分隔符。 
如果需要在正则表达式中使用点(.)来匹配任何字符,必须使用反斜杠对点进行转义。<br>
+          例如,adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, 
[app|web].schema_\.*.order_\.*</td>
+    </tr>
+    <tr>
+      <td>slot.name</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>为从特定插件以流式传输方式获取某个数据库/模式的变更数据,所创建的 Postgre 逻辑解码槽(logical decoding 
slot)的名称。服务器使用这个槽(slot)将事件流式传输给你要配置的连接器(connector)。
+          <br/>复制槽名称必须符合 <a 
href="https://www.postgresql.org/docs/current/static/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION";>PostgreSQL
 复制插槽的命名规则</a>, 其规则如下: "Each replication slot has a name, which can contain 
lower-case letters, numbers, and the underscore character."</td>
+    </tr> 
+    <tr>
+      <td>tables.exclude</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>需要排除的 Postgres 
数据库的表名,参数会在tables参数后发生排除作用。表名支持正则表达式,以排除满足正则表达式的多个表。<br>
+          用法和tables参数相同</td>
+    </tr>
+     <tr>
+      <td>server-time-zone</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>数据库服务器中的会话时区, 例如: "Asia/Shanghai". 
+          它控制 Postgres 中的时间戳类型如何转换为字符串。
+          更多请参考 <a 
href="https://debezium.io/documentation/reference/1.9/connectors/postgresql.html#postgresql-data-types";>
 这里</a>.
+          如果没有设置,则使用ZoneId.systemDefault()来确定服务器时区。
+      </td>
+    </tr>
+    <tr>
+      <td>scan.incremental.snapshot.chunk.size</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">8096</td>
+      <td>Integer</td>
+      <td>表快照的块大小(行数),读取表的快照时,捕获的表被拆分为多个块。</td>
+    </tr>
+    <tr>
+      <td>scan.snapshot.fetch.size</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">1024</td>
+      <td>Integer</td>
+      <td>读取表快照时每次读取数据的最大条数。</td>
+    </tr>
+    <tr>
+      <td>scan.startup.mode</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">initial</td>
+      <td>String</td>
+      <td> Postgres CDC 消费者可选的启动模式,
+         合法的模式为 "initial","latest-offset","committed-offset"和 ""snapshot"。</td>
+    </tr>
+    <tr>
+      <td>scan.incremental.snapshot.backfill.skip</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        是否在快照读取阶段跳过 backfill 。<br>
+        如果跳过 backfill ,快照阶段捕获表的更改将在稍后的 binlog 读取阶段被回放,而不是合并到快照中。<br>
+        警告:跳过 backfill 可能会导致数据不一致,因为快照阶段发生的某些 binlog 事件可能会被重放(仅保证 
at-least-once )。
+        例如,更新快照阶段已更新的值,或删除快照阶段已删除的数据。这些重放的 binlog 事件应进行特殊处理。

Review Comment:
   wal log.



##########
docs/content/docs/connectors/pipeline-connectors/postgres.md:
##########
@@ -0,0 +1,389 @@
+---
+title: "Postgres"
+weight: 2
+type: docs
+aliases:
+- /connectors/pipeline-connectors/Postgres
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Postgres Connector
+
+Postgres connector allows reading snapshot data and incremental data from 
Postgres database and provides end-to-end full-database data synchronization 
capabilities.
+This document describes how to setup the Postgres connector.
+
+
+## Example
+
+An example of the pipeline for reading data from Postgres and sink to Doris 
can be defined as follows:
+
+```yaml
+source:
+   type: posgtres
+   name: Postgres Source
+   hostname: 127.0.0.1
+   port: 3306
+   username: admin
+   password: pass
+   tables: adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, 
[app|web].schema_\.*.order_\.*
+   decoding.plugin.name:  pgoutput
+   slot.name: pgtest
+
+sink:
+  type: doris
+  name: Doris Sink
+  fenodes: 127.0.0.1:8030
+  username: root
+  password: pass
+
+pipeline:
+   name: Postgres to Doris Pipeline
+   parallelism: 4
+```
+
+## Connector Options
+
+<div class="highlight">
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 10%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 7%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 65%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>hostname</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>IP address or hostname of the Postgres database server.</td>
+    </tr>
+    <tr>
+      <td>port</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">5432</td>
+      <td>Integer</td>
+      <td>Integer port number of the Postgres database server.</td>
+    </tr>
+    <tr>
+      <td>username</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Name of the Postgres database to use when connecting to the Postgres 
database server.</td>
+    </tr>
+    <tr>
+      <td>password</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Password to use when connecting to the Postgres database server.</td>
+    </tr>
+    <tr>
+      <td>tables</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Table name of the Postgres database to monitor. The table-name also 
supports regular expressions to monitor multiple tables that satisfy the 
regular expressions. <br>
+          It is important to note that the dot (.) is treated as a delimiter 
for database and table names. 
+          If there is a need to use a dot (.) in a regular expression to match 
any character, it is necessary to escape the dot with a backslash.<br>
+          例如,adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, 
[app|web].schema_\.*.order_\.*</td>

Review Comment:
   Use English.



##########
docs/content.zh/docs/connectors/pipeline-connectors/postgres.md:
##########
@@ -0,0 +1,393 @@
+---
+title: "Postgres"
+weight: 2
+type: docs
+aliases:
+- /connectors/pipeline-connectors/Postgres
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Postgres Connector
+
+Postgres CDC Pipeline 连接器允许从 Postgres 数据库读取快照数据和增量数据,并提供端到端的整库数据同步能力。 
本文描述了如何设置 Postgres CDC Pipeline 连接器。
+
+## 示例
+
+从 Postgres 读取数据同步到 Doris 的 Pipeline 可以定义如下:
+
+```yaml
+source:
+   type: posgtres
+   name: Postgres Source
+   hostname: 127.0.0.1
+   port: 3306
+   username: admin
+   password: pass
+   tables: adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, 
[app|web].schema_\.*.order_\.*
+   decoding.plugin.name:  pgoutput
+   slot.name: pgtest
+
+sink:
+  type: doris
+  name: Doris Sink
+  fenodes: 127.0.0.1:8030
+  username: root
+  password: pass
+
+pipeline:
+   name: Postgres to Doris Pipeline
+   parallelism: 4
+```
+
+## 连接器配置项
+
+<div class="highlight">
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 10%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 7%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 65%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>hostname</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td> Postgres 数据库服务器的 IP 地址或主机名。</td>
+    </tr>
+    <tr>
+      <td>port</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">3306</td>

Review Comment:
   Should be 5432.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.utils;
+
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
+import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+import 
org.apache.flink.cdc.connectors.postgres.source.utils.CustomPostgresSchema;
+
+import io.debezium.connector.postgresql.PostgresPartition;
+import io.debezium.connector.postgresql.connection.PostgresConnection;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Table;
+import io.debezium.relational.history.TableChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Utilities for converting from debezium {@link Table} types to {@link 
Schema}. */
+public class PostgresSchemaUtils {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PostgresSchemaUtils.class);
+
+    private static volatile PostgresDialect postgresDialect;
+
+    public static List<String> listSchemas(PostgresSourceConfig sourceConfig, 
String namespace) {
+        try (JdbcConnection jdbc = 
getPostgresDialect(sourceConfig).openJdbcConnection()) {
+            return listSchemas(jdbc, namespace);
+        } catch (SQLException e) {
+            throw new RuntimeException("Error to list schemas: " + 
e.getMessage(), e);
+        }
+    }
+
+    public static List<String> listNamespaces(PostgresSourceConfig 
sourceConfig) {
+        try (JdbcConnection jdbc = 
getPostgresDialect(sourceConfig).openJdbcConnection()) {
+            return listNamespaces(jdbc);
+        } catch (SQLException e) {
+            throw new RuntimeException("Error to list namespaces: " + 
e.getMessage(), e);
+        }
+    }
+
+    public static List<TableId> listTables(
+            PostgresSourceConfig sourceConfig, @Nullable String dbName) {
+        try (PostgresConnection jdbc = 
getPostgresDialect(sourceConfig).openJdbcConnection()) {
+
+            List<String> databases =
+                    dbName != null
+                            ? Collections.singletonList(dbName)
+                            : 
Collections.singletonList(sourceConfig.getDatabaseList().get(0));
+
+            List<TableId> tableIds = new ArrayList<>();
+            for (String database : databases) {
+                List<TableId> tableIdList =
+                        jdbc.getAllTableIds(database).stream()
+                                .map(PostgresSchemaUtils::toCdcTableId)
+                                .collect(Collectors.toList());
+                tableIds.addAll(tableIdList);
+            }
+            return tableIds;
+        } catch (SQLException e) {
+            throw new RuntimeException("Error to list databases: " + 
e.getMessage(), e);
+        }
+    }
+
+    public static Schema getTableSchema(
+            PostgresSourceConfig sourceConfig, PostgresPartition partition, 
TableId tableId) {
+        try (PostgresConnection jdbc = 
getPostgresDialect(sourceConfig).openJdbcConnection()) {
+            return getTableSchema(partition, tableId, sourceConfig, jdbc);
+        }
+    }
+
+    public static PostgresDialect getPostgresDialect(PostgresSourceConfig 
sourceConfig) {
+        if (postgresDialect == null) { //

Review Comment:
   // should be removed.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java:
##########
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.factory;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.factories.DataSourceFactory;
+import org.apache.flink.cdc.common.factories.Factory;
+import org.apache.flink.cdc.common.factories.FactoryHelper;
+import org.apache.flink.cdc.common.schema.Selectors;
+import org.apache.flink.cdc.common.source.DataSource;
+import org.apache.flink.cdc.connectors.base.options.SourceOptions;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresDataSource;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
+import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
+import org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.RowData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.CHUNK_META_GROUP_SIZE;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.CONNECTION_POOL_SIZE;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.CONNECT_MAX_RETRIES;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.CONNECT_TIMEOUT;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.DECODING_PLUGIN_NAME;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.HEARTBEAT_INTERVAL;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.HOSTNAME;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.PASSWORD;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.PG_PORT;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_STARTUP_MODE;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SERVER_TIME_ZONE;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SLOT_NAME;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLES;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLES_EXCLUDE;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.USERNAME;
+import static 
org.apache.flink.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
+import static 
org.apache.flink.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
+import static 
org.apache.flink.cdc.debezium.utils.JdbcUrlUtils.PROPERTIES_PREFIX;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** A {@link Factory} to create {@link PostgresDataSource}. */
+@Internal
+public class PostgresDataSourceFactory implements DataSourceFactory {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PostgresDataSourceFactory.class);
+
+    public static final String IDENTIFIER = "postgres";
+
+    @Override
+    public DataSource createDataSource(Context context) {
+        FactoryHelper.createFactoryHelper(this, context)
+                .validateExcept(PROPERTIES_PREFIX, DEBEZIUM_OPTIONS_PREFIX);
+
+        final Configuration config = context.getFactoryConfiguration();
+        String hostname = config.get(HOSTNAME);
+        int port = config.get(PG_PORT);
+        String pluginName = config.get(DECODING_PLUGIN_NAME);
+        String slotName = config.get(SLOT_NAME);
+        String username = config.get(USERNAME);
+        String password = config.get(PASSWORD);
+        String chunkKeyColumn = 
config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
+        String tables = config.get(TABLES);
+        ZoneId serverTimeZone = getServerTimeZone(config);
+        String tablesExclude = config.get(TABLES_EXCLUDE);
+        Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL);
+        StartupOptions startupOptions = getStartupOptions(config);
+
+        int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE);
+        int splitSize = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
+        int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE);
+
+        double distributionFactorUpper = 
config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
+        double distributionFactorLower = 
config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
+
+        boolean closeIdleReaders = 
config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
+
+        Duration connectTimeout = config.get(CONNECT_TIMEOUT);
+        int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
+        int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
+        boolean skipSnapshotBackfill = 
config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
+        int lsnCommitCheckpointsDelay = 
config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
+
+        validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 
1);
+        validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
+        validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
+        validateIntegerOption(CONNECTION_POOL_SIZE, connectionPoolSize, 1);
+        validateIntegerOption(CONNECT_MAX_RETRIES, connectMaxRetries, 0);
+        validateDistributionFactorUpper(distributionFactorUpper);
+        validateDistributionFactorLower(distributionFactorLower);
+
+        Map<String, String> configMap = config.toMap();
+        Optional<String> databaseName = getValidateDatabaseName(tables);
+
+        PostgresSourceConfigFactory configFactory =
+                
PostgresSourceBuilder.PostgresIncrementalSource.<RowData>builder()
+                        .hostname(hostname)
+                        .port(port)
+                        .database(databaseName.get())
+                        .schemaList(".*")
+                        .tableList(".*")
+                        .username(username)
+                        .password(password)
+                        .decodingPluginName(pluginName)
+                        .slotName(slotName)
+                        .serverTimeZone(serverTimeZone.getId())
+                        .debeziumProperties(getDebeziumProperties(configMap))
+                        .splitSize(splitSize)
+                        .splitMetaGroupSize(splitMetaGroupSize)
+                        .distributionFactorUpper(distributionFactorUpper)
+                        .distributionFactorLower(distributionFactorLower)
+                        .fetchSize(fetchSize)
+                        .connectTimeout(connectTimeout)
+                        .connectMaxRetries(connectMaxRetries)
+                        .connectionPoolSize(connectionPoolSize)
+                        .startupOptions(startupOptions)
+                        .chunkKeyColumn(chunkKeyColumn)
+                        .heartbeatInterval(heartbeatInterval)
+                        .closeIdleReaders(closeIdleReaders)
+                        .skipSnapshotBackfill(skipSnapshotBackfill)
+                        .lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay)
+                        .getConfigFactory();
+
+        List<TableId> tableIds = 
PostgresSchemaUtils.listTables(configFactory.create(0), null);
+
+        Selectors selectors = new 
Selectors.SelectorsBuilder().includeTables(tables).build();
+        List<String> capturedTables = getTableList(tableIds, selectors);
+        if (capturedTables.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "Cannot find any table by the option 'tables' = " + 
tables);
+        }
+        if (tablesExclude != null) {
+            Selectors selectExclude =
+                    new 
Selectors.SelectorsBuilder().includeTables(tablesExclude).build();
+            List<String> excludeTables = getTableList(tableIds, selectExclude);
+            if (!excludeTables.isEmpty()) {
+                capturedTables.removeAll(excludeTables);
+            }
+            if (capturedTables.isEmpty()) {
+                throw new IllegalArgumentException(
+                        "Cannot find any table with by the option 
'tables.exclude'  = "
+                                + tablesExclude);
+            }
+        }
+        configFactory.tableList(capturedTables.toArray(new String[0]));
+
+        return new PostgresDataSource(configFactory);
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(HOSTNAME);
+        options.add(USERNAME);
+        options.add(PASSWORD);
+        options.add(TABLES);
+        options.add(SLOT_NAME);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(PG_PORT);
+        options.add(TABLES_EXCLUDE);
+        options.add(DECODING_PLUGIN_NAME);
+        options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
+        options.add(SCAN_SNAPSHOT_FETCH_SIZE);
+        options.add(SCAN_STARTUP_MODE);
+        options.add(SERVER_TIME_ZONE);
+        options.add(CONNECT_TIMEOUT);
+        options.add(CONNECT_MAX_RETRIES);
+        options.add(CONNECTION_POOL_SIZE);
+        options.add(HEARTBEAT_INTERVAL);
+        options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
+        options.add(CHUNK_META_GROUP_SIZE);
+        options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
+        options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
+        options.add(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
+        return options;
+    }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    private static List<String> getTableList(
+            @Nullable List<TableId> tableIdList, Selectors selectors) {
+        return tableIdList.stream()
+                .filter(selectors::isMatch)
+                .map(TableId::toString)
+                .collect(Collectors.toList());
+    }
+
+    /** Checks the value of given integer option is valid. */
+    private void validateIntegerOption(
+            ConfigOption<Integer> option, int optionValue, int exclusiveMin) {
+        checkState(
+                optionValue > exclusiveMin,
+                String.format(
+                        "The value of option '%s' must larger than %d, but is 
%d",
+                        option.key(), exclusiveMin, optionValue));
+    }
+
+    private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
+
+    private static final String SCAN_STARTUP_MODE_VALUE_SNAPSHOT = "snapshot";
+    private static final String SCAN_STARTUP_MODE_VALUE_LATEST = 
"latest-offset";
+
+    private static StartupOptions getStartupOptions(Configuration config) {
+        String modeString = config.get(SCAN_STARTUP_MODE);
+
+        switch (modeString.toLowerCase()) {
+            case SCAN_STARTUP_MODE_VALUE_INITIAL:
+                return StartupOptions.initial();
+            case SCAN_STARTUP_MODE_VALUE_SNAPSHOT:
+                return StartupOptions.snapshot();
+            case SCAN_STARTUP_MODE_VALUE_LATEST:
+                return StartupOptions.latest();
+
+            default:

Review Comment:
   Miss support for COMMITTED_OFFSETS.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.utils;
+
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
+import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+import 
org.apache.flink.cdc.connectors.postgres.source.utils.CustomPostgresSchema;
+
+import io.debezium.connector.postgresql.PostgresPartition;
+import io.debezium.connector.postgresql.connection.PostgresConnection;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Table;
+import io.debezium.relational.history.TableChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Utilities for converting from debezium {@link Table} types to {@link 
Schema}. */
+public class PostgresSchemaUtils {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PostgresSchemaUtils.class);
+
+    private static volatile PostgresDialect postgresDialect;
+
+    public static List<String> listSchemas(PostgresSourceConfig sourceConfig, 
String namespace) {
+        try (JdbcConnection jdbc = 
getPostgresDialect(sourceConfig).openJdbcConnection()) {
+            return listSchemas(jdbc, namespace);
+        } catch (SQLException e) {
+            throw new RuntimeException("Error to list schemas: " + 
e.getMessage(), e);
+        }
+    }
+
+    public static List<String> listNamespaces(PostgresSourceConfig 
sourceConfig) {
+        try (JdbcConnection jdbc = 
getPostgresDialect(sourceConfig).openJdbcConnection()) {
+            return listNamespaces(jdbc);
+        } catch (SQLException e) {
+            throw new RuntimeException("Error to list namespaces: " + 
e.getMessage(), e);
+        }
+    }
+
+    public static List<TableId> listTables(
+            PostgresSourceConfig sourceConfig, @Nullable String dbName) {
+        try (PostgresConnection jdbc = 
getPostgresDialect(sourceConfig).openJdbcConnection()) {
+
+            List<String> databases =
+                    dbName != null
+                            ? Collections.singletonList(dbName)
+                            : 
Collections.singletonList(sourceConfig.getDatabaseList().get(0));
+
+            List<TableId> tableIds = new ArrayList<>();
+            for (String database : databases) {
+                List<TableId> tableIdList =
+                        jdbc.getAllTableIds(database).stream()
+                                .map(PostgresSchemaUtils::toCdcTableId)
+                                .collect(Collectors.toList());
+                tableIds.addAll(tableIdList);
+            }
+            return tableIds;
+        } catch (SQLException e) {
+            throw new RuntimeException("Error to list databases: " + 
e.getMessage(), e);
+        }
+    }
+
+    public static Schema getTableSchema(
+            PostgresSourceConfig sourceConfig, PostgresPartition partition, 
TableId tableId) {
+        try (PostgresConnection jdbc = 
getPostgresDialect(sourceConfig).openJdbcConnection()) {
+            return getTableSchema(partition, tableId, sourceConfig, jdbc);
+        }
+    }
+
+    public static PostgresDialect getPostgresDialect(PostgresSourceConfig 
sourceConfig) {
+        if (postgresDialect == null) { //
+            synchronized (PostgresSchemaUtils.class) {
+                if (postgresDialect == null) { //

Review Comment:
   // should be removed.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema;
+import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.debezium.data.Envelope;
+import io.debezium.data.geometry.Geography;
+import io.debezium.data.geometry.Geometry;
+import io.debezium.util.HexConverter;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** Event deserializer for {@link PostgresDataSource}. */
+@Internal
+public class PostgresEventDeserializer extends 
DebeziumEventDeserializationSchema {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final String SRID = "srid";
+    public static final String HEXEWKB = "hexewkb";
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public PostgresEventDeserializer(DebeziumChangelogMode changelogMode) {
+        super(new PostgresSchemaDataTypeInference(), changelogMode);
+    }
+
+    @Override
+    protected List<SchemaChangeEvent> 
deserializeSchemaChangeRecord(SourceRecord record) {
+        return Collections.emptyList();
+    }
+
+    @Override
+    protected boolean isDataChangeRecord(SourceRecord record) {
+        Schema valueSchema = record.valueSchema();
+        Struct value = (Struct) record.value();
+        return value != null
+                && valueSchema != null
+                && valueSchema.field(Envelope.FieldName.OPERATION) != null
+                && value.getString(Envelope.FieldName.OPERATION) != null;
+    }
+
+    @Override
+    protected boolean isSchemaChangeRecord(SourceRecord record) {
+        return false;
+    }
+
+    @Override
+    protected TableId getTableId(SourceRecord record) {
+        String[] parts = record.topic().split("\\.");
+        return TableId.tableId(parts[1], parts[2]);
+    }
+
+    @Override
+    protected Map<String, String> getMetadata(SourceRecord record) {
+        return Collections.emptyMap();

Review Comment:
   We can support reading `op_ts` metadata like what MySqlEventDeserializer 
does.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSchemaDataTypeInference.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference;
+
+import io.debezium.data.geometry.Geography;
+import io.debezium.data.geometry.Geometry;
+import org.apache.kafka.connect.data.Schema;
+
+/** {@link DataType} inference for MySQL debezium {@link Schema}. */
+@Internal
+public class PostgresSchemaDataTypeInference extends 
DebeziumSchemaDataTypeInference {
+
+    private static final long serialVersionUID = 1L;
+
+    protected DataType inferStruct(Object value, Schema schema) {
+        // the Geometry datatype in MySQL will be converted to

Review Comment:
   MySQL => PostgreSQL.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSchemaDataTypeInference.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference;
+
+import io.debezium.data.geometry.Geography;
+import io.debezium.data.geometry.Geometry;
+import org.apache.kafka.connect.data.Schema;
+
+/** {@link DataType} inference for MySQL debezium {@link Schema}. */

Review Comment:
   MySQL debezium => PostgreSQL debezium.



##########
docs/content/docs/connectors/pipeline-connectors/postgres.md:
##########
@@ -0,0 +1,389 @@
+---
+title: "Postgres"
+weight: 2
+type: docs
+aliases:
+- /connectors/pipeline-connectors/Postgres
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Postgres Connector
+
+Postgres connector allows reading snapshot data and incremental data from 
Postgres database and provides end-to-end full-database data synchronization 
capabilities.
+This document describes how to setup the Postgres connector.

Review Comment:
   Please add a section to illustrate the known limitation that Schema 
Evolution was not support in Postgres Connector now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to