loserwang1024 commented on code in PR #4233:
URL: https://github.com/apache/flink-cdc/pull/4233#discussion_r2909690410
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java:
##########
@@ -31,63 +42,375 @@
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
+import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.io.WKBReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
/** Event deserializer for {@link PostgresDataSource}. */
@Internal
public class PostgresEventDeserializer extends
DebeziumEventDeserializationSchema {
-
+ private static final Logger LOG =
LoggerFactory.getLogger(PostgresEventDeserializer.class);
private static final long serialVersionUID = 1L;
private List<PostgreSQLReadableMetadata> readableMetadataList;
private final boolean includeDatabaseInTableId;
private final String databaseName;
+ private Map<TableId, Schema> schemaMap = new HashMap<>();
+ private final PostgresSourceConfig postgresSourceConfig;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps;
public PostgresEventDeserializer(DebeziumChangelogMode changelogMode) {
- this(changelogMode, new ArrayList<>(), false, null);
+ this(changelogMode, new ArrayList<>(), false, null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList) {
- this(changelogMode, readableMetadataList, false, null);
+ this(changelogMode, readableMetadataList, false, null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList,
boolean includeDatabaseInTableId) {
- this(changelogMode, readableMetadataList, includeDatabaseInTableId,
null);
+ this(changelogMode, readableMetadataList, includeDatabaseInTableId,
null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList,
boolean includeDatabaseInTableId,
- String databaseName) {
+ String databaseName,
+ PostgresSourceConfig postgresSourceConfig,
+ Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps) {
super(new PostgresSchemaDataTypeInference(), changelogMode);
this.readableMetadataList = readableMetadataList;
this.includeDatabaseInTableId = includeDatabaseInTableId;
this.databaseName = databaseName;
+ this.postgresSourceConfig = postgresSourceConfig;
+ this.beforeTableColumnsOidMaps = beforeTableColumnsOidMaps;
}
@Override
protected List<SchemaChangeEvent>
deserializeSchemaChangeRecord(SourceRecord record) {
return Collections.emptyList();
}
+ @Override
+ public List<? extends Event> deserialize(SourceRecord record) throws
Exception {
+ List<Event> result = new ArrayList<>();
+ if (postgresSourceConfig.isIncludeSchemaChanges()) {
+ handleSchemaChange(record, result);
+ }
+ if (isDataChangeRecord(record)) {
+ LOG.trace("Process data change record: {}", record);
+ result.addAll(deserializeDataChangeRecord(record));
+ } else if (isSchemaChangeRecord(record)) {
+ LOG.trace("Process schema change record: {}", record);
+ } else {
+ LOG.trace("Ignored other record: {}", record);
+ return Collections.emptyList();
+ }
+ return result;
+ }
+
+ private void handleSchemaChange(SourceRecord record, List<Event> result) {
+ TableId tableId = getTableId(record);
+ Schema valueSchema = record.valueSchema();
+ Schema beforeSchema = schemaMap.get(tableId);
+ List<String> beforeColumnNames;
+ List<String> afterColumnNames;
+ Schema afterSchema = fieldSchema(valueSchema,
Envelope.FieldName.AFTER);
+ List<Field> afterFields = afterSchema.fields();
+ org.apache.flink.cdc.common.schema.Schema schema =
Review Comment:
```java
org.apache.flink.cdc.common.schema.Schema schema =
PostgresSchemaUtils.getTableSchema(postgresSourceConfig,
tableId);
```
It's too heavy to look up the current schema from the database for each record. In
production, this cannot work.
Why not just use afterSchema and beforeSchema? Why do we still need to look up the
current schema?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]