Copilot commented on code in PR #4316:
URL: https://github.com/apache/flink-cdc/pull/4316#discussion_r2957667042


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java:
##########
@@ -131,6 +132,7 @@ public DataSource createDataSource(Context context) {
         boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
         int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
         boolean tableIdIncludeDatabase = config.get(TABLE_ID_INCLUDE_DATABASE);
+        boolean includeSchemaChanges = config.get(SCHEMA_CHANGE_ENABLED);

Review Comment:
   `schema-change.enabled` is read into `includeSchemaChanges` and then wired 
to `.includeSchemaChanges(...)`, but the PR introduces 
`PostgresSourceConfigFactory.enableSchemaChange(...)` / 
`PostgresSourceConfig.isSchemaChangeEnabled()` as the actual toggle for 
inferring schema changes from pgoutput Relation messages. As-is, enabling 
`schema-change.enabled` won't enable inference, and may instead turn on 
emitting schema-change SourceRecords (which can break when 
`PostgresSchemaRecord` is emitted). Wire this option to 
`enableSchemaChange(...)` (and keep `includeSchemaChanges` controlled by the 
existing schema-change output option, if any).
   



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordEmitter.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.reader;
+
+import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
+import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
+import org.apache.flink.cdc.connectors.postgres.source.schema.PostgresSchemaRecord;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+
+import io.debezium.relational.Table;
+import io.debezium.relational.history.TableChanges;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.IOException;
+
+/** Record emitter that recognizes {@link PostgresSchemaRecord} as schema change events. */
+public class PostgresSourceRecordEmitter<T> extends IncrementalSourceRecordEmitter<T> {
+    public PostgresSourceRecordEmitter(
+            DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
+            SourceReaderMetrics sourceReaderMetrics,
+            boolean includeSchemaChanges,
+            OffsetFactory offsetFactory) {
+        super(
+                debeziumDeserializationSchema,
+                sourceReaderMetrics,
+                includeSchemaChanges,
+                offsetFactory);
+    }
+
+    @Override
+    protected TableChanges getTableChangeRecord(SourceRecord element) throws IOException {
+        PostgresSchemaRecord schemaRecord = (PostgresSchemaRecord) element;
+        Table table = schemaRecord.getTable();
+        return new TableChanges().create(table);

Review Comment:
   `getTableChangeRecord` unconditionally casts `element` to 
`PostgresSchemaRecord`. `IncrementalSourceRecordEmitter` calls this for *any* 
schema change record (key schema matches 
`io.debezium.connector.*.SchemaChangeKey`), so a non-`PostgresSchemaRecord` 
schema-change record would cause a `ClassCastException`. Please guard with 
`instanceof` and fall back to `super.getTableChangeRecord(element)` for the 
standard HistoryRecord-based schema change events.
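
The guard requested above can be sketched as follows. This is a minimal, self-contained illustration, not the connector's code: `SourceRecord`, `SchemaRecord`, `BaseEmitter` and `GuardedEmitter` are hypothetical stand-ins for the Kafka Connect, `PostgresSchemaRecord`, `IncrementalSourceRecordEmitter` and `PostgresSourceRecordEmitter` types, and strings stand in for `TableChanges`.

```java
// Hypothetical stand-ins; the real types come from Kafka Connect, Debezium and flink-cdc.
class SourceRecord {}

// Stand-in for PostgresSchemaRecord: a record that carries the table directly.
class SchemaRecord extends SourceRecord {
    final String table;

    SchemaRecord(String table) {
        this.table = table;
    }
}

// Stand-in for the base emitter and its HistoryRecord-based path.
class BaseEmitter {
    protected String getTableChangeRecord(SourceRecord element) {
        return "history-record";
    }
}

class GuardedEmitter extends BaseEmitter {
    @Override
    protected String getTableChangeRecord(SourceRecord element) {
        // Only records of the relation-based type take the new path.
        if (element instanceof SchemaRecord) {
            return "relation:" + ((SchemaRecord) element).table;
        }
        // Every other schema change record falls back to the base implementation.
        return super.getTableChangeRecord(element);
    }
}
```

With this shape, a standard schema change record reaches the base implementation instead of failing on the cast.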
   



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java:
##########
@@ -137,68 +132,57 @@ protected void processElement(
             shouldEmitAllCreateTableEventsInSnapshotMode = false;
         } else if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
             TableId tableId = splitState.asSnapshotSplitState().toSourceSplit().getTableId();
-            if (!alreadySendCreateTableTables.contains(tableId)) {
-                sendCreateTableEvent(tableId, (SourceOutput<Event>) output);
-                alreadySendCreateTableTables.add(tableId);
-            }
-        } else {
-            boolean isDataChangeRecord = isDataChangeRecord(element);
-            if (isDataChangeRecord || isSchemaChangeEvent(element)) {
-                TableId tableId = getTableId(element);
-                if (!alreadySendCreateTableTables.contains(tableId)) {
-                    CreateTableEvent createTableEvent = createTableEventCache.get(tableId);
-                    if (createTableEvent != null) {
-                        output.collect((T) createTableEvent);
-                    }
-                    alreadySendCreateTableTables.add(tableId);
-                }
-                // In rare case, we may miss some CreateTableEvents before DataChangeEvents.
-                // Don't send CreateTableEvent for SchemaChangeEvents as it's the latest schema.
-                if (isDataChangeRecord && !createTableEventCache.containsKey(tableId)) {
-                    CreateTableEvent createTableEvent = getCreateTableEvent(sourceConfig, tableId);
-                    output.collect((T) createTableEvent);
-                    createTableEventCache.put(tableId, createTableEvent);
-                }
-            }
+            maybeSendCreateTableEventFromCache(tableId, output);
+        } else if (isDataChangeRecord(element)) {
+            handleDataChangeRecord(element, output);
+        } else if (isSchemaChangeEvent(element) && sourceConfig.isSchemaChangeEnabled()) {
+            handleSchemaChangeRecord(element, output, splitState);
         }
         super.processElement(element, output, splitState);
     }
 
-    private Schema buildSchemaFromTable(Table table) {
-        List<Column> columns = table.columns();
-        Schema.Builder tableBuilder = Schema.newBuilder();
-        for (int i = 0; i < columns.size(); i++) {
-            Column column = columns.get(i);
-
-            String colName = column.name();
-            DataType dataType;
-            try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) {
-                dataType =
-                        PostgresTypeUtils.fromDbzColumn(
-                                column,
-                                this.sourceConfig.getDbzConnectorConfig(),
-                                jdbc.getTypeRegistry());
-            }
-            if (!column.isOptional()) {
-                dataType = dataType.notNull();
-            }
-            tableBuilder.physicalColumn(
-                    colName,
-                    dataType,
-                    column.comment(),
-                    column.defaultValueExpression().orElse(null));
+    private void handleDataChangeRecord(SourceRecord element, SourceOutput<T> output) {
+        TableId tableId = getTableId(element);
+        maybeSendCreateTableEventFromCache(tableId, output);
+        // In rare case, we may miss some CreateTableEvents before DataChangeEvents.
+        // Don't send CreateTableEvent for SchemaChangeEvents as it's the latest schema.
+        if (!createTableEventCache.containsKey(tableId)) {
+            CreateTableEvent createTableEvent = getCreateTableEvent(sourceConfig, tableId);
+            sendCreateTableEvent(createTableEvent, output);
+            createTableEventCache.put(tableId, createTableEvent);
         }
-        tableBuilder.comment(table.comment());
+    }
 
-        List<String> primaryKey = table.primaryKeyColumnNames();
-        if (Objects.nonNull(primaryKey) && !primaryKey.isEmpty()) {
-            tableBuilder.primaryKey(primaryKey);
+    private void handleSchemaChangeRecord(
+            SourceRecord element, SourceOutput<T> output, SourceSplitState splitState) {
+        Map<TableId, TableChanges.TableChange> existedTableSchemas =
+                splitState.toSourceSplit().getTableSchemas();
+        PostgresSchemaRecord schemaRecord = (PostgresSchemaRecord) element;
+        Table schemaAfter = schemaRecord.getTable();
+        maybeSendCreateTableEventFromCache(schemaAfter.id(), output);

Review Comment:
   `handleSchemaChangeRecord` unconditionally casts `element` to 
`PostgresSchemaRecord`. Since schema change detection is based on 
`isSchemaChangeEvent(...)` (key schema name pattern), any standard Debezium 
schema-change record would also enter this branch and trigger a 
`ClassCastException`. Please guard with `instanceof PostgresSchemaRecord` (and 
either ignore non-relation schema change records or handle them via the 
HistoryRecord/TableChanges path).



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/SchemaChangeUtil.java:
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.utils;
+
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DropColumnEvent;
+import org.apache.flink.cdc.common.event.RenameColumnEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.connector.postgresql.TypeRegistry;
+import io.debezium.connector.postgresql.connection.PostgresConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils.toCdcTableId;
+import static org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils.toSchema;
+
+/**
+ * Utilities for inferring CDC schema change events by comparing before/after Debezium table
+ * schemas.
+ *
+ * <p>PostgreSQL DDL has exactly four operations that structurally change a table schema:
+ *
+ * <ul>
+ *   <li><b>Add column</b> — always appended at the end (PostgreSQL does not 
support adding a column
+ *       at an arbitrary position).
+ *   <li><b>Drop column</b> — removes a column by name.
+ *   <li><b>Rename column</b> — changes a column's name while preserving its 
position and type.
+ *   <li><b>Alter column type</b> — changes a column's type while preserving 
its name and position.
+ * </ul>
+ *
+ * <p>Ideally, PostgreSQL's {@code pg_attribute.attnum} would be the best way to distinguish whether
+ * a column was dropped-and-recreated or simply renamed. However, the pgoutput relation message does
+ * not include {@code attnum}. Even querying the database for the current attnum after the fact is
+ * unreliable due to temporal mismatch — for example, if column {@code C} is 
renamed to {@code D}
+ * and a new column {@code C} is added between two relation messages, a retroactive attnum query
+ * would see the <em>new</em> {@code C}'s attnum, not the original one, leading to incorrect
+ * inference.
+ *
+ * <p>Instead, this utility infers schema changes by comparing the before/after column lists and
+ * computing the <b>minimum edit</b> using the four operations above. The main inherent ambiguity is
+ * the last-column scenario: renaming the last column looks identical to dropping it and adding a
+ * new column with the same name at the end, since both produce the same before/after column list.
+ * The algorithm resolves this by always preferring the interpretation with the fewest operations
+ * (rename costs 1; drop + add costs 2), so a last-column name change is inferred as a rename.
+ */
+public class SchemaChangeUtil {
+    /**
+     * Infers the minimum list of schema change events that transform {@code tableBefore} into
+     * {@code tableAfter}. Returns a {@link CreateTableEvent} if {@code tableBefore} is null.
+     */
+    public static List<SchemaChangeEvent> inferSchemaChangeEvent(
+            io.debezium.relational.TableId dbzTableId,
+            @Nullable Table tableBefore,
+            Table tableAfter,
+            PostgresSourceConfig sourceConfig,
+            PostgresDialect dialect) {
+
+        if (tableBefore == null) {
+            return Collections.singletonList(toCreateTableEvent(tableAfter, sourceConfig, dialect));
+        }
+
+        TableId cdcTableId =
+                toCdcTableId(
+                        dbzTableId,
+                        sourceConfig.getDatabaseList().get(0),
+                        sourceConfig.isIncludeDatabaseInTableId());
+        PostgresConnectorConfig dbzConfig = sourceConfig.getDbzConnectorConfig();
+
+        try (PostgresConnection connection = dialect.openJdbcConnection()) {
+            TypeRegistry typeRegistry = connection.getTypeRegistry();
+            return inferMinimalSchemaChanges(
+                    cdcTableId,
+                    tableBefore.columns(),
+                    tableAfter.columns(),
+                    dbzConfig,
+                    typeRegistry);
+        }
+    }
+
+    public static CreateTableEvent toCreateTableEvent(
+            Table table, PostgresSourceConfig sourceConfig, PostgresDialect dialect) {
+        try (PostgresConnection connection = dialect.openJdbcConnection()) {
+            return toCreateTableEvent(table, sourceConfig, connection.getTypeRegistry());
+        }
+    }
+
+    private static CreateTableEvent toCreateTableEvent(
+            Table table, PostgresSourceConfig sourceConfig, TypeRegistry typeRegistry) {
+        return new CreateTableEvent(
+                toCdcTableId(
+                        table.id(),
+                        sourceConfig.getDatabaseList().get(0),
+                        sourceConfig.isIncludeDatabaseInTableId()),
+                toSchema(table, sourceConfig.getDbzConnectorConfig(), typeRegistry));
+    }
+
+    /**
+     * Finds the minimum schema change events to transform beforeCols into afterCols using recursion
+     * with memoization. Available operations: rename column, add column at last, drop column, alter
+     * column type. Recursion depth bounded by total column count.
+     */
+    private static List<SchemaChangeEvent> inferMinimalSchemaChanges(
+            TableId cdcTableId,
+            List<Column> beforeCols,
+            List<Column> afterCols,
+            PostgresConnectorConfig dbzConfig,
+            TypeRegistry typeRegistry) {
+
+        int n = beforeCols.size();
+        int m = afterCols.size();
+
+        // memo[i][j] = min cost from state (i, j), -1 means unvisited
+        int[][] memo = new int[n + 1][m + 1];
+        for (int[] row : memo) {
+            Arrays.fill(row, -1);
+        }
+
+        // Fill memoization table via recursion
+        minCost(0, 0, n, m, beforeCols, afterCols, memo, dbzConfig, typeRegistry);
+
+        // Traceback to build schema change events
+        return tracebackEvents(
+                cdcTableId, n, m, beforeCols, afterCols, memo, dbzConfig, typeRegistry);
+    }
+
+    /**
+     * Recursively computes the minimum number of individual column operations to align
+     * before[i..n-1] with after[j..m-1], where {@code i} is the current index into {@code
+     * beforeCols} (0..n) and {@code j} is the current index into {@code afterCols} (0..m). Boundary
+     * cases ({@code i == n} or {@code j == m}) return immediately without list access. At each
+     * non-boundary state, either drop before[i] (cost 1) or match before[i] to after[j] (cost =
+     * rename(0/1) + alterType(0/1)). Unmatched after columns are added at the end.
+     */
+    private static int minCost(
+            int i,
+            int j,
+            int n,
+            int m,
+            List<Column> beforeCols,
+            List<Column> afterCols,
+            int[][] memo,
+            PostgresConnectorConfig dbzConfig,
+            TypeRegistry typeRegistry) {
+
+        if (i == n) {
+            return m - j; // add remaining after columns
+        }
+        if (j == m) {
+            return n - i; // drop remaining before columns
+        }
+        if (memo[i][j] != -1) {
+            return memo[i][j];
+        }
+
+        // Option 1: drop beforeCols[i]
+        int dropCost =
+                1 + minCost(i + 1, j, n, m, beforeCols, afterCols, memo, dbzConfig, typeRegistry);
+
+        // Option 2: match beforeCols[i] to afterCols[j]
+        int matchCost =
+                columnMatchCost(beforeCols.get(i), afterCols.get(j), dbzConfig, typeRegistry)
+                        + minCost(
+                                i + 1,
+                                j + 1,
+                                n,
+                                m,
+                                beforeCols,
+                                afterCols,
+                                memo,
+                                dbzConfig,
+                                typeRegistry);
+
+        memo[i][j] = Math.min(dropCost, matchCost);
+        return memo[i][j];
+    }
+
+    /**
+     * Computes the min cost of matching a before column to an after column (0, 1, or 2). if column
+     * name is same and type is same, cost = 0; if column name is same and type is different, cost =
+     * 1(alter type); if column name is different and type is same, cost = 1(rename); if column name
+     * is different and type is same, cost = 2(rename + alter type);

Review Comment:
   Javadoc typo/logic: the last sentence says "if column name is different and 
type is same" twice; the final case should describe the name-different *and* 
type-different scenario (rename + alter type). Please correct the comment to 
match the implementation.
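
For reference, the four cases the Javadoc is meant to describe, together with the drop-or-match recurrence from the `minCost` Javadoc, can be sketched as below. This is an illustration under stated assumptions rather than the PR's implementation: `Col`, `ColumnMatchCost` and `MinEdit` are hypothetical names, and types are compared as plain strings, whereas the real code resolves them through `dbzConfig` and `typeRegistry`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for io.debezium.relational.Column: just a name and a type name.
final class Col {
    final String name;
    final String type;

    Col(String name, String type) {
        this.name = name;
        this.type = type;
    }
}

final class ColumnMatchCost {
    /** One operation per differing attribute: 0 (same), 1 (rename or alter type), 2 (both). */
    static int cost(Col before, Col after) {
        int rename = Objects.equals(before.name, after.name) ? 0 : 1;
        int alterType = Objects.equals(before.type, after.type) ? 0 : 1;
        return rename + alterType;
    }
}

final class MinEdit {
    /** Minimum number of column operations that turn {@code before} into {@code after}. */
    static int minCost(List<Col> before, List<Col> after) {
        int[][] memo = new int[before.size() + 1][after.size() + 1];
        for (int[] row : memo) {
            Arrays.fill(row, -1); // -1 marks an unvisited state
        }
        return minCost(0, 0, before, after, memo);
    }

    private static int minCost(int i, int j, List<Col> before, List<Col> after, int[][] memo) {
        if (i == before.size()) {
            return after.size() - j; // add the remaining after columns
        }
        if (j == after.size()) {
            return before.size() - i; // drop the remaining before columns
        }
        if (memo[i][j] != -1) {
            return memo[i][j];
        }
        int dropCost = 1 + minCost(i + 1, j, before, after, memo);
        int matchCost =
                ColumnMatchCost.cost(before.get(i), after.get(j))
                        + minCost(i + 1, j + 1, before, after, memo);
        memo[i][j] = Math.min(dropCost, matchCost);
        return memo[i][j];
    }
}
```

In this sketch, renaming the last column of a two-column table costs 1 (a single rename) rather than 2 (drop plus add), which is the tie-break the class Javadoc describes.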
   



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/schema/PostgresSchemaRecord.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.schema;
+
+import io.debezium.relational.Table;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/** A SourceRecord wrapper carrying a Debezium Table for schema change propagation. */
+public class PostgresSchemaRecord extends SourceRecord {
+    // Use postgres-cdc rather than postgres to avoid conflict if debezium support later.

Review Comment:
   The comment on the schema change key says "Use postgres-cdc rather than 
postgres", but the actual schema name is 
`io.debezium.connector.postgres.SchemaChangeKey`. Please align the comment with 
the implementation (or update the schema name if the intent is to avoid 
clashing with Debezium’s own schema change records).
   



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java:
##########
@@ -311,6 +315,12 @@ public PostgresSourceBuilder<T> includeDatabaseInTableId(boolean includeDatabase
         return this;
     }
 
+    /** Whether to infer schema change event on relation message. */
+    public PostgresSourceBuilder<T> includeSchemaChanges(boolean includeSchemaChanges) {
+        this.configFactory.includeSchemaChanges(includeSchemaChanges);
+        return this;
+    }
+
     /**

Review Comment:
   This new `includeSchemaChanges(...)` builder method has misleading 
semantics/documentation: it says it's about inferring schema change events from 
Relation messages, but it actually delegates to 
`configFactory.includeSchemaChanges(...)` (the generic "forward schema change 
records" flag). This makes it easy to enable the wrong behavior and miss 
enabling `PostgresSourceConfigFactory.enableSchemaChange(...)`. Please either 
(a) rename/reword this method to match `includeSchemaChanges` semantics, or (b) 
add a separate builder method that sets `enableSchemaChange(...)` and keep this 
method for the existing include-schema-changes flag.
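
Option (b) might look roughly like the sketch below. It assumes `enableSchemaChange(...)` takes a single boolean, which the review text does not confirm; `ConfigFactorySketch` and `SourceBuilderSketch` are hypothetical stand-ins for `PostgresSourceConfigFactory` and `PostgresSourceBuilder`, and the builder-level `enableSchemaChange` method does not exist in the diff shown above.

```java
// Hypothetical stand-in for PostgresSourceConfigFactory with the two independent flags.
final class ConfigFactorySketch {
    boolean includeSchemaChanges; // forward schema change records downstream
    boolean schemaChangeEnabled; // infer schema changes from pgoutput Relation messages

    void includeSchemaChanges(boolean include) {
        this.includeSchemaChanges = include;
    }

    // Assumed signature: a single boolean toggle.
    void enableSchemaChange(boolean enabled) {
        this.schemaChangeEnabled = enabled;
    }
}

// Hypothetical stand-in for PostgresSourceBuilder.
final class SourceBuilderSketch {
    final ConfigFactorySketch configFactory = new ConfigFactorySketch();

    /** Whether to forward schema change records to the deserializer. */
    SourceBuilderSketch includeSchemaChanges(boolean includeSchemaChanges) {
        configFactory.includeSchemaChanges(includeSchemaChanges);
        return this;
    }

    /** Whether to infer schema change events from Relation messages. */
    SourceBuilderSketch enableSchemaChange(boolean enabled) {
        configFactory.enableSchemaChange(enabled);
        return this;
    }
}
```

Keeping the two methods separate means a caller who wants inference sets only the inference flag, and the forwarding flag keeps its existing meaning.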
   



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java:
##########
@@ -172,6 +174,7 @@ public DataSource createDataSource(Context context) {
                         .lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay)
                         .assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
                         .includeDatabaseInTableId(tableIdIncludeDatabase)
+                        .includeSchemaChanges(includeSchemaChanges)

Review Comment:
   `SCHEMA_CHANGE_ENABLED` is currently passed into 
`.includeSchemaChanges(...)` on the builder. This is very likely the wrong 
config knob: it flips whether raw Debezium schema change records are forwarded, 
not whether pgoutput Relation messages are converted into inferred schema 
change events. Consider calling the new `enableSchemaChange(...)` (or exposing 
a dedicated builder method for it) and leave `.includeSchemaChanges(...)` for 
the existing include-schema-changes behavior.
   



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/schema/PostgresSchemaRecord.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.schema;
+
+import io.debezium.relational.Table;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/** A SourceRecord wrapper carrying a Debezium Table for schema change propagation. */
+public class PostgresSchemaRecord extends SourceRecord {
+    // Use postgres-cdc rather than postgres to avoid conflict if debezium support later.
+    private static final String SCHEMA_CHANGE_EVENT_KEY_NAME =
+            "io.debezium.connector.postgres.SchemaChangeKey";
+
+    private static final Schema KEY_SCHEMA =
+            SchemaBuilder.struct()
+                    .name(SCHEMA_CHANGE_EVENT_KEY_NAME)
+                    .field("table_id", Schema.STRING_SCHEMA)
+                    .build();
+
+    private final Table table;
+
+    public PostgresSchemaRecord(Table table) {
+        super(null, null, null, KEY_SCHEMA, buildKey(table), null, null);

Review Comment:
   `PostgresSchemaRecord` constructs a `SourceRecord` with `valueSchema == 
null` and `value == null`. If `includeSchemaChanges` is enabled, 
`IncrementalSourceRecordEmitter.emitElement()` will call 
`SourceRecordUtils.getMessageTimestamp(record)`, which dereferences 
`record.valueSchema()` and will throw an NPE for this record type. Either 
populate a minimal non-null value schema/value (so timestamp extraction safely 
returns null), or ensure this record type is never passed to `emitElement` 
(e.g., avoid matching `isSchemaChangeEvent` or gate emitting).
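
The first option (a minimal non-null value schema) can be illustrated with stand-in types. Everything here is an assumption made for the sketch: `Rec` models a record whose value schema is just a set of field names, and `TimestampSketch.getMessageTimestamp` is a simplified imitation of the behaviour described above (dereferencing the value schema without a null check), not the real `SourceRecordUtils` code.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for SourceRecord: the value schema is modelled as a set of field names.
final class Rec {
    final Set<String> valueSchemaFields; // null mimics valueSchema() == null
    final Map<String, Object> value;

    Rec(Set<String> valueSchemaFields, Map<String, Object> value) {
        this.valueSchemaFields = valueSchemaFields;
        this.value = value;
    }

    /** A schema record with an empty, but non-null, value schema and value. */
    static Rec schemaRecordWithEmptyValue() {
        return new Rec(Collections.<String>emptySet(), Collections.<String, Object>emptyMap());
    }
}

final class TimestampSketch {
    /** Simplified imitation: reads "ts_ms" if the value schema declares it, else returns null. */
    static Long getMessageTimestamp(Rec record) {
        // Dereferences the value schema without a null check, as described in the comment above.
        if (!record.valueSchemaFields.contains("ts_ms")) {
            return null;
        }
        return (Long) record.value.get("ts_ms");
    }
}
```

Under these assumptions, a record built with a null value schema throws a `NullPointerException`, while one built through `schemaRecordWithEmptyValue()` yields a null timestamp.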
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
