Copilot commented on code in PR #4316:
URL: https://github.com/apache/flink-cdc/pull/4316#discussion_r2957667042
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java:
##########
@@ -131,6 +132,7 @@ public DataSource createDataSource(Context context) {
boolean skipSnapshotBackfill =
config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
int lsnCommitCheckpointsDelay =
config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
boolean tableIdIncludeDatabase = config.get(TABLE_ID_INCLUDE_DATABASE);
+ boolean includeSchemaChanges = config.get(SCHEMA_CHANGE_ENABLED);
Review Comment:
`schema-change.enabled` is read into `includeSchemaChanges` and then wired
to `.includeSchemaChanges(...)`, but the PR introduces
`PostgresSourceConfigFactory.enableSchemaChange(...)` /
`PostgresSourceConfig.isSchemaChangeEnabled()` as the actual toggle for
inferring schema changes from pgoutput Relation messages. As-is, enabling
`schema-change.enabled` won't enable inference, and may instead turn on
emitting schema-change SourceRecords (which can break when
`PostgresSchemaRecord` is emitted). Wire this option to
`enableSchemaChange(...)` (and keep `includeSchemaChanges` controlled by the
existing schema-change output option, if any).
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordEmitter.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.reader;
+
+import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
+import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
+import org.apache.flink.cdc.connectors.postgres.source.schema.PostgresSchemaRecord;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+
+import io.debezium.relational.Table;
+import io.debezium.relational.history.TableChanges;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.IOException;
+
+/** Record emitter that recognizes {@link PostgresSchemaRecord} as schema change events. */
+public class PostgresSourceRecordEmitter<T> extends IncrementalSourceRecordEmitter<T> {
+ public PostgresSourceRecordEmitter(
+ DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
+ SourceReaderMetrics sourceReaderMetrics,
+ boolean includeSchemaChanges,
+ OffsetFactory offsetFactory) {
+ super(
+ debeziumDeserializationSchema,
+ sourceReaderMetrics,
+ includeSchemaChanges,
+ offsetFactory);
+ }
+
+ @Override
+ protected TableChanges getTableChangeRecord(SourceRecord element) throws IOException {
+ PostgresSchemaRecord schemaRecord = (PostgresSchemaRecord) element;
+ Table table = schemaRecord.getTable();
+ return new TableChanges().create(table);
Review Comment:
`getTableChangeRecord` unconditionally casts `element` to
`PostgresSchemaRecord`. `IncrementalSourceRecordEmitter` calls this for *any*
schema change record (key schema matches
`io.debezium.connector.*.SchemaChangeKey`), so a non-`PostgresSchemaRecord`
schema-change record would cause a `ClassCastException`. Please guard with
`instanceof` and fall back to `super.getTableChangeRecord(element)` for the
standard HistoryRecord-based schema change events.
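For reference, the suggested guard-and-fallback shape can be sketched with hypothetical stand-in types (these are illustrative stubs, not the Flink CDC or Debezium classes):

```java
// Sketch of the instanceof-guard-with-fallback pattern suggested above.
// SchemaRecord / RelationSchemaRecord are hypothetical stand-ins, not Flink CDC types.
public class GuardSketch {

    /** Stand-in for a generic (HistoryRecord-based) schema-change record. */
    static class SchemaRecord {}

    /** Stand-in for PostgresSchemaRecord, which carries a parsed Relation table. */
    static class RelationSchemaRecord extends SchemaRecord {
        final String tableName;

        RelationSchemaRecord(String tableName) {
            this.tableName = tableName;
        }
    }

    /** Fallback path for standard HistoryRecord-based schema change events. */
    static String handleGeneric(SchemaRecord record) {
        return "history-based";
    }

    /** Guarded dispatch: downcast only after an instanceof check. */
    static String getTableChange(SchemaRecord record) {
        if (record instanceof RelationSchemaRecord) {
            return "relation:" + ((RelationSchemaRecord) record).tableName;
        }
        // Non-Relation schema change records fall back instead of
        // throwing ClassCastException.
        return handleGeneric(record);
    }
}
```

The same guard applies wherever the key-schema pattern match admits records of either kind.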
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java:
##########
@@ -137,68 +132,57 @@ protected void processElement(
shouldEmitAllCreateTableEventsInSnapshotMode = false;
} else if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
TableId tableId = splitState.asSnapshotSplitState().toSourceSplit().getTableId();
- if (!alreadySendCreateTableTables.contains(tableId)) {
- sendCreateTableEvent(tableId, (SourceOutput<Event>) output);
- alreadySendCreateTableTables.add(tableId);
- }
- } else {
- boolean isDataChangeRecord = isDataChangeRecord(element);
- if (isDataChangeRecord || isSchemaChangeEvent(element)) {
- TableId tableId = getTableId(element);
- if (!alreadySendCreateTableTables.contains(tableId)) {
- CreateTableEvent createTableEvent = createTableEventCache.get(tableId);
- if (createTableEvent != null) {
- output.collect((T) createTableEvent);
- }
- alreadySendCreateTableTables.add(tableId);
- }
- // In rare case, we may miss some CreateTableEvents before DataChangeEvents.
- // Don't send CreateTableEvent for SchemaChangeEvents as it's the latest schema.
- if (isDataChangeRecord && !createTableEventCache.containsKey(tableId)) {
- CreateTableEvent createTableEvent = getCreateTableEvent(sourceConfig, tableId);
- output.collect((T) createTableEvent);
- createTableEventCache.put(tableId, createTableEvent);
- }
- }
+ maybeSendCreateTableEventFromCache(tableId, output);
+ } else if (isDataChangeRecord(element)) {
+ handleDataChangeRecord(element, output);
+ } else if (isSchemaChangeEvent(element) && sourceConfig.isSchemaChangeEnabled()) {
+ handleSchemaChangeRecord(element, output, splitState);
}
super.processElement(element, output, splitState);
}
- private Schema buildSchemaFromTable(Table table) {
- List<Column> columns = table.columns();
- Schema.Builder tableBuilder = Schema.newBuilder();
- for (int i = 0; i < columns.size(); i++) {
- Column column = columns.get(i);
-
- String colName = column.name();
- DataType dataType;
- try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) {
- dataType =
- PostgresTypeUtils.fromDbzColumn(
- column,
- this.sourceConfig.getDbzConnectorConfig(),
- jdbc.getTypeRegistry());
- }
- if (!column.isOptional()) {
- dataType = dataType.notNull();
- }
- tableBuilder.physicalColumn(
- colName,
- dataType,
- column.comment(),
- column.defaultValueExpression().orElse(null));
+ private void handleDataChangeRecord(SourceRecord element, SourceOutput<T> output) {
+ TableId tableId = getTableId(element);
+ maybeSendCreateTableEventFromCache(tableId, output);
+ // In rare case, we may miss some CreateTableEvents before DataChangeEvents.
+ // Don't send CreateTableEvent for SchemaChangeEvents as it's the latest schema.
+ if (!createTableEventCache.containsKey(tableId)) {
+ CreateTableEvent createTableEvent = getCreateTableEvent(sourceConfig, tableId);
+ sendCreateTableEvent(createTableEvent, output);
+ createTableEventCache.put(tableId, createTableEvent);
}
- tableBuilder.comment(table.comment());
+ }
- List<String> primaryKey = table.primaryKeyColumnNames();
- if (Objects.nonNull(primaryKey) && !primaryKey.isEmpty()) {
- tableBuilder.primaryKey(primaryKey);
+ private void handleSchemaChangeRecord(
+ SourceRecord element, SourceOutput<T> output, SourceSplitState splitState) {
+ Map<TableId, TableChanges.TableChange> existedTableSchemas = splitState.toSourceSplit().getTableSchemas();
+ PostgresSchemaRecord schemaRecord = (PostgresSchemaRecord) element;
+ Table schemaAfter = schemaRecord.getTable();
+ maybeSendCreateTableEventFromCache(schemaAfter.id(), output);
Review Comment:
`handleSchemaChangeRecord` unconditionally casts `element` to
`PostgresSchemaRecord`. Since schema change detection is based on
`isSchemaChangeEvent(...)` (key schema name pattern), any standard Debezium
schema-change record would also enter this branch and trigger a
`ClassCastException`. Please guard with `instanceof PostgresSchemaRecord` (and
either ignore non-relation schema change records or handle them via the
HistoryRecord/TableChanges path).
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/SchemaChangeUtil.java:
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.utils;
+
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DropColumnEvent;
+import org.apache.flink.cdc.common.event.RenameColumnEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.connector.postgresql.TypeRegistry;
+import io.debezium.connector.postgresql.connection.PostgresConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils.toCdcTableId;
+import static org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils.toSchema;
+
+/**
+ * Utilities for inferring CDC schema change events by comparing before/after Debezium table
+ * schemas.
+ *
+ * <p>PostgreSQL DDL has exactly four operations that structurally change a table schema:
+ *
+ * <ul>
+ * <li><b>Add column</b> — always appended at the end (PostgreSQL does not support adding a column
+ * at an arbitrary position).
+ * <li><b>Drop column</b> — removes a column by name.
+ * <li><b>Rename column</b> — changes a column's name while preserving its position and type.
+ * <li><b>Alter column type</b> — changes a column's type while preserving its name and position.
+ * </ul>
+ *
+ * <p>Ideally, PostgreSQL's {@code pg_attribute.attnum} would be the best way to distinguish whether
+ * a column was dropped-and-recreated or simply renamed. However, the pgoutput relation message does
+ * not include {@code attnum}. Even querying the database for the current attnum after the fact is
+ * unreliable due to temporal mismatch — for example, if column {@code C} is renamed to {@code D}
+ * and a new column {@code C} is added between two relation messages, a retroactive attnum query
+ * would see the <em>new</em> {@code C}'s attnum, not the original one, leading to incorrect
+ * inference.
+ *
+ * <p>Instead, this utility infers schema changes by comparing the before/after column lists and
+ * computing the <b>minimum edit</b> using the four operations above. The main inherent ambiguity is
+ * the last-column scenario: renaming the last column looks identical to dropping it and adding a
+ * new column with the same name at the end, since both produce the same before/after column list.
+ * The algorithm resolves this by always preferring the interpretation with the fewest operations
+ * (rename costs 1; drop + add costs 2), so a last-column name change is inferred as a rename.
+ */
+public class SchemaChangeUtil {
+ /**
+ * Infers the minimum list of schema change events that transform {@code tableBefore} into
+ * {@code tableAfter}. Returns a {@link CreateTableEvent} if {@code tableBefore} is null.
+ */
+ public static List<SchemaChangeEvent> inferSchemaChangeEvent(
+ io.debezium.relational.TableId dbzTableId,
+ @Nullable Table tableBefore,
+ Table tableAfter,
+ PostgresSourceConfig sourceConfig,
+ PostgresDialect dialect) {
+
+ if (tableBefore == null) {
+ return Collections.singletonList(toCreateTableEvent(tableAfter, sourceConfig, dialect));
+ }
+
+ TableId cdcTableId =
+ toCdcTableId(
+ dbzTableId,
+ sourceConfig.getDatabaseList().get(0),
+ sourceConfig.isIncludeDatabaseInTableId());
+ PostgresConnectorConfig dbzConfig = sourceConfig.getDbzConnectorConfig();
+
+ try (PostgresConnection connection = dialect.openJdbcConnection()) {
+ TypeRegistry typeRegistry = connection.getTypeRegistry();
+ return inferMinimalSchemaChanges(
+ cdcTableId,
+ tableBefore.columns(),
+ tableAfter.columns(),
+ dbzConfig,
+ typeRegistry);
+ }
+ }
+
+ public static CreateTableEvent toCreateTableEvent(
+ Table table, PostgresSourceConfig sourceConfig, PostgresDialect dialect) {
+ try (PostgresConnection connection = dialect.openJdbcConnection()) {
+ return toCreateTableEvent(table, sourceConfig, connection.getTypeRegistry());
+ }
+ }
+
+ private static CreateTableEvent toCreateTableEvent(
+ Table table, PostgresSourceConfig sourceConfig, TypeRegistry typeRegistry) {
+ return new CreateTableEvent(
+ toCdcTableId(
+ table.id(),
+ sourceConfig.getDatabaseList().get(0),
+ sourceConfig.isIncludeDatabaseInTableId()),
+ toSchema(table, sourceConfig.getDbzConnectorConfig(), typeRegistry));
+ }
+
+ /**
+ * Finds the minimum schema change events to transform beforeCols into afterCols using recursion
+ * with memoization. Available operations: rename column, add column at last, drop column, alter
+ * column type. Recursion depth bounded by total column count.
+ */
+ private static List<SchemaChangeEvent> inferMinimalSchemaChanges(
+ TableId cdcTableId,
+ List<Column> beforeCols,
+ List<Column> afterCols,
+ PostgresConnectorConfig dbzConfig,
+ TypeRegistry typeRegistry) {
+
+ int n = beforeCols.size();
+ int m = afterCols.size();
+
+ // memo[i][j] = min cost from state (i, j), -1 means unvisited
+ int[][] memo = new int[n + 1][m + 1];
+ for (int[] row : memo) {
+ Arrays.fill(row, -1);
+ }
+
+ // Fill memoization table via recursion
+ minCost(0, 0, n, m, beforeCols, afterCols, memo, dbzConfig, typeRegistry);
+
+ // Traceback to build schema change events
+ return tracebackEvents(
+ cdcTableId, n, m, beforeCols, afterCols, memo, dbzConfig, typeRegistry);
+ }
+
+ /**
+ * Recursively computes the minimum number of individual column operations to align
+ * before[i..n-1] with after[j..m-1], where {@code i} is the current index into {@code
+ * beforeCols} (0..n) and {@code j} is the current index into {@code afterCols} (0..m). Boundary
+ * cases ({@code i == n} or {@code j == m}) return immediately without list access. At each
+ * non-boundary state, either drop before[i] (cost 1) or match before[i] to after[j] (cost =
+ * rename(0/1) + alterType(0/1)). Unmatched after columns are added at the end.
+ */
+ private static int minCost(
+ int i,
+ int j,
+ int n,
+ int m,
+ List<Column> beforeCols,
+ List<Column> afterCols,
+ int[][] memo,
+ PostgresConnectorConfig dbzConfig,
+ TypeRegistry typeRegistry) {
+
+ if (i == n) {
+ return m - j; // add remaining after columns
+ }
+ if (j == m) {
+ return n - i; // drop remaining before columns
+ }
+ if (memo[i][j] != -1) {
+ return memo[i][j];
+ }
+
+ // Option 1: drop beforeCols[i]
+ int dropCost =
+ 1 + minCost(i + 1, j, n, m, beforeCols, afterCols, memo, dbzConfig, typeRegistry);
+
+ // Option 2: match beforeCols[i] to afterCols[j]
+ int matchCost =
+ columnMatchCost(beforeCols.get(i), afterCols.get(j), dbzConfig, typeRegistry)
+ + minCost(
+ i + 1,
+ j + 1,
+ n,
+ m,
+ beforeCols,
+ afterCols,
+ memo,
+ dbzConfig,
+ typeRegistry);
+
+ memo[i][j] = Math.min(dropCost, matchCost);
+ return memo[i][j];
+ }
+
+ /**
+ * Computes the min cost of matching a before column to an after column (0, 1, or 2). if column
+ * name is same and type is same, cost = 0; if column name is same and type is different, cost =
+ * 1(alter type); if column name is different and type is same, cost = 1(rename); if column name
+ * is different and type is same, cost = 2(rename + alter type);
Review Comment:
Javadoc typo/logic: the last sentence says "if column name is different and
type is same" twice; the final case should describe the name-different *and*
type-different scenario (rename + alter type). Please correct the comment to
match the implementation.
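To make the intended cost matrix and the rename-vs-drop+add preference concrete, here is a self-contained sketch using plain recursion (the PR adds memoization; the names and name/type-pair column model here are illustrative, not the PR's API):

```java
// Sketch of the minimum-edit idea: same name/same type = 0, same name/different
// type = 1 (alter type), different name/same type = 1 (rename), different
// name/different type = 2 (rename + alter type). Columns are String[]{name, type}.
import java.util.List;

public class MinEditSketch {

    /** Cost of matching one before column to one after column (0, 1, or 2). */
    static int matchCost(String[] before, String[] after) {
        int cost = 0;
        if (!before[0].equals(after[0])) {
            cost++; // rename
        }
        if (!before[1].equals(after[1])) {
            cost++; // alter type
        }
        return cost;
    }

    /** Min operations to turn before[i..] into after[j..]: drop, add-at-end, or match. */
    static int minCost(List<String[]> before, List<String[]> after, int i, int j) {
        if (i == before.size()) {
            return after.size() - j; // add remaining after columns at the end
        }
        if (j == after.size()) {
            return before.size() - i; // drop remaining before columns
        }
        int drop = 1 + minCost(before, after, i + 1, j);
        int match = matchCost(before.get(i), after.get(j)) + minCost(before, after, i + 1, j + 1);
        return Math.min(drop, match);
    }
}
```

Renaming the last column (e.g. `c` to `d`) costs 1 under this scheme, so it is preferred over the drop-then-add interpretation, which would cost 2.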
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/schema/PostgresSchemaRecord.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.schema;
+
+import io.debezium.relational.Table;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/** A SourceRecord wrapper carrying a Debezium Table for schema change propagation. */
+public class PostgresSchemaRecord extends SourceRecord {
+ // Use postgres-cdc rather than postgres to avoid conflict if debezium support later.
Review Comment:
The comment on the schema change key says "Use postgres-cdc rather than
postgres", but the actual schema name is
`io.debezium.connector.postgres.SchemaChangeKey`. Please align the comment with
the implementation (or update the schema name if the intent is to avoid
clashing with Debezium’s own schema change records).
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java:
##########
@@ -311,6 +315,12 @@ public PostgresSourceBuilder<T> includeDatabaseInTableId(boolean includeDatabase
return this;
}
+ /** Whether to infer schema change event on relation message. */
+ public PostgresSourceBuilder<T> includeSchemaChanges(boolean includeSchemaChanges) {
+ this.configFactory.includeSchemaChanges(includeSchemaChanges);
+ return this;
+ }
+
/**
Review Comment:
This new `includeSchemaChanges(...)` builder method has misleading
semantics/documentation: it says it's about inferring schema change events from
Relation messages, but it actually delegates to
`configFactory.includeSchemaChanges(...)` (the generic "forward schema change
records" flag). This makes it easy to enable the wrong behavior and miss
enabling `PostgresSourceConfigFactory.enableSchemaChange(...)`. Please either
(a) rename/reword this method to match `includeSchemaChanges` semantics, or (b)
add a separate builder method that sets `enableSchemaChange(...)` and keep this
method for the existing include-schema-changes flag.
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java:
##########
@@ -172,6 +174,7 @@ public DataSource createDataSource(Context context) {
.lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay)
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
.includeDatabaseInTableId(tableIdIncludeDatabase)
+ .includeSchemaChanges(includeSchemaChanges)
Review Comment:
`SCHEMA_CHANGE_ENABLED` is currently passed into
`.includeSchemaChanges(...)` on the builder. This is very likely the wrong
config knob: it flips whether raw Debezium schema change records are forwarded,
not whether pgoutput Relation messages are converted into inferred schema
change events. Consider calling the new `enableSchemaChange(...)` (or exposing
a dedicated builder method for it) and leave `.includeSchemaChanges(...)` for
the existing include-schema-changes behavior.
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/schema/PostgresSchemaRecord.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.schema;
+
+import io.debezium.relational.Table;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/** A SourceRecord wrapper carrying a Debezium Table for schema change propagation. */
+public class PostgresSchemaRecord extends SourceRecord {
+ // Use postgres-cdc rather than postgres to avoid conflict if debezium support later.
+ private static final String SCHEMA_CHANGE_EVENT_KEY_NAME =
+ "io.debezium.connector.postgres.SchemaChangeKey";
+
+ private static final Schema KEY_SCHEMA =
+ SchemaBuilder.struct()
+ .name(SCHEMA_CHANGE_EVENT_KEY_NAME)
+ .field("table_id", Schema.STRING_SCHEMA)
+ .build();
+
+ private final Table table;
+
+ public PostgresSchemaRecord(Table table) {
+ super(null, null, null, KEY_SCHEMA, buildKey(table), null, null);
Review Comment:
`PostgresSchemaRecord` constructs a `SourceRecord` with `valueSchema ==
null` and `value == null`. If `includeSchemaChanges` is enabled,
`IncrementalSourceRecordEmitter.emitElement()` will call
`SourceRecordUtils.getMessageTimestamp(record)`, which dereferences
`record.valueSchema()` and will throw an NPE for this record type. Either
populate a minimal non-null value schema/value (so timestamp extraction safely
returns null), or ensure this record type is never passed to `emitElement`
(e.g., avoid matching `isSchemaChangeEvent` or gate emitting).
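The null-guard mitigation (so timestamp extraction safely returns null for value-less records) can be sketched with hypothetical stand-ins — `Record` and `extractTimestamp` below are illustrative stubs, not the Kafka Connect `SourceRecord` or `SourceRecordUtils.getMessageTimestamp` APIs:

```java
// Sketch: guard the value-schema dereference so schema-only records (which carry
// a null value/value schema, like PostgresSchemaRecord) yield null instead of an NPE.
public class NullSafeTimestampSketch {

    /** Hypothetical stand-in for a source record; valueSchema may be null. */
    static class Record {
        final Object valueSchema; // null for schema-only records
        final Long timestamp;

        Record(Object valueSchema, Long timestamp) {
            this.valueSchema = valueSchema;
            this.timestamp = timestamp;
        }
    }

    /** Returns null instead of throwing when the record carries no value schema. */
    static Long extractTimestamp(Record record) {
        if (record.valueSchema == null) {
            return null; // guard: avoids dereferencing a null value schema
        }
        return record.timestamp;
    }
}
```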
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]