JNSimba commented on code in PR #4233:
URL: https://github.com/apache/flink-cdc/pull/4233#discussion_r2894774189


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java:
##########
@@ -31,63 +42,375 @@
 import io.debezium.data.geometry.Geography;
 import io.debezium.data.geometry.Geometry;
 import io.debezium.data.geometry.Point;
+import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.locationtech.jts.geom.Coordinate;
 import org.locationtech.jts.io.WKBReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /** Event deserializer for {@link PostgresDataSource}. */
 @Internal
 public class PostgresEventDeserializer extends 
DebeziumEventDeserializationSchema {
-
+    private static final Logger LOG = 
LoggerFactory.getLogger(PostgresEventDeserializer.class);
     private static final long serialVersionUID = 1L;
     private List<PostgreSQLReadableMetadata> readableMetadataList;
     private final boolean includeDatabaseInTableId;
     private final String databaseName;
+    private Map<TableId, Schema> schemaMap = new HashMap<>();
+    private final PostgresSourceConfig postgresSourceConfig;
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps;
 
     public PostgresEventDeserializer(DebeziumChangelogMode changelogMode) {
-        this(changelogMode, new ArrayList<>(), false, null);
+        this(changelogMode, new ArrayList<>(), false, null, null, null);
     }
 
     public PostgresEventDeserializer(
             DebeziumChangelogMode changelogMode,
             List<PostgreSQLReadableMetadata> readableMetadataList) {
-        this(changelogMode, readableMetadataList, false, null);
+        this(changelogMode, readableMetadataList, false, null, null, null);
     }
 
     public PostgresEventDeserializer(
             DebeziumChangelogMode changelogMode,
             List<PostgreSQLReadableMetadata> readableMetadataList,
             boolean includeDatabaseInTableId) {
-        this(changelogMode, readableMetadataList, includeDatabaseInTableId, 
null);
+        this(changelogMode, readableMetadataList, includeDatabaseInTableId, 
null, null, null);
     }
 
     public PostgresEventDeserializer(
             DebeziumChangelogMode changelogMode,
             List<PostgreSQLReadableMetadata> readableMetadataList,
             boolean includeDatabaseInTableId,
-            String databaseName) {
+            String databaseName,
+            PostgresSourceConfig postgresSourceConfig,
+            Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps) {
         super(new PostgresSchemaDataTypeInference(), changelogMode);
         this.readableMetadataList = readableMetadataList;
         this.includeDatabaseInTableId = includeDatabaseInTableId;
         this.databaseName = databaseName;
+        this.postgresSourceConfig = postgresSourceConfig;
+        this.beforeTableColumnsOidMaps = beforeTableColumnsOidMaps;
     }
 
     @Override
     protected List<SchemaChangeEvent> 
deserializeSchemaChangeRecord(SourceRecord record) {
         return Collections.emptyList();
     }
 
+    @Override
+    public List<? extends Event> deserialize(SourceRecord record) throws 
Exception {
+        List<Event> result = new ArrayList<>();
+        if (postgresSourceConfig.isIncludeSchemaChanges()) {
+            handleSchemaChange(record, result);
+        }
+        if (isDataChangeRecord(record)) {
+            LOG.trace("Process data change record: {}", record);
+            result.addAll(deserializeDataChangeRecord(record));
+        } else if (isSchemaChangeRecord(record)) {
+            LOG.trace("Process schema change record: {}", record);
+        } else {
+            LOG.trace("Ignored other record: {}", record);
+            return Collections.emptyList();
+        }
+        return result;
+    }
+
+    private void handleSchemaChange(SourceRecord record, List<Event> result) {
+        TableId tableId = getTableId(record);
+        Schema valueSchema = record.valueSchema();
+        Schema beforeSchema = schemaMap.get(tableId);
+        List<String> beforeColumnNames;
+        List<String> afterColumnNames;
+        Schema afterSchema = fieldSchema(valueSchema, 
Envelope.FieldName.AFTER);
+        List<Field> afterFields = afterSchema.fields();
+        org.apache.flink.cdc.common.schema.Schema schema =
+                PostgresSchemaUtils.getTableSchema(postgresSourceConfig, 
tableId);
+        List<Column> columns = schema.getColumns();
+        Map<String, Integer> beforeColumnsOidMaps =
+                
beforeTableColumnsOidMaps.get(TableId.tableId(tableId.getTableName()));
+        // When the first piece of data arrives, beforeSchema is empty
+        if (beforeSchema != null) {
+            beforeColumnNames =
+                    beforeSchema.fields().stream().map(e -> 
e.name()).collect(Collectors.toList());
+            afterColumnNames = afterFields.stream().map(e -> 
e.name()).collect(Collectors.toList());
+            List<String> newAddColumnNames = 
findAddedElements(beforeColumnNames, afterColumnNames);
+            List<String> newDelColumnNames =
+                    findRemovedElements(beforeColumnNames, afterColumnNames);
+            Map<String, String> renameColumnMaps = new HashMap<>();
+            // Process the fields of rename
+            if (!newAddColumnNames.isEmpty() && !newDelColumnNames.isEmpty()) {
+                renameColumnMaps =
+                        getRenameColumnMaps(
+                                tableId,
+                                newAddColumnNames,
+                                newDelColumnNames,
+                                beforeSchema,
+                                afterSchema,
+                                beforeColumnsOidMaps);
+                if (!renameColumnMaps.isEmpty()) {
+                    result.add(new RenameColumnEvent(tableId, 
renameColumnMaps));
+                    Map<String, String> finalRenameColumnMaps1 = 
renameColumnMaps;
+                    renameColumnMaps
+                            .keySet()
+                            .forEach(
+                                    e -> {
+                                        beforeColumnsOidMaps.put(
+                                                finalRenameColumnMaps1.get(e),
+                                                beforeColumnsOidMaps.get(e));
+                                    });
+                }
+                newAddColumnNames.removeAll(renameColumnMaps.values());
+                newDelColumnNames.removeAll(renameColumnMaps.keySet());
+            }
+            // Process the fields of add
+            if (!newAddColumnNames.isEmpty()) {
+                newAddColumns(
+                        tableId,
+                        afterSchema,
+                        result,
+                        columns,
+                        newAddColumnNames,
+                        beforeColumnsOidMaps);
+            }
+            // Process the fields of delete
+            if (!newDelColumnNames.isEmpty()) {
+                newDelColumns(
+                        tableId,
+                        beforeColumnNames,
+                        result,
+                        newDelColumnNames,
+                        beforeColumnsOidMaps);
+            }
+            // Handling fields with changed types
+            findModifiedTypeColumns(
+                    tableId,
+                    beforeSchema,
+                    beforeColumnNames,
+                    afterSchema,
+                    result,
+                    columns,
+                    renameColumnMaps);
+        }
+        schemaMap.put(tableId, afterSchema);
+    }
+
+    private void newDelColumns(
+            TableId tableId,
+            List<String> beforeColumnNames,
+            List<Event> result,
+            List<String> newDelColumnNames,
+            Map<String, Integer> beforeColumnsOidMaps) {
+        newDelColumnNames.forEach(
+                e -> {
+                    result.add(new DropColumnEvent(tableId, 
Collections.singletonList(e)));
+                    beforeColumnNames.removeAll(newDelColumnNames);
+                    beforeColumnsOidMaps.remove(e);
+                });
+    }
+
+    private void newAddColumns(
+            TableId tableId,
+            Schema afterSchema,
+            List<Event> result,
+            List<Column> columns,
+            List<String> newAddColumnNames,
+            Map<String, Integer> beforeColumnsOidMaps) {
+        List<Column> newAddColumns =
+                columns.stream()
+                        .filter(e -> newAddColumnNames.contains(e.getName()))
+                        .collect(Collectors.toList());
+        if (newAddColumns.size() != newAddColumnNames.size()) {
+            List<String> notInCurrentTableColumns =
+                    newAddColumnNames.stream()
+                            .filter(e -> !newAddColumns.contains(e))
+                            .collect(Collectors.toList());
+            notInCurrentTableColumns.forEach(
+                    e -> {
+                        Field field = afterSchema.field(e);
+                        DataType dataType = 
convertKafkaTypeToFlintType(field.schema());
+                        PhysicalColumn column =
+                                new PhysicalColumn(e, dataType, 
field.schema().doc());
+                        result.add(
+                                new AddColumnEvent(
+                                        tableId,
+                                        Collections.singletonList(
+                                                new 
AddColumnEvent.ColumnWithPosition(
+                                                        column,
+                                                        
AddColumnEvent.ColumnPosition.LAST,
+                                                        null))));
+                    });
+        }
+        newAddColumns.forEach(
+                e -> {
+                    SchemaChangeEvent event =
+                            new AddColumnEvent(
+                                    tableId,
+                                    Collections.singletonList(
+                                            new 
AddColumnEvent.ColumnWithPosition(
+                                                    e, 
AddColumnEvent.ColumnPosition.LAST, null)));
+                    result.add(event);
+                });
+        Map<String, Integer> newAddColumnsOidMaps =
+                PostgresSchemaUtils.getColumnOids(postgresSourceConfig, 
tableId, newAddColumnNames);
+        newAddColumnNames.forEach(
+                e -> {
+                    beforeColumnsOidMaps.put(e, 
newAddColumnsOidMaps.getOrDefault(e, null));
+                });
+    }
+
+    private static DataType convertKafkaTypeToFlintType(Schema kafkaSchema) {
+        switch (kafkaSchema.type()) {
+            case STRING:
+                return DataTypes.STRING();
+            case INT8:
+                return DataTypes.TINYINT();
+            case INT16:
+                return DataTypes.SMALLINT();
+            case INT32:
+                return DataTypes.INT();
+            case INT64:
+                return DataTypes.BIGINT();
+            case FLOAT32:
+                return DataTypes.FLOAT();
+            case FLOAT64:
+                return DataTypes.DOUBLE();
+            case BOOLEAN:
+                return DataTypes.BOOLEAN();
+            case BYTES:
+                return DataTypes.BYTES();
+            case ARRAY:
+                Schema elementSchema = kafkaSchema.valueSchema();
+                DataType elementType = 
convertKafkaTypeToFlintType(elementSchema);
+                return DataTypes.ARRAY(elementType);
+            case STRUCT:
+                Schema schema = kafkaSchema.valueSchema();
+                DataType dataType = convertKafkaTypeToFlintType(schema);
+                return DataTypes.ROW(dataType);
+            case MAP:
+                return DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING());
+            default:
+                return DataTypes.STRING();
+        }
+    }
+
+    private void findModifiedTypeColumns(
+            TableId tableId,
+            Schema oldSchema,
+            List<String> oldColumnNames,
+            Schema afterSchema,
+            List<Event> result,
+            List<Column> columns,
+            Map<String, String> renameColumnMaps) {
+        Map<String, String> finalRenameColumnMaps = renameColumnMaps;
+        oldColumnNames.stream()
+                .forEach(
+                        oldFieldName -> {
+                            Field oldField = oldSchema.field(oldFieldName);
+                            Field afterField = afterSchema.field(oldFieldName);
+                            if (afterField == null) {
+                                afterField =
+                                        
afterSchema.field(finalRenameColumnMaps.get(oldFieldName));
+                            }
+                            String afterFieldName = afterField.name();
+                            if (!oldField.schema()
+                                            .type()
+                                            .getName()
+                                            
.equals(afterField.schema().type().getName())
+                                    || (oldField.schema().defaultValue() != 
null
+                                            && 
afterField.schema().defaultValue() != null
+                                            && !oldField.schema()
+                                                    .defaultValue()
+                                                    
.equals(afterField.schema().defaultValue()))
+                                    || (oldField.schema().parameters() != null
+                                            && 
afterField.schema().parameters() != null
+                                            && !oldField.schema()
+                                                    .parameters()
+                                                    
.equals(afterField.schema().parameters()))) {
+                                Map<String, DataType> typeMapping = new 
HashMap<>();
+                                Column column =
+                                        columns.stream()
+                                                .filter(e -> 
e.getName().equals(afterFieldName))
+                                                .findFirst()
+                                                .get();
+                                typeMapping.put(afterField.name(), 
column.getType());
+                                result.add(new AlterColumnTypeEvent(tableId, 
typeMapping));
+                            }
+                        });
+    }
+
+    private Map<String, String> getRenameColumnMaps(
+            TableId tableId,
+            List<String> newAddColumnNames,
+            List<String> newDelColumnNames,
+            Schema oldSchema,
+            Schema afterSchema,
+            Map<String, Integer> beforeColumnsOidMaps) {
+        Map<String, String> renameColumnMaps = new HashMap<>();
+        Map<String, Integer> newAddColumnsOidMaps =
+                PostgresSchemaUtils.getColumnOids(postgresSourceConfig, 
tableId, newAddColumnNames);

Review Comment:
   Since DDL is driven by DML, consider this scenario:
   1. DDL is executed at T1
   2. DML is executed at T2
   3. DDL is executed at T3
   
   If the catalog OID lookup runs while processing the DML from T2, the catalog may already reflect the DDL from T3, so the obtained OID can be stale relative to the event being deserialized, which 
could cause problems.
   
   By the way, using only column names and OIDs, it seems difficult to reliably distinguish a DROP + ADD of a column from a RENAME — how is that case handled?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to