Copilot commented on code in PR #4233:
URL: https://github.com/apache/flink-cdc/pull/4233#discussion_r2745334061
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java:
##########
@@ -31,63 +42,375 @@
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
+import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.io.WKBReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
/** Event deserializer for {@link PostgresDataSource}. */
@Internal
public class PostgresEventDeserializer extends
DebeziumEventDeserializationSchema {
-
+ private static final Logger LOG =
LoggerFactory.getLogger(PostgresEventDeserializer.class);
private static final long serialVersionUID = 1L;
private List<PostgreSQLReadableMetadata> readableMetadataList;
private final boolean includeDatabaseInTableId;
private final String databaseName;
+ private Map<TableId, Schema> schemaMap = new HashMap<>();
+ private final PostgresSourceConfig postgresSourceConfig;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps;
public PostgresEventDeserializer(DebeziumChangelogMode changelogMode) {
- this(changelogMode, new ArrayList<>(), false, null);
+ this(changelogMode, new ArrayList<>(), false, null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList) {
- this(changelogMode, readableMetadataList, false, null);
+ this(changelogMode, readableMetadataList, false, null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList,
boolean includeDatabaseInTableId) {
- this(changelogMode, readableMetadataList, includeDatabaseInTableId,
null);
+ this(changelogMode, readableMetadataList, includeDatabaseInTableId,
null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList,
boolean includeDatabaseInTableId,
- String databaseName) {
+ String databaseName,
+ PostgresSourceConfig postgresSourceConfig,
+ Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps) {
super(new PostgresSchemaDataTypeInference(), changelogMode);
this.readableMetadataList = readableMetadataList;
this.includeDatabaseInTableId = includeDatabaseInTableId;
this.databaseName = databaseName;
+ this.postgresSourceConfig = postgresSourceConfig;
+ this.beforeTableColumnsOidMaps = beforeTableColumnsOidMaps;
}
@Override
protected List<SchemaChangeEvent>
deserializeSchemaChangeRecord(SourceRecord record) {
return Collections.emptyList();
}
+ @Override
+ public List<? extends Event> deserialize(SourceRecord record) throws
Exception {
+ List<Event> result = new ArrayList<>();
+ if (postgresSourceConfig.isIncludeSchemaChanges()) {
+ handleSchemaChange(record, result);
+ }
+ if (isDataChangeRecord(record)) {
+ LOG.trace("Process data change record: {}", record);
+ result.addAll(deserializeDataChangeRecord(record));
+ } else if (isSchemaChangeRecord(record)) {
+ LOG.trace("Process schema change record: {}", record);
+ } else {
+ LOG.trace("Ignored other record: {}", record);
+ return Collections.emptyList();
+ }
+ return result;
+ }
+
+ private void handleSchemaChange(SourceRecord record, List<Event> result) {
+ TableId tableId = getTableId(record);
+ Schema valueSchema = record.valueSchema();
+ Schema beforeSchema = schemaMap.get(tableId);
+ List<String> beforeColumnNames;
+ List<String> afterColumnNames;
+ Schema afterSchema = fieldSchema(valueSchema,
Envelope.FieldName.AFTER);
+ List<Field> afterFields = afterSchema.fields();
+ org.apache.flink.cdc.common.schema.Schema schema =
+ PostgresSchemaUtils.getTableSchema(postgresSourceConfig,
tableId);
+ List<Column> columns = schema.getColumns();
+ Map<String, Integer> beforeColumnsOidMaps =
+
beforeTableColumnsOidMaps.get(TableId.tableId(tableId.getTableName()));
+ // When the first piece of data arrives, beforeSchema is empty
+ if (beforeSchema != null) {
+ beforeColumnNames =
+ beforeSchema.fields().stream().map(e ->
e.name()).collect(Collectors.toList());
+ afterColumnNames = afterFields.stream().map(e ->
e.name()).collect(Collectors.toList());
+ List<String> newAddColumnNames =
findAddedElements(beforeColumnNames, afterColumnNames);
+ List<String> newDelColumnNames =
+ findRemovedElements(beforeColumnNames, afterColumnNames);
+ Map<String, String> renameColumnMaps = new HashMap<>();
+ // Process the fields of rename
+ if (!newAddColumnNames.isEmpty() && !newDelColumnNames.isEmpty()) {
+ renameColumnMaps =
+ getRenameColumnMaps(
+ tableId,
+ newAddColumnNames,
+ newDelColumnNames,
+ beforeSchema,
+ afterSchema,
+ beforeColumnsOidMaps);
+ if (!renameColumnMaps.isEmpty()) {
+ result.add(new RenameColumnEvent(tableId,
renameColumnMaps));
+ Map<String, String> finalRenameColumnMaps1 =
renameColumnMaps;
+ renameColumnMaps
+ .keySet()
+ .forEach(
+ e -> {
+ beforeColumnsOidMaps.put(
+ finalRenameColumnMaps1.get(e),
+ beforeColumnsOidMaps.get(e));
+ });
+ }
+ newAddColumnNames.removeAll(renameColumnMaps.values());
+ newDelColumnNames.removeAll(renameColumnMaps.keySet());
+ }
+ // Process the fields of add
+ if (!newAddColumnNames.isEmpty()) {
+ newAddColumns(
+ tableId,
+ afterSchema,
+ result,
+ columns,
+ newAddColumnNames,
+ beforeColumnsOidMaps);
+ }
+ // Process the fields of delete
+ if (!newDelColumnNames.isEmpty()) {
+ newDelColumns(
+ tableId,
+ beforeColumnNames,
+ result,
+ newDelColumnNames,
+ beforeColumnsOidMaps);
+ }
+ // Handling fields with changed types
+ findModifiedTypeColumns(
+ tableId,
+ beforeSchema,
+ beforeColumnNames,
+ afterSchema,
+ result,
+ columns,
+ renameColumnMaps);
+ }
+ schemaMap.put(tableId, afterSchema);
+ }
+
+ private void newDelColumns(
+ TableId tableId,
+ List<String> beforeColumnNames,
+ List<Event> result,
+ List<String> newDelColumnNames,
+ Map<String, Integer> beforeColumnsOidMaps) {
+ newDelColumnNames.forEach(
+ e -> {
+ result.add(new DropColumnEvent(tableId,
Collections.singletonList(e)));
+ beforeColumnNames.removeAll(newDelColumnNames);
+ beforeColumnsOidMaps.remove(e);
+ });
+ }
+
+ private void newAddColumns(
+ TableId tableId,
+ Schema afterSchema,
+ List<Event> result,
+ List<Column> columns,
+ List<String> newAddColumnNames,
+ Map<String, Integer> beforeColumnsOidMaps) {
+ List<Column> newAddColumns =
+ columns.stream()
+ .filter(e -> newAddColumnNames.contains(e.getName()))
+ .collect(Collectors.toList());
+ if (newAddColumns.size() != newAddColumnNames.size()) {
+ List<String> notInCurrentTableColumns =
+ newAddColumnNames.stream()
+ .filter(e -> !newAddColumns.contains(e))
+ .collect(Collectors.toList());
+ notInCurrentTableColumns.forEach(
+ e -> {
+ Field field = afterSchema.field(e);
+ DataType dataType =
convertKafkaTypeToFlintType(field.schema());
+ PhysicalColumn column =
+ new PhysicalColumn(e, dataType,
field.schema().doc());
+ result.add(
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new
AddColumnEvent.ColumnWithPosition(
+ column,
+
AddColumnEvent.ColumnPosition.LAST,
+ null))));
+ });
+ }
+ newAddColumns.forEach(
+ e -> {
+ SchemaChangeEvent event =
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new
AddColumnEvent.ColumnWithPosition(
+ e,
AddColumnEvent.ColumnPosition.LAST, null)));
+ result.add(event);
+ });
+ Map<String, Integer> newAddColumnsOidMaps =
+ PostgresSchemaUtils.getColumnOids(postgresSourceConfig,
tableId, newAddColumnNames);
+ newAddColumnNames.forEach(
+ e -> {
+ beforeColumnsOidMaps.put(e,
newAddColumnsOidMaps.getOrDefault(e, null));
+ });
+ }
+
+ private static DataType convertKafkaTypeToFlintType(Schema kafkaSchema) {
+ switch (kafkaSchema.type()) {
+ case STRING:
+ return DataTypes.STRING();
+ case INT8:
+ return DataTypes.TINYINT();
+ case INT16:
+ return DataTypes.SMALLINT();
+ case INT32:
+ return DataTypes.INT();
+ case INT64:
+ return DataTypes.BIGINT();
+ case FLOAT32:
+ return DataTypes.FLOAT();
+ case FLOAT64:
+ return DataTypes.DOUBLE();
+ case BOOLEAN:
+ return DataTypes.BOOLEAN();
+ case BYTES:
+ return DataTypes.BYTES();
+ case ARRAY:
+ Schema elementSchema = kafkaSchema.valueSchema();
+ DataType elementType =
convertKafkaTypeToFlintType(elementSchema);
+ return DataTypes.ARRAY(elementType);
+ case STRUCT:
+ Schema schema = kafkaSchema.valueSchema();
+ DataType dataType = convertKafkaTypeToFlintType(schema);
+ return DataTypes.ROW(dataType);
+ case MAP:
+ return DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING());
+ default:
+ return DataTypes.STRING();
+ }
+ }
+
+ private void findModifiedTypeColumns(
+ TableId tableId,
+ Schema oldSchema,
+ List<String> oldColumnNames,
+ Schema afterSchema,
+ List<Event> result,
+ List<Column> columns,
+ Map<String, String> renameColumnMaps) {
+ Map<String, String> finalRenameColumnMaps = renameColumnMaps;
+ oldColumnNames.stream()
+ .forEach(
+ oldFieldName -> {
+ Field oldField = oldSchema.field(oldFieldName);
+ Field afterField = afterSchema.field(oldFieldName);
+ if (afterField == null) {
+ afterField =
+
afterSchema.field(finalRenameColumnMaps.get(oldFieldName));
+ }
+ String afterFieldName = afterField.name();
+ if (!oldField.schema()
+ .type()
+ .getName()
+
.equals(afterField.schema().type().getName())
+ || (oldField.schema().defaultValue() !=
null
+ &&
afterField.schema().defaultValue() != null
+ && !oldField.schema()
+ .defaultValue()
+
.equals(afterField.schema().defaultValue()))
+ || (oldField.schema().parameters() != null
+ &&
afterField.schema().parameters() != null
+ && !oldField.schema()
+ .parameters()
+
.equals(afterField.schema().parameters()))) {
+ Map<String, DataType> typeMapping = new
HashMap<>();
+ Column column =
+ columns.stream()
+ .filter(e ->
e.getName().equals(afterFieldName))
+ .findFirst()
+ .get();
+ typeMapping.put(afterField.name(),
column.getType());
+ result.add(new AlterColumnTypeEvent(tableId,
typeMapping));
+ }
+ });
+ }
+
+ private Map<String, String> getRenameColumnMaps(
+ TableId tableId,
+ List<String> newAddColumnNames,
+ List<String> newDelColumnNames,
+ Schema oldSchema,
+ Schema afterSchema,
+ Map<String, Integer> beforeColumnsOidMaps) {
+ Map<String, String> renameColumnMaps = new HashMap<>();
+ Map<String, Integer> newAddColumnsOidMaps =
+ PostgresSchemaUtils.getColumnOids(postgresSourceConfig,
tableId, newAddColumnNames);
+ Map<String, Integer> newDelColumnsOidMaps = new HashMap<>();
+
+ newDelColumnNames.forEach(
+ e -> {
+ if (beforeColumnsOidMaps.keySet().contains(e)) {
+ newDelColumnsOidMaps.put(e,
beforeColumnsOidMaps.get(e));
+ } else {
+ newDelColumnsOidMaps.put(e, null);
+ }
+ });
+
+ newAddColumnNames.forEach(
+ e -> {
+ if (!newAddColumnsOidMaps.keySet().contains(e)) {
+ newAddColumnsOidMaps.put(e, null);
+ }
+ });
+ for (Map.Entry<String, Integer> newDelEntry :
newDelColumnsOidMaps.entrySet()) {
+ for (Map.Entry<String, Integer> newAddEntry :
newAddColumnsOidMaps.entrySet()) {
+ if ((newDelEntry.getValue() == null && newAddEntry.getValue()
== null)
+ || (newDelEntry.getValue() == null &&
newAddEntry.getValue() != null)) {
+ int oldFieldIndex =
oldSchema.field(newDelEntry.getKey()).index();
+ int afterFieldIndex =
afterSchema.field(newAddEntry.getKey()).index();
+ if (oldFieldIndex == afterFieldIndex) {
+ renameColumnMaps.put(newDelEntry.getKey(),
newAddEntry.getKey());
+ }
Review Comment:
Logic issue in rename detection: The condition at lines 385-386 compares null
values, but the logic is problematic. When newDelEntry.getValue() is null and
newAddEntry.getValue() is not null, the pair is still treated as a potential
rename. However, this is counterintuitive: if the deleted column has no
recorded OID (null) while the added column has one, they are likely different
columns, not a rename. The condition should be reconsidered to ensure accurate
rename detection.
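A minimal standalone sketch of a stricter matching rule, with hypothetical maps standing in for the OID and field-index lookups shown in the diff; this is an assumption about how the check could be tightened, not the PR's implementation:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

public class RenameDetectionSketch {

    /**
     * Hypothetical helper, not the connector's API: a delete/add pair is reported as a
     * rename only when both sides share the same recorded OID, or when neither side has
     * an OID and their field positions match. The index maps stand in for
     * oldSchema.field(...).index() / afterSchema.field(...).index() from the diff.
     */
    static Map<String, String> detectRenames(
            Map<String, Integer> deletedColumnOids,
            Map<String, Integer> addedColumnOids,
            Map<String, Integer> deletedColumnIndexes,
            Map<String, Integer> addedColumnIndexes) {
        Map<String, String> renames = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> del : deletedColumnOids.entrySet()) {
            for (Map.Entry<String, Integer> add : addedColumnOids.entrySet()) {
                boolean sameOid =
                        del.getValue() != null && Objects.equals(del.getValue(), add.getValue());
                boolean bothUnknownSamePosition =
                        del.getValue() == null
                                && add.getValue() == null
                                && Objects.equals(
                                        deletedColumnIndexes.get(del.getKey()),
                                        addedColumnIndexes.get(add.getKey()));
                if (sameOid || bothUnknownSamePosition) {
                    renames.put(del.getKey(), add.getKey());
                }
            }
        }
        return renames;
    }

    public static void main(String[] args) {
        Map<String, Integer> delOids = new HashMap<>();
        delOids.put("old_name", 3); // a renamed column keeps its attnum
        Map<String, Integer> addOids = new HashMap<>();
        addOids.put("new_name", 3);
        addOids.put("brand_new", 7); // genuinely new column, different attnum
        Map<String, Integer> delIdx = new HashMap<>();
        delIdx.put("old_name", 2);
        Map<String, Integer> addIdx = new HashMap<>();
        addIdx.put("new_name", 2);
        addIdx.put("brand_new", 5);
        // Prints {old_name=new_name}; brand_new is not misreported as a rename.
        System.out.println(detectRenames(delOids, addOids, delIdx, addIdx));
    }
}
```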
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java:
##########
@@ -31,63 +42,375 @@
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
+import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.io.WKBReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
/** Event deserializer for {@link PostgresDataSource}. */
@Internal
public class PostgresEventDeserializer extends
DebeziumEventDeserializationSchema {
-
+ private static final Logger LOG =
LoggerFactory.getLogger(PostgresEventDeserializer.class);
private static final long serialVersionUID = 1L;
private List<PostgreSQLReadableMetadata> readableMetadataList;
private final boolean includeDatabaseInTableId;
private final String databaseName;
+ private Map<TableId, Schema> schemaMap = new HashMap<>();
+ private final PostgresSourceConfig postgresSourceConfig;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps;
public PostgresEventDeserializer(DebeziumChangelogMode changelogMode) {
- this(changelogMode, new ArrayList<>(), false, null);
+ this(changelogMode, new ArrayList<>(), false, null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList) {
- this(changelogMode, readableMetadataList, false, null);
+ this(changelogMode, readableMetadataList, false, null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList,
boolean includeDatabaseInTableId) {
- this(changelogMode, readableMetadataList, includeDatabaseInTableId,
null);
+ this(changelogMode, readableMetadataList, includeDatabaseInTableId,
null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList,
boolean includeDatabaseInTableId,
- String databaseName) {
+ String databaseName,
+ PostgresSourceConfig postgresSourceConfig,
+ Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps) {
super(new PostgresSchemaDataTypeInference(), changelogMode);
this.readableMetadataList = readableMetadataList;
this.includeDatabaseInTableId = includeDatabaseInTableId;
this.databaseName = databaseName;
+ this.postgresSourceConfig = postgresSourceConfig;
+ this.beforeTableColumnsOidMaps = beforeTableColumnsOidMaps;
}
@Override
protected List<SchemaChangeEvent>
deserializeSchemaChangeRecord(SourceRecord record) {
return Collections.emptyList();
}
+ @Override
+ public List<? extends Event> deserialize(SourceRecord record) throws
Exception {
+ List<Event> result = new ArrayList<>();
+ if (postgresSourceConfig.isIncludeSchemaChanges()) {
+ handleSchemaChange(record, result);
+ }
+ if (isDataChangeRecord(record)) {
+ LOG.trace("Process data change record: {}", record);
+ result.addAll(deserializeDataChangeRecord(record));
+ } else if (isSchemaChangeRecord(record)) {
+ LOG.trace("Process schema change record: {}", record);
+ } else {
+ LOG.trace("Ignored other record: {}", record);
+ return Collections.emptyList();
+ }
+ return result;
+ }
+
+ private void handleSchemaChange(SourceRecord record, List<Event> result) {
+ TableId tableId = getTableId(record);
+ Schema valueSchema = record.valueSchema();
+ Schema beforeSchema = schemaMap.get(tableId);
+ List<String> beforeColumnNames;
+ List<String> afterColumnNames;
+ Schema afterSchema = fieldSchema(valueSchema,
Envelope.FieldName.AFTER);
+ List<Field> afterFields = afterSchema.fields();
+ org.apache.flink.cdc.common.schema.Schema schema =
+ PostgresSchemaUtils.getTableSchema(postgresSourceConfig,
tableId);
+ List<Column> columns = schema.getColumns();
+ Map<String, Integer> beforeColumnsOidMaps =
+
beforeTableColumnsOidMaps.get(TableId.tableId(tableId.getTableName()));
+ // When the first piece of data arrives, beforeSchema is empty
+ if (beforeSchema != null) {
+ beforeColumnNames =
+ beforeSchema.fields().stream().map(e ->
e.name()).collect(Collectors.toList());
+ afterColumnNames = afterFields.stream().map(e ->
e.name()).collect(Collectors.toList());
+ List<String> newAddColumnNames =
findAddedElements(beforeColumnNames, afterColumnNames);
+ List<String> newDelColumnNames =
+ findRemovedElements(beforeColumnNames, afterColumnNames);
+ Map<String, String> renameColumnMaps = new HashMap<>();
+ // Process the fields of rename
+ if (!newAddColumnNames.isEmpty() && !newDelColumnNames.isEmpty()) {
+ renameColumnMaps =
+ getRenameColumnMaps(
+ tableId,
+ newAddColumnNames,
+ newDelColumnNames,
+ beforeSchema,
+ afterSchema,
+ beforeColumnsOidMaps);
+ if (!renameColumnMaps.isEmpty()) {
+ result.add(new RenameColumnEvent(tableId,
renameColumnMaps));
+ Map<String, String> finalRenameColumnMaps1 =
renameColumnMaps;
+ renameColumnMaps
+ .keySet()
+ .forEach(
+ e -> {
+ beforeColumnsOidMaps.put(
+ finalRenameColumnMaps1.get(e),
+ beforeColumnsOidMaps.get(e));
+ });
+ }
+ newAddColumnNames.removeAll(renameColumnMaps.values());
+ newDelColumnNames.removeAll(renameColumnMaps.keySet());
+ }
+ // Process the fields of add
+ if (!newAddColumnNames.isEmpty()) {
+ newAddColumns(
+ tableId,
+ afterSchema,
+ result,
+ columns,
+ newAddColumnNames,
+ beforeColumnsOidMaps);
+ }
+ // Process the fields of delete
+ if (!newDelColumnNames.isEmpty()) {
+ newDelColumns(
+ tableId,
+ beforeColumnNames,
+ result,
+ newDelColumnNames,
+ beforeColumnsOidMaps);
+ }
+ // Handling fields with changed types
+ findModifiedTypeColumns(
+ tableId,
+ beforeSchema,
+ beforeColumnNames,
+ afterSchema,
+ result,
+ columns,
+ renameColumnMaps);
+ }
+ schemaMap.put(tableId, afterSchema);
+ }
+
+ private void newDelColumns(
+ TableId tableId,
+ List<String> beforeColumnNames,
+ List<Event> result,
+ List<String> newDelColumnNames,
+ Map<String, Integer> beforeColumnsOidMaps) {
+ newDelColumnNames.forEach(
+ e -> {
+ result.add(new DropColumnEvent(tableId,
Collections.singletonList(e)));
+ beforeColumnNames.removeAll(newDelColumnNames);
+ beforeColumnsOidMaps.remove(e);
+ });
+ }
+
+ private void newAddColumns(
+ TableId tableId,
+ Schema afterSchema,
+ List<Event> result,
+ List<Column> columns,
+ List<String> newAddColumnNames,
+ Map<String, Integer> beforeColumnsOidMaps) {
+ List<Column> newAddColumns =
+ columns.stream()
+ .filter(e -> newAddColumnNames.contains(e.getName()))
+ .collect(Collectors.toList());
+ if (newAddColumns.size() != newAddColumnNames.size()) {
+ List<String> notInCurrentTableColumns =
+ newAddColumnNames.stream()
+ .filter(e -> !newAddColumns.contains(e))
+ .collect(Collectors.toList());
+ notInCurrentTableColumns.forEach(
+ e -> {
+ Field field = afterSchema.field(e);
+ DataType dataType =
convertKafkaTypeToFlintType(field.schema());
+ PhysicalColumn column =
+ new PhysicalColumn(e, dataType,
field.schema().doc());
+ result.add(
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new
AddColumnEvent.ColumnWithPosition(
+ column,
+
AddColumnEvent.ColumnPosition.LAST,
+ null))));
+ });
+ }
+ newAddColumns.forEach(
+ e -> {
+ SchemaChangeEvent event =
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new
AddColumnEvent.ColumnWithPosition(
+ e,
AddColumnEvent.ColumnPosition.LAST, null)));
+ result.add(event);
+ });
+ Map<String, Integer> newAddColumnsOidMaps =
+ PostgresSchemaUtils.getColumnOids(postgresSourceConfig,
tableId, newAddColumnNames);
+ newAddColumnNames.forEach(
+ e -> {
+ beforeColumnsOidMaps.put(e,
newAddColumnsOidMaps.getOrDefault(e, null));
+ });
+ }
+
+ private static DataType convertKafkaTypeToFlintType(Schema kafkaSchema) {
+ switch (kafkaSchema.type()) {
+ case STRING:
+ return DataTypes.STRING();
+ case INT8:
+ return DataTypes.TINYINT();
+ case INT16:
+ return DataTypes.SMALLINT();
+ case INT32:
+ return DataTypes.INT();
+ case INT64:
+ return DataTypes.BIGINT();
+ case FLOAT32:
+ return DataTypes.FLOAT();
+ case FLOAT64:
+ return DataTypes.DOUBLE();
+ case BOOLEAN:
+ return DataTypes.BOOLEAN();
+ case BYTES:
+ return DataTypes.BYTES();
+ case ARRAY:
+ Schema elementSchema = kafkaSchema.valueSchema();
+ DataType elementType =
convertKafkaTypeToFlintType(elementSchema);
+ return DataTypes.ARRAY(elementType);
+ case STRUCT:
+ Schema schema = kafkaSchema.valueSchema();
+ DataType dataType = convertKafkaTypeToFlintType(schema);
+ return DataTypes.ROW(dataType);
+ case MAP:
+ return DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING());
+ default:
+ return DataTypes.STRING();
+ }
+ }
+
+ private void findModifiedTypeColumns(
+ TableId tableId,
+ Schema oldSchema,
+ List<String> oldColumnNames,
+ Schema afterSchema,
+ List<Event> result,
+ List<Column> columns,
+ Map<String, String> renameColumnMaps) {
+ Map<String, String> finalRenameColumnMaps = renameColumnMaps;
+ oldColumnNames.stream()
+ .forEach(
+ oldFieldName -> {
+ Field oldField = oldSchema.field(oldFieldName);
+ Field afterField = afterSchema.field(oldFieldName);
+ if (afterField == null) {
+ afterField =
+
afterSchema.field(finalRenameColumnMaps.get(oldFieldName));
+ }
+ String afterFieldName = afterField.name();
+ if (!oldField.schema()
+ .type()
+ .getName()
+
.equals(afterField.schema().type().getName())
+ || (oldField.schema().defaultValue() !=
null
+ &&
afterField.schema().defaultValue() != null
+ && !oldField.schema()
+ .defaultValue()
+
.equals(afterField.schema().defaultValue()))
+ || (oldField.schema().parameters() != null
+ &&
afterField.schema().parameters() != null
+ && !oldField.schema()
+ .parameters()
+
.equals(afterField.schema().parameters()))) {
+ Map<String, DataType> typeMapping = new
HashMap<>();
+ Column column =
+ columns.stream()
+ .filter(e ->
e.getName().equals(afterFieldName))
+ .findFirst()
+ .get();
Review Comment:
NoSuchElementException risk: The code uses .get() on an Optional at line 349
without checking if the element is present. If no column matches the filter
condition (e.getName().equals(afterFieldName)), this will throw a
NoSuchElementException. Use .orElseThrow() with a descriptive error message or
handle the absent case appropriately.
```suggestion
                                                .orElseThrow(
                                                        () ->
                                                                new IllegalStateException(
                                                                        "Failed to find column with name '"
                                                                                + afterFieldName
                                                                                + "' in table "
                                                                                + tableId));
```
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java:
##########
@@ -31,63 +42,375 @@
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
+import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.io.WKBReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
/** Event deserializer for {@link PostgresDataSource}. */
@Internal
public class PostgresEventDeserializer extends
DebeziumEventDeserializationSchema {
-
+ private static final Logger LOG =
LoggerFactory.getLogger(PostgresEventDeserializer.class);
private static final long serialVersionUID = 1L;
private List<PostgreSQLReadableMetadata> readableMetadataList;
private final boolean includeDatabaseInTableId;
private final String databaseName;
+ private Map<TableId, Schema> schemaMap = new HashMap<>();
+ private final PostgresSourceConfig postgresSourceConfig;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps;
Review Comment:
Concurrency issue: The schemaMap and beforeTableColumnsOidMaps fields are
mutable non-final HashMap instances that are accessed and modified during
deserialization. If the deserializer is used in a multi-threaded context (which
is common in Flink), these maps are not thread-safe and can lead to race
conditions, data corruption, or ConcurrentModificationException. Consider using
ConcurrentHashMap or adding proper synchronization.
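If the deserializer really is shared across threads, one illustrative direction is to back the state with concurrent maps; the field names below mirror the diff, but the String keys and the sharing assumption are simplifications:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ThreadSafeStateSketch {
    // Stand-ins for the deserializer's schemaMap and beforeTableColumnsOidMaps fields;
    // ConcurrentHashMap tolerates concurrent reads and writes without external locking.
    private final Map<String, Object> schemaMap = new ConcurrentHashMap<>();
    private final Map<String, Map<String, Integer>> beforeTableColumnsOidMaps =
            new ConcurrentHashMap<>();

    Map<String, Integer> columnOidsFor(String tableName) {
        // computeIfAbsent is atomic on ConcurrentHashMap, so two threads racing on the
        // same table still end up sharing a single inner map.
        return beforeTableColumnsOidMaps.computeIfAbsent(
                tableName, t -> new ConcurrentHashMap<>());
    }

    void rememberSchema(String tableName, Object afterSchema) {
        schemaMap.put(tableName, afterSchema);
    }
}
```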
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java:
##########
@@ -31,63 +42,375 @@
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
+import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.io.WKBReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
/** Event deserializer for {@link PostgresDataSource}. */
@Internal
public class PostgresEventDeserializer extends
DebeziumEventDeserializationSchema {
-
+ private static final Logger LOG =
LoggerFactory.getLogger(PostgresEventDeserializer.class);
private static final long serialVersionUID = 1L;
private List<PostgreSQLReadableMetadata> readableMetadataList;
private final boolean includeDatabaseInTableId;
private final String databaseName;
+ private Map<TableId, Schema> schemaMap = new HashMap<>();
+ private final PostgresSourceConfig postgresSourceConfig;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps;
public PostgresEventDeserializer(DebeziumChangelogMode changelogMode) {
- this(changelogMode, new ArrayList<>(), false, null);
+ this(changelogMode, new ArrayList<>(), false, null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList) {
- this(changelogMode, readableMetadataList, false, null);
+ this(changelogMode, readableMetadataList, false, null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList,
boolean includeDatabaseInTableId) {
- this(changelogMode, readableMetadataList, includeDatabaseInTableId,
null);
+ this(changelogMode, readableMetadataList, includeDatabaseInTableId,
null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList,
boolean includeDatabaseInTableId,
- String databaseName) {
+ String databaseName,
+ PostgresSourceConfig postgresSourceConfig,
+ Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps) {
super(new PostgresSchemaDataTypeInference(), changelogMode);
this.readableMetadataList = readableMetadataList;
this.includeDatabaseInTableId = includeDatabaseInTableId;
this.databaseName = databaseName;
+ this.postgresSourceConfig = postgresSourceConfig;
+ this.beforeTableColumnsOidMaps = beforeTableColumnsOidMaps;
}
@Override
protected List<SchemaChangeEvent>
deserializeSchemaChangeRecord(SourceRecord record) {
return Collections.emptyList();
}
+ @Override
+ public List<? extends Event> deserialize(SourceRecord record) throws
Exception {
+ List<Event> result = new ArrayList<>();
+ if (postgresSourceConfig.isIncludeSchemaChanges()) {
+ handleSchemaChange(record, result);
+ }
+ if (isDataChangeRecord(record)) {
+ LOG.trace("Process data change record: {}", record);
+ result.addAll(deserializeDataChangeRecord(record));
+ } else if (isSchemaChangeRecord(record)) {
+ LOG.trace("Process schema change record: {}", record);
+ } else {
+ LOG.trace("Ignored other record: {}", record);
+ return Collections.emptyList();
+ }
+ return result;
+ }
+
+ private void handleSchemaChange(SourceRecord record, List<Event> result) {
+ TableId tableId = getTableId(record);
+ Schema valueSchema = record.valueSchema();
+ Schema beforeSchema = schemaMap.get(tableId);
+ List<String> beforeColumnNames;
+ List<String> afterColumnNames;
+ Schema afterSchema = fieldSchema(valueSchema,
Envelope.FieldName.AFTER);
+ List<Field> afterFields = afterSchema.fields();
+ org.apache.flink.cdc.common.schema.Schema schema =
+ PostgresSchemaUtils.getTableSchema(postgresSourceConfig,
tableId);
+ List<Column> columns = schema.getColumns();
+ Map<String, Integer> beforeColumnsOidMaps =
+
beforeTableColumnsOidMaps.get(TableId.tableId(tableId.getTableName()));
+ // When the first piece of data arrives, beforeSchema is empty
+ if (beforeSchema != null) {
+ beforeColumnNames =
+ beforeSchema.fields().stream().map(e ->
e.name()).collect(Collectors.toList());
+ afterColumnNames = afterFields.stream().map(e ->
e.name()).collect(Collectors.toList());
+ List<String> newAddColumnNames =
findAddedElements(beforeColumnNames, afterColumnNames);
+ List<String> newDelColumnNames =
+ findRemovedElements(beforeColumnNames, afterColumnNames);
+ Map<String, String> renameColumnMaps = new HashMap<>();
+ // Process the fields of rename
+ if (!newAddColumnNames.isEmpty() && !newDelColumnNames.isEmpty()) {
+ renameColumnMaps =
+ getRenameColumnMaps(
+ tableId,
+ newAddColumnNames,
+ newDelColumnNames,
+ beforeSchema,
+ afterSchema,
+ beforeColumnsOidMaps);
+ if (!renameColumnMaps.isEmpty()) {
+ result.add(new RenameColumnEvent(tableId,
renameColumnMaps));
+ Map<String, String> finalRenameColumnMaps1 =
renameColumnMaps;
+ renameColumnMaps
+ .keySet()
+ .forEach(
+ e -> {
+ beforeColumnsOidMaps.put(
+ finalRenameColumnMaps1.get(e),
+ beforeColumnsOidMaps.get(e));
+ });
+ }
+ newAddColumnNames.removeAll(renameColumnMaps.values());
+ newDelColumnNames.removeAll(renameColumnMaps.keySet());
+ }
+ // Process the fields of add
+ if (!newAddColumnNames.isEmpty()) {
+ newAddColumns(
+ tableId,
+ afterSchema,
+ result,
+ columns,
+ newAddColumnNames,
+ beforeColumnsOidMaps);
+ }
+ // Process the fields of delete
+ if (!newDelColumnNames.isEmpty()) {
+ newDelColumns(
+ tableId,
+ beforeColumnNames,
+ result,
+ newDelColumnNames,
+ beforeColumnsOidMaps);
+ }
+ // Handling fields with changed types
+ findModifiedTypeColumns(
+ tableId,
+ beforeSchema,
+ beforeColumnNames,
+ afterSchema,
+ result,
+ columns,
+ renameColumnMaps);
+ }
+ schemaMap.put(tableId, afterSchema);
+ }
+
+ private void newDelColumns(
+ TableId tableId,
+ List<String> beforeColumnNames,
+ List<Event> result,
+ List<String> newDelColumnNames,
+ Map<String, Integer> beforeColumnsOidMaps) {
+ newDelColumnNames.forEach(
+ e -> {
+ result.add(new DropColumnEvent(tableId,
Collections.singletonList(e)));
+ beforeColumnNames.removeAll(newDelColumnNames);
+ beforeColumnsOidMaps.remove(e);
+ });
+ }
+
+ private void newAddColumns(
+ TableId tableId,
+ Schema afterSchema,
+ List<Event> result,
+ List<Column> columns,
+ List<String> newAddColumnNames,
+ Map<String, Integer> beforeColumnsOidMaps) {
+ List<Column> newAddColumns =
+ columns.stream()
+ .filter(e -> newAddColumnNames.contains(e.getName()))
+ .collect(Collectors.toList());
+ if (newAddColumns.size() != newAddColumnNames.size()) {
+ List<String> notInCurrentTableColumns =
+ newAddColumnNames.stream()
+ .filter(e -> !newAddColumns.contains(e))
Review Comment:
Logic error in filtering: Line 240 uses newAddColumns.contains(e) where 'e'
is a String (from newAddColumnNames), but newAddColumns is a List of Column
objects. The contains() method will always return false because it compares
String to Column objects. This should filter based on column names instead.
Use:
!newAddColumns.stream().map(Column::getName).collect(Collectors.toList()).contains(e)
```suggestion
        List<String> newAddColumnNamesInCurrentTable =
                newAddColumns.stream()
                        .map(Column::getName)
                        .collect(Collectors.toList());
        if (newAddColumns.size() != newAddColumnNames.size()) {
            List<String> notInCurrentTableColumns =
                    newAddColumnNames.stream()
                            .filter(e -> !newAddColumnNamesInCurrentTable.contains(e))
```
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java:
##########
@@ -100,6 +102,64 @@ public static Schema getTableSchema(PostgresSourceConfig
sourceConfig, TableId t
}
}
+ public static Map<String, Integer> getColumnOids(
+ PostgresSourceConfig sourceConfig, TableId tableId, List<String>
columns) {
+ try (PostgresConnection jdbc =
getPostgresDialect(sourceConfig).openJdbcConnection()) {
+ Map<String, Integer> oidMaps = new HashMap<>();
+ String inClause =
+ columns.stream()
+ .map(column -> "'" + column.replace("'", "''") +
"'")
+ .collect(Collectors.joining(",", "in (", ")"));
+ try {
+ jdbc.query(
+ "SELECT a.attname attname,a.attnum as oid \n"
+ + "FROM pg_attribute a\n"
+ + "JOIN pg_class b ON a.attrelid = b.oid\n"
+ + "WHERE b.relname = '"
+ + tableId.getTableName()
+ + "' AND a.attname "
+ + inClause,
+ rs -> {
+ while (rs.next()) {
+ oidMaps.put(rs.getString(1), rs.getInt(2));
+ }
+ });
+ } catch (SQLException e) {
+ throw new FlinkRuntimeException(e);
+ }
+ return oidMaps;
+ }
+ }
+
+ public static Map<TableId, Map<String, Integer>> getAllTablesColumnOids(
+ PostgresSourceConfig sourceConfig, List<String> tableList) {
+ try (PostgresConnection jdbc =
getPostgresDialect(sourceConfig).openJdbcConnection()) {
+ Map<TableId, Map<String, Integer>> tableOidMaps = new HashMap<>();
+ String inClause =
+ tableList.stream()
+ .map(table -> "'" +
table.split("\\.")[1].replace("'", "''") + "'")
Review Comment:
Potential ArrayIndexOutOfBoundsException: The code assumes that
table.split("\\.") will always have at least 2 elements (accessing index [1]).
If a table name doesn't contain a dot, this will throw an exception. Add
validation to ensure the split result has the expected number of elements
before accessing index 1.
```suggestion
                        .map(
                                table -> {
                                    String[] parts = table.split("\\.");
                                    String tableName =
                                            parts.length > 1 ? parts[1] : parts[0];
                                    return "'" + tableName.replace("'", "''") + "'";
                                })
```
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java:
##########
@@ -31,63 +42,375 @@
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
+import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.io.WKBReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
/** Event deserializer for {@link PostgresDataSource}. */
@Internal
public class PostgresEventDeserializer extends
DebeziumEventDeserializationSchema {
-
+ private static final Logger LOG =
LoggerFactory.getLogger(PostgresEventDeserializer.class);
private static final long serialVersionUID = 1L;
private List<PostgreSQLReadableMetadata> readableMetadataList;
private final boolean includeDatabaseInTableId;
private final String databaseName;
+ private Map<TableId, Schema> schemaMap = new HashMap<>();
+ private final PostgresSourceConfig postgresSourceConfig;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps;
public PostgresEventDeserializer(DebeziumChangelogMode changelogMode) {
- this(changelogMode, new ArrayList<>(), false, null);
+ this(changelogMode, new ArrayList<>(), false, null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList) {
- this(changelogMode, readableMetadataList, false, null);
+ this(changelogMode, readableMetadataList, false, null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList,
boolean includeDatabaseInTableId) {
- this(changelogMode, readableMetadataList, includeDatabaseInTableId,
null);
+ this(changelogMode, readableMetadataList, includeDatabaseInTableId,
null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList,
boolean includeDatabaseInTableId,
- String databaseName) {
+ String databaseName,
+ PostgresSourceConfig postgresSourceConfig,
+ Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps) {
super(new PostgresSchemaDataTypeInference(), changelogMode);
this.readableMetadataList = readableMetadataList;
this.includeDatabaseInTableId = includeDatabaseInTableId;
this.databaseName = databaseName;
+ this.postgresSourceConfig = postgresSourceConfig;
+ this.beforeTableColumnsOidMaps = beforeTableColumnsOidMaps;
}
@Override
protected List<SchemaChangeEvent>
deserializeSchemaChangeRecord(SourceRecord record) {
return Collections.emptyList();
}
+ @Override
+ public List<? extends Event> deserialize(SourceRecord record) throws
Exception {
+ List<Event> result = new ArrayList<>();
+ if (postgresSourceConfig.isIncludeSchemaChanges()) {
Review Comment:
Potential NullPointerException: At line 118,
postgresSourceConfig.isIncludeSchemaChanges() is called without checking if
postgresSourceConfig is null. The constructor allows postgresSourceConfig to be
null (see constructors at lines 78-93), which would cause a
NullPointerException here. Add a null check before calling
isIncludeSchemaChanges().
```suggestion
        if (postgresSourceConfig != null && postgresSourceConfig.isIncludeSchemaChanges()) {
```
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java:
##########
@@ -100,6 +102,64 @@ public static Schema getTableSchema(PostgresSourceConfig
sourceConfig, TableId t
}
}
+ public static Map<String, Integer> getColumnOids(
+ PostgresSourceConfig sourceConfig, TableId tableId, List<String>
columns) {
+ try (PostgresConnection jdbc =
getPostgresDialect(sourceConfig).openJdbcConnection()) {
+ Map<String, Integer> oidMaps = new HashMap<>();
+ String inClause =
+ columns.stream()
+ .map(column -> "'" + column.replace("'", "''") +
"'")
+ .collect(Collectors.joining(",", "in (", ")"));
+ try {
+ jdbc.query(
+ "SELECT a.attname attname,a.attnum as oid \n"
+ + "FROM pg_attribute a\n"
+ + "JOIN pg_class b ON a.attrelid = b.oid\n"
+ + "WHERE b.relname = '"
+ + tableId.getTableName()
+ + "' AND a.attname "
+ + inClause,
+ rs -> {
+ while (rs.next()) {
+ oidMaps.put(rs.getString(1), rs.getInt(2));
+ }
+ });
+ } catch (SQLException e) {
+ throw new FlinkRuntimeException(e);
+ }
+ return oidMaps;
+ }
+ }
+
+ public static Map<TableId, Map<String, Integer>> getAllTablesColumnOids(
+ PostgresSourceConfig sourceConfig, List<String> tableList) {
+ try (PostgresConnection jdbc =
getPostgresDialect(sourceConfig).openJdbcConnection()) {
+ Map<TableId, Map<String, Integer>> tableOidMaps = new HashMap<>();
+ String inClause =
+ tableList.stream()
+ .map(table -> "'" +
table.split("\\.")[1].replace("'", "''") + "'")
+ .collect(Collectors.joining(",", "in (", ")"));
+ try {
+ jdbc.query(
+ "SELECT b.relname,a.attname attname,a.attnum AS oid
FROM pg_attribute a JOIN pg_class b ON a.attrelid = b.oid WHERE b.relname "
+ + inClause
+ + " and a.attnum > 0 and a.attisdropped = 'f'
group by b.relname,a.attname,a.attnum",
Review Comment:
Performance concern: The query at lines 144-146 uses a GROUP BY clause
unnecessarily when retrieving column OIDs. Since the query already filters by
attnum > 0 and attisdropped = 'f', each (relname, attname, attnum) combination
is unique without GROUP BY. Removing the GROUP BY would improve query
performance.
```suggestion
+ " and a.attnum > 0 and a.attisdropped =
'f'",
```
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java:
##########
@@ -100,6 +102,64 @@ public static Schema getTableSchema(PostgresSourceConfig
sourceConfig, TableId t
}
}
+ public static Map<String, Integer> getColumnOids(
+ PostgresSourceConfig sourceConfig, TableId tableId, List<String>
columns) {
+ try (PostgresConnection jdbc =
getPostgresDialect(sourceConfig).openJdbcConnection()) {
+ Map<String, Integer> oidMaps = new HashMap<>();
+ String inClause =
+ columns.stream()
+ .map(column -> "'" + column.replace("'", "''") +
"'")
+ .collect(Collectors.joining(",", "in (", ")"));
+ try {
+ jdbc.query(
+ "SELECT a.attname attname,a.attnum as oid \n"
+ + "FROM pg_attribute a\n"
+ + "JOIN pg_class b ON a.attrelid = b.oid\n"
+ + "WHERE b.relname = '"
+ + tableId.getTableName()
Review Comment:
Missing schema qualification in query: The query at lines 114-121 filters by
table name (b.relname) only, without considering the schema/namespace. In
PostgreSQL, multiple schemas can have tables with the same name. This could
return incorrect column OIDs if tables with the same name exist in different
schemas. The query should also filter by schema to ensure accurate results,
especially since TableId includes schema information.
```suggestion
String tableName = tableId.getTableName().replace("'", "''");
String schemaName = tableId.getSchemaName().replace("'", "''");
try {
jdbc.query(
"SELECT a.attname attname,a.attnum as oid \n"
+ "FROM pg_attribute a\n"
+ "JOIN pg_class b ON a.attrelid = b.oid\n"
+ "JOIN pg_namespace n ON b.relnamespace =
n.oid\n"
+ "WHERE b.relname = '"
+ tableName
+ "' AND n.nspname = '"
+ schemaName
```
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java:
##########
@@ -100,6 +102,64 @@ public static Schema getTableSchema(PostgresSourceConfig
sourceConfig, TableId t
}
}
+ public static Map<String, Integer> getColumnOids(
+ PostgresSourceConfig sourceConfig, TableId tableId, List<String>
columns) {
+ try (PostgresConnection jdbc =
getPostgresDialect(sourceConfig).openJdbcConnection()) {
+ Map<String, Integer> oidMaps = new HashMap<>();
+ String inClause =
+ columns.stream()
+ .map(column -> "'" + column.replace("'", "''") +
"'")
+ .collect(Collectors.joining(",", "in (", ")"));
+ try {
+ jdbc.query(
+ "SELECT a.attname attname,a.attnum as oid \n"
+ + "FROM pg_attribute a\n"
+ + "JOIN pg_class b ON a.attrelid = b.oid\n"
+ + "WHERE b.relname = '"
+ + tableId.getTableName()
+ + "' AND a.attname "
+ + inClause,
+ rs -> {
+ while (rs.next()) {
+ oidMaps.put(rs.getString(1), rs.getInt(2));
+ }
+ });
Review Comment:
SQL injection vulnerability: The tableId.getTableName() is directly
concatenated into the SQL query without proper escaping or use of prepared
statements. An attacker could exploit this by crafting a malicious table name.
Consider using parameterized queries or properly escaping the table name.
```suggestion
+ "WHERE b.relname = ? AND a.attname "
+ inClause,
rs -> {
while (rs.next()) {
oidMaps.put(rs.getString(1), rs.getInt(2));
}
},
tableId.getTableName());
```
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java:
##########
@@ -31,63 +42,375 @@
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
+import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.io.WKBReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
/** Event deserializer for {@link PostgresDataSource}. */
@Internal
public class PostgresEventDeserializer extends
DebeziumEventDeserializationSchema {
-
+ private static final Logger LOG =
LoggerFactory.getLogger(PostgresEventDeserializer.class);
private static final long serialVersionUID = 1L;
private List<PostgreSQLReadableMetadata> readableMetadataList;
private final boolean includeDatabaseInTableId;
private final String databaseName;
+ private Map<TableId, Schema> schemaMap = new HashMap<>();
+ private final PostgresSourceConfig postgresSourceConfig;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps;
public PostgresEventDeserializer(DebeziumChangelogMode changelogMode) {
- this(changelogMode, new ArrayList<>(), false, null);
+ this(changelogMode, new ArrayList<>(), false, null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList) {
- this(changelogMode, readableMetadataList, false, null);
+ this(changelogMode, readableMetadataList, false, null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList,
boolean includeDatabaseInTableId) {
- this(changelogMode, readableMetadataList, includeDatabaseInTableId,
null);
+ this(changelogMode, readableMetadataList, includeDatabaseInTableId,
null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList,
boolean includeDatabaseInTableId,
- String databaseName) {
+ String databaseName,
+ PostgresSourceConfig postgresSourceConfig,
+ Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps) {
super(new PostgresSchemaDataTypeInference(), changelogMode);
this.readableMetadataList = readableMetadataList;
this.includeDatabaseInTableId = includeDatabaseInTableId;
this.databaseName = databaseName;
+ this.postgresSourceConfig = postgresSourceConfig;
+ this.beforeTableColumnsOidMaps = beforeTableColumnsOidMaps;
}
@Override
protected List<SchemaChangeEvent>
deserializeSchemaChangeRecord(SourceRecord record) {
return Collections.emptyList();
}
+ @Override
+ public List<? extends Event> deserialize(SourceRecord record) throws
Exception {
+ List<Event> result = new ArrayList<>();
+ if (postgresSourceConfig.isIncludeSchemaChanges()) {
+ handleSchemaChange(record, result);
+ }
+ if (isDataChangeRecord(record)) {
+ LOG.trace("Process data change record: {}", record);
+ result.addAll(deserializeDataChangeRecord(record));
+ } else if (isSchemaChangeRecord(record)) {
+ LOG.trace("Process schema change record: {}", record);
+ } else {
+ LOG.trace("Ignored other record: {}", record);
+ return Collections.emptyList();
+ }
+ return result;
+ }
+
+ private void handleSchemaChange(SourceRecord record, List<Event> result) {
+ TableId tableId = getTableId(record);
+ Schema valueSchema = record.valueSchema();
+ Schema beforeSchema = schemaMap.get(tableId);
+ List<String> beforeColumnNames;
+ List<String> afterColumnNames;
+ Schema afterSchema = fieldSchema(valueSchema,
Envelope.FieldName.AFTER);
+ List<Field> afterFields = afterSchema.fields();
+ org.apache.flink.cdc.common.schema.Schema schema =
+ PostgresSchemaUtils.getTableSchema(postgresSourceConfig,
tableId);
+ List<Column> columns = schema.getColumns();
+ Map<String, Integer> beforeColumnsOidMaps =
+
beforeTableColumnsOidMaps.get(TableId.tableId(tableId.getTableName()));
Review Comment:
Potential NullPointerException: The code does not check if
beforeColumnsOidMaps is null before accessing it. If
beforeTableColumnsOidMaps.get() returns null for a table, subsequent operations
will throw a NullPointerException. Add a null check and proper handling for
when the table is not found in the map.
```suggestion
                        beforeTableColumnsOidMaps.get(TableId.tableId(tableId.getTableName()));
if (beforeColumnsOidMaps == null) {
beforeColumnsOidMaps = new HashMap<>();
beforeTableColumnsOidMaps.put(
TableId.tableId(tableId.getTableName()),
beforeColumnsOidMaps);
}
```
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java:
##########
@@ -100,6 +102,64 @@ public static Schema getTableSchema(PostgresSourceConfig
sourceConfig, TableId t
}
}
+ public static Map<String, Integer> getColumnOids(
+ PostgresSourceConfig sourceConfig, TableId tableId, List<String>
columns) {
+ try (PostgresConnection jdbc =
getPostgresDialect(sourceConfig).openJdbcConnection()) {
+ Map<String, Integer> oidMaps = new HashMap<>();
+ String inClause =
+ columns.stream()
+ .map(column -> "'" + column.replace("'", "''") +
"'")
+ .collect(Collectors.joining(",", "in (", ")"));
+ try {
+ jdbc.query(
+ "SELECT a.attname attname,a.attnum as oid \n"
+ + "FROM pg_attribute a\n"
+ + "JOIN pg_class b ON a.attrelid = b.oid\n"
+ + "WHERE b.relname = '"
+ + tableId.getTableName()
+ + "' AND a.attname "
+ + inClause,
+ rs -> {
+ while (rs.next()) {
+ oidMaps.put(rs.getString(1), rs.getInt(2));
+ }
+ });
+ } catch (SQLException e) {
+ throw new FlinkRuntimeException(e);
+ }
+ return oidMaps;
+ }
+ }
+
+ public static Map<TableId, Map<String, Integer>> getAllTablesColumnOids(
+ PostgresSourceConfig sourceConfig, List<String> tableList) {
+ try (PostgresConnection jdbc =
getPostgresDialect(sourceConfig).openJdbcConnection()) {
+ Map<TableId, Map<String, Integer>> tableOidMaps = new HashMap<>();
+ String inClause =
+ tableList.stream()
+ .map(table -> "'" +
table.split("\\.")[1].replace("'", "''") + "'")
+ .collect(Collectors.joining(",", "in (", ")"));
+ try {
+ jdbc.query(
+ "SELECT b.relname,a.attname attname,a.attnum AS oid
FROM pg_attribute a JOIN pg_class b ON a.attrelid = b.oid WHERE b.relname "
+ + inClause
+ + " and a.attnum > 0 and a.attisdropped = 'f'
group by b.relname,a.attname,a.attnum",
+ rs -> {
+ while (rs.next()) {
+ TableId tableId =
TableId.tableId(rs.getString(1));
Review Comment:
Inconsistent TableId construction: At line 149, TableId.tableId() is called
with only the table name (relname), producing a single-component TableId with
no schema information. The lookup side (e.g., line 145 in
PostgresEventDeserializer) likewise keys beforeTableColumnsOidMaps with
TableId.tableId(tableId.getTableName()), while TableIds elsewhere in the code
carry schema information. Mixing these formats can lead to lookup failures.
The TableId should be constructed consistently, including the schema, to match
how it is used throughout the code.
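A small illustrative sketch of the idea, deliberately using a plain string key rather than the real TableId factory methods; the point is only that the bootstrap map and the per-record lookup must build the key in the same schema-qualified way:

```java
import java.util.HashMap;
import java.util.Map;

public class TableKeySketch {
    /** Hypothetical helper: one canonical, schema-qualified key per table. */
    static String tableKey(String schemaName, String tableName) {
        return schemaName + "." + tableName;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Integer>> columnOidsByTable = new HashMap<>();
        // The bootstrap side (getAllTablesColumnOids) registers with the qualified key...
        columnOidsByTable.put(tableKey("inventory", "products"), new HashMap<>());
        // ...and the per-record side (handleSchemaChange) looks it up the same way, so
        // same-named tables in different schemas can neither collide nor miss.
        Map<String, Integer> oids = columnOidsByTable.get(tableKey("inventory", "products"));
        System.out.println(oids != null); // true
    }
}
```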
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java:
##########
@@ -67,12 +72,17 @@ public PostgresDataSource(
public EventSourceProvider getEventSourceProvider() {
String databaseName = postgresSourceConfig.getDatabaseList().get(0);
boolean includeDatabaseInTableId =
postgresSourceConfig.isIncludeDatabaseInTableId();
+ beforeTableColumnsOidMaps =
+ PostgresSchemaUtils.getAllTablesColumnOids(
+ postgresSourceConfig,
postgresSourceConfig.getTableList());
Review Comment:
Resource leak risk: The getAllTablesColumnOids method at line 76 opens a
database connection internally. The connection itself is closed by
try-with-resources, but if that call throws, or any exception occurs between
lines 75 and 85 while the deserializer is being constructed, the source
provider is left in a partially initialized state. Consider wrapping this call
in proper error handling.
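One illustrative way to make that failure mode explicit, with a generic supplier standing in for the PostgresSchemaUtils.getAllTablesColumnOids call; the wrapper and its exception choice are assumptions for the sketch, not the connector's API:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class OidBootstrapSketch {

    /**
     * Hypothetical wrapper: run the loader (standing in for getAllTablesColumnOids) and
     * convert any failure into a descriptive error, so the source never continues with
     * a half-initialized OID map.
     */
    static Map<String, Map<String, Integer>> loadColumnOidsOrFail(
            Supplier<Map<String, Map<String, Integer>>> loader, List<String> tables) {
        try {
            return loader.get();
        } catch (RuntimeException e) {
            throw new IllegalStateException(
                    "Failed to load column OIDs for tables " + tables
                            + "; refusing to build the event deserializer", e);
        }
    }

    public static void main(String[] args) {
        try {
            loadColumnOidsOrFail(
                    () -> { throw new RuntimeException("connection refused"); },
                    Arrays.asList("inventory.products"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```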
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java:
##########
@@ -31,63 +42,375 @@
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
+import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.io.WKBReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
/** Event deserializer for {@link PostgresDataSource}. */
@Internal
public class PostgresEventDeserializer extends
DebeziumEventDeserializationSchema {
-
+ private static final Logger LOG =
LoggerFactory.getLogger(PostgresEventDeserializer.class);
private static final long serialVersionUID = 1L;
private List<PostgreSQLReadableMetadata> readableMetadataList;
private final boolean includeDatabaseInTableId;
private final String databaseName;
+ private Map<TableId, Schema> schemaMap = new HashMap<>();
+ private final PostgresSourceConfig postgresSourceConfig;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps;
public PostgresEventDeserializer(DebeziumChangelogMode changelogMode) {
- this(changelogMode, new ArrayList<>(), false, null);
+ this(changelogMode, new ArrayList<>(), false, null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList) {
- this(changelogMode, readableMetadataList, false, null);
+ this(changelogMode, readableMetadataList, false, null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList,
boolean includeDatabaseInTableId) {
- this(changelogMode, readableMetadataList, includeDatabaseInTableId,
null);
+ this(changelogMode, readableMetadataList, includeDatabaseInTableId,
null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList,
boolean includeDatabaseInTableId,
- String databaseName) {
+ String databaseName,
+ PostgresSourceConfig postgresSourceConfig,
+ Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps) {
super(new PostgresSchemaDataTypeInference(), changelogMode);
this.readableMetadataList = readableMetadataList;
this.includeDatabaseInTableId = includeDatabaseInTableId;
this.databaseName = databaseName;
+ this.postgresSourceConfig = postgresSourceConfig;
+ this.beforeTableColumnsOidMaps = beforeTableColumnsOidMaps;
}
@Override
protected List<SchemaChangeEvent>
deserializeSchemaChangeRecord(SourceRecord record) {
return Collections.emptyList();
}
+ @Override
+ public List<? extends Event> deserialize(SourceRecord record) throws
Exception {
+ List<Event> result = new ArrayList<>();
+ if (postgresSourceConfig.isIncludeSchemaChanges()) {
+ handleSchemaChange(record, result);
+ }
+ if (isDataChangeRecord(record)) {
+ LOG.trace("Process data change record: {}", record);
+ result.addAll(deserializeDataChangeRecord(record));
+ } else if (isSchemaChangeRecord(record)) {
+ LOG.trace("Process schema change record: {}", record);
+ } else {
+ LOG.trace("Ignored other record: {}", record);
+ return Collections.emptyList();
+ }
+ return result;
+ }
+
+ private void handleSchemaChange(SourceRecord record, List<Event> result) {
+ TableId tableId = getTableId(record);
+ Schema valueSchema = record.valueSchema();
+ Schema beforeSchema = schemaMap.get(tableId);
+ List<String> beforeColumnNames;
+ List<String> afterColumnNames;
+ Schema afterSchema = fieldSchema(valueSchema,
Envelope.FieldName.AFTER);
+ List<Field> afterFields = afterSchema.fields();
+ org.apache.flink.cdc.common.schema.Schema schema =
+ PostgresSchemaUtils.getTableSchema(postgresSourceConfig,
tableId);
+ List<Column> columns = schema.getColumns();
+ Map<String, Integer> beforeColumnsOidMaps =
+
beforeTableColumnsOidMaps.get(TableId.tableId(tableId.getTableName()));
+ // When the first piece of data arrives, beforeSchema is empty
+ if (beforeSchema != null) {
+ beforeColumnNames =
+ beforeSchema.fields().stream().map(e ->
e.name()).collect(Collectors.toList());
+ afterColumnNames = afterFields.stream().map(e ->
e.name()).collect(Collectors.toList());
+ List<String> newAddColumnNames =
findAddedElements(beforeColumnNames, afterColumnNames);
+ List<String> newDelColumnNames =
+ findRemovedElements(beforeColumnNames, afterColumnNames);
+ Map<String, String> renameColumnMaps = new HashMap<>();
+ // Process the fields of rename
+ if (!newAddColumnNames.isEmpty() && !newDelColumnNames.isEmpty()) {
+ renameColumnMaps =
+ getRenameColumnMaps(
+ tableId,
+ newAddColumnNames,
+ newDelColumnNames,
+ beforeSchema,
+ afterSchema,
+ beforeColumnsOidMaps);
+ if (!renameColumnMaps.isEmpty()) {
+ result.add(new RenameColumnEvent(tableId,
renameColumnMaps));
+ Map<String, String> finalRenameColumnMaps1 =
renameColumnMaps;
+ renameColumnMaps
+ .keySet()
+ .forEach(
+ e -> {
+ beforeColumnsOidMaps.put(
+ finalRenameColumnMaps1.get(e),
+ beforeColumnsOidMaps.get(e));
+ });
+ }
+ newAddColumnNames.removeAll(renameColumnMaps.values());
+ newDelColumnNames.removeAll(renameColumnMaps.keySet());
+ }
+ // Process the fields of add
+ if (!newAddColumnNames.isEmpty()) {
+ newAddColumns(
+ tableId,
+ afterSchema,
+ result,
+ columns,
+ newAddColumnNames,
+ beforeColumnsOidMaps);
+ }
+ // Process the fields of delete
+ if (!newDelColumnNames.isEmpty()) {
+ newDelColumns(
+ tableId,
+ beforeColumnNames,
+ result,
+ newDelColumnNames,
+ beforeColumnsOidMaps);
+ }
+ // Handling fields with changed types
+ findModifiedTypeColumns(
+ tableId,
+ beforeSchema,
+ beforeColumnNames,
+ afterSchema,
+ result,
+ columns,
+ renameColumnMaps);
+ }
+ schemaMap.put(tableId, afterSchema);
+ }
+
+ private void newDelColumns(
+ TableId tableId,
+ List<String> beforeColumnNames,
+ List<Event> result,
+ List<String> newDelColumnNames,
+ Map<String, Integer> beforeColumnsOidMaps) {
+ newDelColumnNames.forEach(
+ e -> {
+ result.add(new DropColumnEvent(tableId,
Collections.singletonList(e)));
+ beforeColumnNames.removeAll(newDelColumnNames);
+ beforeColumnsOidMaps.remove(e);
+ });
Review Comment:
Inefficient loop operation: inside the forEach loop at line 218,
beforeColumnNames.removeAll(newDelColumnNames) is called on every iteration, so
the same bulk removal is repeated once per dropped column. Move the call outside
the loop so it executes only once, after the loop completes.
```suggestion
beforeColumnsOidMaps.remove(e);
});
beforeColumnNames.removeAll(newDelColumnNames);
```
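For reference, a minimal sketch of the whole method with the removeAll hoisted out of
the loop (identifiers taken from the hunk above; illustrative only, not the PR's final
code):
```java
private void newDelColumns(
        TableId tableId,
        List<String> beforeColumnNames,
        List<Event> result,
        List<String> newDelColumnNames,
        Map<String, Integer> beforeColumnsOidMaps) {
    for (String droppedColumn : newDelColumnNames) {
        // Emit one DropColumnEvent per dropped column, as the original loop body does.
        result.add(new DropColumnEvent(tableId, Collections.singletonList(droppedColumn)));
        beforeColumnsOidMaps.remove(droppedColumn);
    }
    // The bulk removal now runs exactly once, after the loop, instead of once per iteration.
    beforeColumnNames.removeAll(newDelColumnNames);
}
```
With N dropped columns this replaces N identical removeAll passes over beforeColumnNames
with a single one; the emitted events are unchanged.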
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java:
##########
@@ -31,63 +42,375 @@
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
+import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.io.WKBReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
/** Event deserializer for {@link PostgresDataSource}. */
@Internal
public class PostgresEventDeserializer extends
DebeziumEventDeserializationSchema {
-
+ private static final Logger LOG =
LoggerFactory.getLogger(PostgresEventDeserializer.class);
private static final long serialVersionUID = 1L;
private List<PostgreSQLReadableMetadata> readableMetadataList;
private final boolean includeDatabaseInTableId;
private final String databaseName;
+ private Map<TableId, Schema> schemaMap = new HashMap<>();
+ private final PostgresSourceConfig postgresSourceConfig;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps;
public PostgresEventDeserializer(DebeziumChangelogMode changelogMode) {
- this(changelogMode, new ArrayList<>(), false, null);
+ this(changelogMode, new ArrayList<>(), false, null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList) {
- this(changelogMode, readableMetadataList, false, null);
+ this(changelogMode, readableMetadataList, false, null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList,
boolean includeDatabaseInTableId) {
- this(changelogMode, readableMetadataList, includeDatabaseInTableId,
null);
+ this(changelogMode, readableMetadataList, includeDatabaseInTableId,
null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList,
boolean includeDatabaseInTableId,
- String databaseName) {
+ String databaseName,
+ PostgresSourceConfig postgresSourceConfig,
+ Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps) {
super(new PostgresSchemaDataTypeInference(), changelogMode);
this.readableMetadataList = readableMetadataList;
this.includeDatabaseInTableId = includeDatabaseInTableId;
this.databaseName = databaseName;
+ this.postgresSourceConfig = postgresSourceConfig;
+ this.beforeTableColumnsOidMaps = beforeTableColumnsOidMaps;
}
@Override
protected List<SchemaChangeEvent>
deserializeSchemaChangeRecord(SourceRecord record) {
return Collections.emptyList();
}
+ @Override
+ public List<? extends Event> deserialize(SourceRecord record) throws
Exception {
+ List<Event> result = new ArrayList<>();
+ if (postgresSourceConfig.isIncludeSchemaChanges()) {
+ handleSchemaChange(record, result);
+ }
+ if (isDataChangeRecord(record)) {
+ LOG.trace("Process data change record: {}", record);
+ result.addAll(deserializeDataChangeRecord(record));
+ } else if (isSchemaChangeRecord(record)) {
+ LOG.trace("Process schema change record: {}", record);
+ } else {
+ LOG.trace("Ignored other record: {}", record);
+ return Collections.emptyList();
+ }
+ return result;
+ }
+
+ private void handleSchemaChange(SourceRecord record, List<Event> result) {
+ TableId tableId = getTableId(record);
+ Schema valueSchema = record.valueSchema();
+ Schema beforeSchema = schemaMap.get(tableId);
+ List<String> beforeColumnNames;
+ List<String> afterColumnNames;
+ Schema afterSchema = fieldSchema(valueSchema,
Envelope.FieldName.AFTER);
+ List<Field> afterFields = afterSchema.fields();
+ org.apache.flink.cdc.common.schema.Schema schema =
+ PostgresSchemaUtils.getTableSchema(postgresSourceConfig,
tableId);
+ List<Column> columns = schema.getColumns();
+ Map<String, Integer> beforeColumnsOidMaps =
+
beforeTableColumnsOidMaps.get(TableId.tableId(tableId.getTableName()));
+ // When the first piece of data arrives, beforeSchema is empty
+ if (beforeSchema != null) {
+ beforeColumnNames =
+ beforeSchema.fields().stream().map(e ->
e.name()).collect(Collectors.toList());
+ afterColumnNames = afterFields.stream().map(e ->
e.name()).collect(Collectors.toList());
+ List<String> newAddColumnNames =
findAddedElements(beforeColumnNames, afterColumnNames);
+ List<String> newDelColumnNames =
+ findRemovedElements(beforeColumnNames, afterColumnNames);
+ Map<String, String> renameColumnMaps = new HashMap<>();
+ // Process the fields of rename
+ if (!newAddColumnNames.isEmpty() && !newDelColumnNames.isEmpty()) {
+ renameColumnMaps =
+ getRenameColumnMaps(
+ tableId,
+ newAddColumnNames,
+ newDelColumnNames,
+ beforeSchema,
+ afterSchema,
+ beforeColumnsOidMaps);
+ if (!renameColumnMaps.isEmpty()) {
+ result.add(new RenameColumnEvent(tableId,
renameColumnMaps));
+ Map<String, String> finalRenameColumnMaps1 =
renameColumnMaps;
+ renameColumnMaps
+ .keySet()
+ .forEach(
+ e -> {
+ beforeColumnsOidMaps.put(
+ finalRenameColumnMaps1.get(e),
+ beforeColumnsOidMaps.get(e));
+ });
+ }
+ newAddColumnNames.removeAll(renameColumnMaps.values());
+ newDelColumnNames.removeAll(renameColumnMaps.keySet());
+ }
+ // Process the fields of add
+ if (!newAddColumnNames.isEmpty()) {
+ newAddColumns(
+ tableId,
+ afterSchema,
+ result,
+ columns,
+ newAddColumnNames,
+ beforeColumnsOidMaps);
+ }
+ // Process the fields of delete
+ if (!newDelColumnNames.isEmpty()) {
+ newDelColumns(
+ tableId,
+ beforeColumnNames,
+ result,
+ newDelColumnNames,
+ beforeColumnsOidMaps);
+ }
+ // Handling fields with changed types
+ findModifiedTypeColumns(
+ tableId,
+ beforeSchema,
+ beforeColumnNames,
+ afterSchema,
+ result,
+ columns,
+ renameColumnMaps);
+ }
+ schemaMap.put(tableId, afterSchema);
+ }
+
+ private void newDelColumns(
+ TableId tableId,
+ List<String> beforeColumnNames,
+ List<Event> result,
+ List<String> newDelColumnNames,
+ Map<String, Integer> beforeColumnsOidMaps) {
+ newDelColumnNames.forEach(
+ e -> {
+ result.add(new DropColumnEvent(tableId,
Collections.singletonList(e)));
+ beforeColumnNames.removeAll(newDelColumnNames);
+ beforeColumnsOidMaps.remove(e);
+ });
+ }
+
+ private void newAddColumns(
+ TableId tableId,
+ Schema afterSchema,
+ List<Event> result,
+ List<Column> columns,
+ List<String> newAddColumnNames,
+ Map<String, Integer> beforeColumnsOidMaps) {
+ List<Column> newAddColumns =
+ columns.stream()
+ .filter(e -> newAddColumnNames.contains(e.getName()))
+ .collect(Collectors.toList());
+ if (newAddColumns.size() != newAddColumnNames.size()) {
+ List<String> notInCurrentTableColumns =
+ newAddColumnNames.stream()
+ .filter(e -> !newAddColumns.contains(e))
+ .collect(Collectors.toList());
+ notInCurrentTableColumns.forEach(
+ e -> {
+ Field field = afterSchema.field(e);
+ DataType dataType =
convertKafkaTypeToFlintType(field.schema());
+ PhysicalColumn column =
+ new PhysicalColumn(e, dataType,
field.schema().doc());
+ result.add(
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new
AddColumnEvent.ColumnWithPosition(
+ column,
+
AddColumnEvent.ColumnPosition.LAST,
+ null))));
+ });
+ }
+ newAddColumns.forEach(
+ e -> {
+ SchemaChangeEvent event =
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new
AddColumnEvent.ColumnWithPosition(
+ e,
AddColumnEvent.ColumnPosition.LAST, null)));
+ result.add(event);
+ });
+ Map<String, Integer> newAddColumnsOidMaps =
+ PostgresSchemaUtils.getColumnOids(postgresSourceConfig,
tableId, newAddColumnNames);
+ newAddColumnNames.forEach(
+ e -> {
+ beforeColumnsOidMaps.put(e,
newAddColumnsOidMaps.getOrDefault(e, null));
+ });
+ }
+
+ private static DataType convertKafkaTypeToFlintType(Schema kafkaSchema) {
+ switch (kafkaSchema.type()) {
+ case STRING:
+ return DataTypes.STRING();
+ case INT8:
+ return DataTypes.TINYINT();
+ case INT16:
+ return DataTypes.SMALLINT();
+ case INT32:
+ return DataTypes.INT();
+ case INT64:
+ return DataTypes.BIGINT();
+ case FLOAT32:
+ return DataTypes.FLOAT();
+ case FLOAT64:
+ return DataTypes.DOUBLE();
+ case BOOLEAN:
+ return DataTypes.BOOLEAN();
+ case BYTES:
+ return DataTypes.BYTES();
+ case ARRAY:
+ Schema elementSchema = kafkaSchema.valueSchema();
+ DataType elementType =
convertKafkaTypeToFlintType(elementSchema);
+ return DataTypes.ARRAY(elementType);
+ case STRUCT:
+ Schema schema = kafkaSchema.valueSchema();
+ DataType dataType = convertKafkaTypeToFlintType(schema);
+ return DataTypes.ROW(dataType);
+ case MAP:
+ return DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING());
+ default:
+ return DataTypes.STRING();
+ }
+ }
+
+ private void findModifiedTypeColumns(
+ TableId tableId,
+ Schema oldSchema,
+ List<String> oldColumnNames,
+ Schema afterSchema,
+ List<Event> result,
+ List<Column> columns,
+ Map<String, String> renameColumnMaps) {
+ Map<String, String> finalRenameColumnMaps = renameColumnMaps;
+ oldColumnNames.stream()
+ .forEach(
+ oldFieldName -> {
+ Field oldField = oldSchema.field(oldFieldName);
+ Field afterField = afterSchema.field(oldFieldName);
+ if (afterField == null) {
+ afterField =
+
afterSchema.field(finalRenameColumnMaps.get(oldFieldName));
+ }
Review Comment:
Potential NullPointerException: at line 329, afterField can still be null when
afterSchema.field(oldFieldName) returns null and finalRenameColumnMaps.get(oldFieldName)
is also null (or maps to a name that does not exist in afterSchema). Any subsequent call
such as afterField.name() would then throw a NullPointerException. Add a null check
before dereferencing afterField.
```suggestion
                            String renamedFieldName =
                                    finalRenameColumnMaps.get(oldFieldName);
                            if (renamedFieldName != null) {
                                afterField = afterSchema.field(renamedFieldName);
                            }
                        }
                        if (afterField == null) {
                            // No corresponding field in the after-schema (even after rename); skip.
                            return;
                        }
```
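For illustration, a sketch of the per-column lambda with the defensive lookup applied
(identifiers taken from the hunk above; the surrounding comparison logic is assumed and
only hinted at in the trailing comment):
```java
oldColumnNames.forEach(
        oldFieldName -> {
            Field oldField = oldSchema.field(oldFieldName);
            Field afterField = afterSchema.field(oldFieldName);
            if (afterField == null) {
                // The column may have been renamed; fall back to the mapped name, if any.
                String renamedFieldName = finalRenameColumnMaps.get(oldFieldName);
                if (renamedFieldName != null) {
                    afterField = afterSchema.field(renamedFieldName);
                }
            }
            if (afterField == null) {
                // Dropped column or unknown rename: nothing to compare, skip this entry.
                return;
            }
            // ... the existing type-comparison logic against oldField continues here ...
        });
```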
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]