This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 60344ed4e [FLINK-39071] Select fields rather than * to read snapshot data 
in case schema changes. (#4273)
60344ed4e is described below

commit 60344ed4e62d6e6fed7f9d56b44baf2a9597622e
Author: Hongshun Wang <[email protected]>
AuthorDate: Fri Feb 27 19:44:01 2026 +0800

    [FLINK-39071] Select fields rather than * to read snapshot data in case schema 
changes. (#4273)
---
 .../source/fetch/PostgresScanFetchTask.java        |  5 +++
 .../postgres/source/utils/PostgresQueryUtils.java  | 49 +++++++---------------
 2 files changed, 20 insertions(+), 34 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
index f31db75bd..1915aca80 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
@@ -287,12 +287,17 @@ public class PostgresScanFetchTask extends 
AbstractScanFetchTask {
                             .filter(field -> 
table.columnWithName(field).typeName().equals("uuid"))
                             .collect(Collectors.toList());
 
+            List<String> columnNames =
+                    table.columns().stream()
+                            .map(column -> 
jdbcConnection.quotedColumnIdString(column.name()))
+                            .collect(Collectors.toList());
             final String selectSql =
                     PostgresQueryUtils.buildSplitScanQuery(
                             snapshotSplit.getTableId(),
                             snapshotSplit.getSplitKeyType(),
                             snapshotSplit.getSplitStart() == null,
                             snapshotSplit.getSplitEnd() == null,
+                            columnNames,
                             uuidFields);
             LOG.debug(
                     "For split '{}' of table {} using select statement: '{}'",
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java
index 9c0d55847..348344bab 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java
@@ -31,7 +31,6 @@ import java.sql.SQLException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;
 
@@ -156,17 +155,16 @@ public class PostgresQueryUtils {
             boolean isFirstSplit,
             boolean isLastSplit,
             List<String> uuidFields) {
-        return buildSplitQuery(tableId, pkRowType, isFirstSplit, isLastSplit, 
uuidFields, -1, true);
+        return buildSplitScanQuery(tableId, pkRowType, isFirstSplit, 
isLastSplit, null, uuidFields);
     }
 
-    private static String buildSplitQuery(
+    public static String buildSplitScanQuery(
             TableId tableId,
             RowType pkRowType,
             boolean isFirstSplit,
             boolean isLastSplit,
-            List<String> uuidFields,
-            int limitSize,
-            boolean isScanningData) {
+            List<String> columnNames,
+            List<String> uuidFields) {
         final String condition;
 
         if (isFirstSplit && isLastSplit) {
@@ -174,11 +172,9 @@ public class PostgresQueryUtils {
         } else if (isFirstSplit) {
             final StringBuilder sql = new StringBuilder();
             addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ", 
uuidFields);
-            if (isScanningData) {
-                sql.append(" AND NOT (");
-                addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ", 
uuidFields);
-                sql.append(")");
-            }
+            sql.append(" AND NOT (");
+            addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ", uuidFields);
+            sql.append(")");
             condition = sql.toString();
         } else if (isLastSplit) {
             final StringBuilder sql = new StringBuilder();
@@ -187,30 +183,19 @@ public class PostgresQueryUtils {
         } else {
             final StringBuilder sql = new StringBuilder();
             addPrimaryKeyColumnsToCondition(pkRowType, sql, " >= ", 
uuidFields);
-            if (isScanningData) {
-                sql.append(" AND NOT (");
-                addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ", 
uuidFields);
-                sql.append(")");
-            }
+            sql.append(" AND NOT (");
+            addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ", uuidFields);
+            sql.append(")");
             sql.append(" AND ");
             addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ", 
uuidFields);
             condition = sql.toString();
         }
 
-        if (isScanningData) {
-            return buildSelectWithRowLimits(
-                    tableId, limitSize, "*", Optional.ofNullable(condition), 
Optional.empty());
-        } else {
-            final String orderBy =
-                    
pkRowType.getFieldNames().stream().collect(Collectors.joining(", "));
-            return buildSelectWithBoundaryRowLimits(
-                    tableId,
-                    limitSize,
-                    getPrimaryKeyColumnsProjection(pkRowType),
-                    getMaxPrimaryKeyColumnsProjection(pkRowType),
-                    Optional.ofNullable(condition),
-                    orderBy);
-        }
+        return buildSelectWithRowLimits(
+                tableId,
+                columnNames == null ? "*" : String.join(",", columnNames),
+                Optional.ofNullable(condition),
+                Optional.empty());
     }
 
     public static PreparedStatement readTableSplitDataStatement(
@@ -330,7 +315,6 @@ public class PostgresQueryUtils {
 
     private static String buildSelectWithRowLimits(
             TableId tableId,
-            int limit,
             String projection,
             Optional<String> condition,
             Optional<String> orderBy) {
@@ -343,9 +327,6 @@ public class PostgresQueryUtils {
         if (orderBy.isPresent()) {
             sql.append(" ORDER BY ").append(orderBy.get());
         }
-        if (limit > 0) {
-            sql.append(" LIMIT ").append(limit);
-        }
         return sql.toString();
     }
 

Reply via email to