nickdelnano commented on code in PR #7315:
URL: https://github.com/apache/paimon/pull/7315#discussion_r2875378828
##########
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractDataFormat.java:
##########
@@ -55,9 +55,8 @@ public AbstractRecordParser createParser(
TypeMapping typeMapping,
List<ComputedColumn> computedColumns,
CdcMetadataConverter[] metadataConverters) {
- // Most parsers don't support metadata converters, so we default to
the 2-parameter version
- // Only specific parsers like DebeziumAvroRecordParser will override
this
- return createParser(typeMapping, computedColumns);
Review Comment:
@gmdfalk curious why not supported on other debezium formats except debezium
avro?
`testComputedColumn` was failing for other formats because:
- this test runs for many formats other than debezium avro
- it was waiting for kafka metadata fields to appear yet they weren't
configured
##########
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java:
##########
@@ -572,41 +555,16 @@ public void testMetadataColumn(String format) throws
Exception {
.build();
runActionWithDefaultEnv(action);
- FileStoreTable table = getFileStoreTable(tableName);
-
- // Verify the schema includes metadata columns
- RowType tableRowType = table.rowType();
- assertThat(tableRowType.getFieldNames())
- .containsExactlyInAnyOrder(
- "_id",
- "_date",
- "_year",
- "__kafka_topic",
- "__kafka_partition",
- "__kafka_offset",
- "__kafka_timestamp",
- "__kafka_timestamp_type");
-
- // Verify the data types of metadata columns
-
assertThat(tableRowType.getField("__kafka_topic").type()).isEqualTo(DataTypes.STRING());
-
assertThat(tableRowType.getField("__kafka_partition").type()).isEqualTo(DataTypes.INT());
-
assertThat(tableRowType.getField("__kafka_offset").type()).isEqualTo(DataTypes.BIGINT());
- assertThat(tableRowType.getField("__kafka_timestamp").type())
- .isEqualTo(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3));
- assertThat(tableRowType.getField("__kafka_timestamp_type").type())
- .isEqualTo(DataTypes.STRING());
Review Comment:
these assertions are redundant - `waitForResult` below validates column types
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]