This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 60344ed4e [FLINK-39071] Select fields rather than * to read snapshot data
in case the schema changes. (#4273)
60344ed4e is described below
commit 60344ed4e62d6e6fed7f9d56b44baf2a9597622e
Author: Hongshun Wang <[email protected]>
AuthorDate: Fri Feb 27 19:44:01 2026 +0800
[FLINK-39071] Select fields rather than * to read snapshot data in case the
schema changes. (#4273)
---
.../source/fetch/PostgresScanFetchTask.java | 5 +++
.../postgres/source/utils/PostgresQueryUtils.java | 49 +++++++---------------
2 files changed, 20 insertions(+), 34 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
index f31db75bd..1915aca80 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
@@ -287,12 +287,17 @@ public class PostgresScanFetchTask extends
AbstractScanFetchTask {
.filter(field ->
table.columnWithName(field).typeName().equals("uuid"))
.collect(Collectors.toList());
+ List<String> columnNames =
+ table.columns().stream()
+ .map(column ->
jdbcConnection.quotedColumnIdString(column.name()))
+ .collect(Collectors.toList());
final String selectSql =
PostgresQueryUtils.buildSplitScanQuery(
snapshotSplit.getTableId(),
snapshotSplit.getSplitKeyType(),
snapshotSplit.getSplitStart() == null,
snapshotSplit.getSplitEnd() == null,
+ columnNames,
uuidFields);
LOG.debug(
"For split '{}' of table {} using select statement: '{}'",
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java
index 9c0d55847..348344bab 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java
@@ -31,7 +31,6 @@ import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
-import java.util.stream.Collectors;
import static
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;
@@ -156,17 +155,16 @@ public class PostgresQueryUtils {
boolean isFirstSplit,
boolean isLastSplit,
List<String> uuidFields) {
- return buildSplitQuery(tableId, pkRowType, isFirstSplit, isLastSplit,
uuidFields, -1, true);
+ return buildSplitScanQuery(tableId, pkRowType, isFirstSplit,
isLastSplit, null, uuidFields);
}
- private static String buildSplitQuery(
+ public static String buildSplitScanQuery(
TableId tableId,
RowType pkRowType,
boolean isFirstSplit,
boolean isLastSplit,
- List<String> uuidFields,
- int limitSize,
- boolean isScanningData) {
+ List<String> columnNames,
+ List<String> uuidFields) {
final String condition;
if (isFirstSplit && isLastSplit) {
@@ -174,11 +172,9 @@ public class PostgresQueryUtils {
} else if (isFirstSplit) {
final StringBuilder sql = new StringBuilder();
addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ",
uuidFields);
- if (isScanningData) {
- sql.append(" AND NOT (");
- addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ",
uuidFields);
- sql.append(")");
- }
+ sql.append(" AND NOT (");
+ addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ", uuidFields);
+ sql.append(")");
condition = sql.toString();
} else if (isLastSplit) {
final StringBuilder sql = new StringBuilder();
@@ -187,30 +183,19 @@ public class PostgresQueryUtils {
} else {
final StringBuilder sql = new StringBuilder();
addPrimaryKeyColumnsToCondition(pkRowType, sql, " >= ",
uuidFields);
- if (isScanningData) {
- sql.append(" AND NOT (");
- addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ",
uuidFields);
- sql.append(")");
- }
+ sql.append(" AND NOT (");
+ addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ", uuidFields);
+ sql.append(")");
sql.append(" AND ");
addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ",
uuidFields);
condition = sql.toString();
}
- if (isScanningData) {
- return buildSelectWithRowLimits(
- tableId, limitSize, "*", Optional.ofNullable(condition),
Optional.empty());
- } else {
- final String orderBy =
-
pkRowType.getFieldNames().stream().collect(Collectors.joining(", "));
- return buildSelectWithBoundaryRowLimits(
- tableId,
- limitSize,
- getPrimaryKeyColumnsProjection(pkRowType),
- getMaxPrimaryKeyColumnsProjection(pkRowType),
- Optional.ofNullable(condition),
- orderBy);
- }
+ return buildSelectWithRowLimits(
+ tableId,
+ columnNames == null ? "*" : String.join(",", columnNames),
+ Optional.ofNullable(condition),
+ Optional.empty());
}
public static PreparedStatement readTableSplitDataStatement(
@@ -330,7 +315,6 @@ public class PostgresQueryUtils {
private static String buildSelectWithRowLimits(
TableId tableId,
- int limit,
String projection,
Optional<String> condition,
Optional<String> orderBy) {
@@ -343,9 +327,6 @@ public class PostgresQueryUtils {
if (orderBy.isPresent()) {
sql.append(" ORDER BY ").append(orderBy.get());
}
- if (limit > 0) {
- sql.append(" LIMIT ").append(limit);
- }
return sql.toString();
}