This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ffc0c9734c44e96f1d490a8364b025a8621d0bfa
Author: Fabian Paul <[email protected]>
AuthorDate: Fri Oct 8 15:08:24 2021 +0200

    [FLINK-24397][connectors/kafka] Remove TableSchema usage from Kafka table connector
---
 .../connectors/kafka/table/KafkaDynamicSource.java |  2 +-
 .../kafka/table/KafkaDynamicTableFactory.java      | 28 ++++++-----
 .../table/UpsertKafkaDynamicTableFactory.java      | 46 +++++++++++-------
 .../kafka/table/KafkaConnectorOptionsUtilTest.java | 54 +++++++---------------
 4 files changed, 63 insertions(+), 67 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
index ab0fa13..a391ec1 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
@@ -436,7 +436,7 @@ public class KafkaDynamicSource

         // adjust physical arity with value format's metadata
         final int adjustedPhysicalArity =
-                producedDataType.getChildren().size() - metadataKeys.size();
+                DataType.getFieldDataTypes(producedDataType).size() - metadataKeys.size();

         // adjust value format projection to include value format's metadata columns at the end
         final int[] adjustedValueProjection =
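The KafkaDynamicSource change is behavior-preserving: for a ROW data type,
DataType#getChildren() returns one child per field, so its size equals the
field count that DataType.getFieldDataTypes(...) reports; the static helper
just states the intent explicitly. A minimal sketch of the equivalence, with
an illustrative row type that is not taken from the commit:

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.DataType;

    public class ArityCheck {
        public static void main(String[] args) {
            // Illustrative produced type with two physical fields.
            final DataType producedDataType =
                    DataTypes.ROW(
                            DataTypes.FIELD("id", DataTypes.INT()),
                            DataTypes.FIELD("name", DataTypes.STRING()));

            // Both print 2 for a row type; the helper is explicit about fields.
            System.out.println(producedDataType.getChildren().size());
            System.out.println(DataType.getFieldDataTypes(producedDataType).size());
        }
    }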
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
index cf23b3c..b2d5880 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
@@ -31,7 +31,6 @@ import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
@@ -162,7 +161,10 @@ public class KafkaDynamicTableFactory
         validateTableSourceOptions(tableOptions);

         validatePKConstraints(
-                context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);
+                context.getObjectIdentifier(),
+                context.getPrimaryKeyIndexes(),
+                context.getCatalogTable().getOptions(),
+                valueDecodingFormat);

         final StartupOptions startupOptions = getStartupOptions(tableOptions);
@@ -175,8 +177,7 @@ public class KafkaDynamicTableFactory
                 KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
                 partitionDiscoveryInterval.orElse(-1L).toString());

-        final DataType physicalDataType =
-                context.getCatalogTable().getSchema().toPhysicalRowDataType();
+        final DataType physicalDataType = context.getPhysicalRowDataType();

         final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
@@ -222,10 +223,12 @@ public class KafkaDynamicTableFactory
         KafkaConnectorOptionsUtil.validateDeliveryGuarantee(tableOptions);

         validatePKConstraints(
-                context.getObjectIdentifier(), context.getCatalogTable(), valueEncodingFormat);
+                context.getObjectIdentifier(),
+                context.getPrimaryKeyIndexes(),
+                context.getCatalogTable().getOptions(),
+                valueEncodingFormat);

-        final DataType physicalDataType =
-                context.getCatalogTable().getSchema().toPhysicalRowDataType();
+        final DataType physicalDataType = context.getPhysicalRowDataType();

         final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
@@ -310,12 +313,15 @@ public class KafkaDynamicTableFactory
     }

     private static void validatePKConstraints(
-            ObjectIdentifier tableName, CatalogTable catalogTable, Format format) {
-        if (catalogTable.getSchema().getPrimaryKey().isPresent()
+            ObjectIdentifier tableName,
+            int[] primaryKeyIndexes,
+            Map<String, String> options,
+            Format format) {
+        if (primaryKeyIndexes.length > 0
                 && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
-            Configuration options = Configuration.fromMap(catalogTable.getOptions());
+            Configuration configuration = Configuration.fromMap(options);
             String formatName =
-                    options.getOptional(FactoryUtil.FORMAT).orElse(options.get(VALUE_FORMAT));
+                    configuration.getOptional(FactoryUtil.FORMAT).orElse(configuration.get(VALUE_FORMAT));
             throw new ValidationException(
                     String.format(
                             "The Kafka table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint"
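Since validatePKConstraints now receives the plain options map instead of the
CatalogTable, the format name is resolved through a Configuration built from
that map: 'format' first, then the value-format fallback. A minimal standalone
sketch of that lookup; VALUE_FORMAT here is a stand-in for
KafkaConnectorOptions.VALUE_FORMAT, and the 'value.format' key is an
assumption:

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.factories.FactoryUtil;

    import java.util.HashMap;
    import java.util.Map;

    public class FormatNameLookup {
        // Stand-in for KafkaConnectorOptions.VALUE_FORMAT.
        private static final ConfigOption<String> VALUE_FORMAT =
                ConfigOptions.key("value.format").stringType().noDefaultValue();

        public static void main(String[] args) {
            final Map<String, String> options = new HashMap<>();
            options.put("value.format", "json");

            // Same precedence as the patched validatePKConstraints:
            // prefer 'format', fall back to 'value.format'.
            final Configuration configuration = Configuration.fromMap(options);
            final String formatName =
                    configuration
                            .getOptional(FactoryUtil.FORMAT)
                            .orElse(configuration.get(VALUE_FORMAT));
            System.out.println(formatName); // prints: json
        }
    }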
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
index 6c0d88e..8682e40 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -26,9 +26,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
@@ -115,8 +115,11 @@ public class UpsertKafkaDynamicTableFactory
         // Validate the option data type.
         helper.validateExcept(PROPERTIES_PREFIX);
-        TableSchema schema = context.getCatalogTable().getSchema();
-        validateSource(tableOptions, keyDecodingFormat, valueDecodingFormat, schema);
+        validateSource(
+                tableOptions,
+                keyDecodingFormat,
+                valueDecodingFormat,
+                context.getPrimaryKeyIndexes());

         Tuple2<int[], int[]> keyValueProjections =
                 createKeyValueProjections(context.getCatalogTable());
@@ -126,7 +129,7 @@ public class UpsertKafkaDynamicTableFactory
         StartupMode earliest = StartupMode.EARLIEST;

         return new KafkaDynamicSource(
-                schema.toPhysicalRowDataType(),
+                context.getPhysicalRowDataType(),
                 keyDecodingFormat,
                 new DecodingFormatWrapper(valueDecodingFormat),
                 keyValueProjections.f0,
@@ -157,8 +160,11 @@ public class UpsertKafkaDynamicTableFactory
         // Validate the option data type.
         helper.validateExcept(PROPERTIES_PREFIX);
-        TableSchema schema = context.getCatalogTable().getSchema();
-        validateSink(tableOptions, keyEncodingFormat, valueEncodingFormat, schema);
+        validateSink(
+                tableOptions,
+                keyEncodingFormat,
+                valueEncodingFormat,
+                context.getPrimaryKeyIndexes());

         Tuple2<int[], int[]> keyValueProjections =
                 createKeyValueProjections(context.getCatalogTable());
@@ -175,8 +181,8 @@ public class UpsertKafkaDynamicTableFactory
         // use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
         // it will use hash partition if key is set else in round-robin behaviour.
         return new KafkaDynamicSink(
-                schema.toPhysicalRowDataType(),
-                schema.toPhysicalRowDataType(),
+                context.getPhysicalRowDataType(),
+                context.getPhysicalRowDataType(),
                 keyEncodingFormat,
                 new EncodingFormatWrapper(valueEncodingFormat),
                 keyValueProjections.f0,
@@ -192,8 +198,8 @@ public class UpsertKafkaDynamicTableFactory
                 tableOptions.get(TRANSACTIONAL_ID_PREFIX));
     }

-    private Tuple2<int[], int[]> createKeyValueProjections(CatalogTable catalogTable) {
-        TableSchema schema = catalogTable.getSchema();
+    private Tuple2<int[], int[]> createKeyValueProjections(ResolvedCatalogTable catalogTable) {
+        ResolvedSchema schema = catalogTable.getResolvedSchema();
         // primary key should be validated earlier
         List<String> keyFields = schema.getPrimaryKey().get().getColumns();
         DataType physicalDataType = schema.toPhysicalRowDataType();
@@ -213,17 +219,23 @@ public class UpsertKafkaDynamicTableFactory
     // --------------------------------------------------------------------------------------------

     private static void validateSource(
-            ReadableConfig tableOptions, Format keyFormat, Format valueFormat, TableSchema schema) {
+            ReadableConfig tableOptions,
+            Format keyFormat,
+            Format valueFormat,
+            int[] primaryKeyIndexes) {
         validateTopic(tableOptions);
         validateFormat(keyFormat, valueFormat, tableOptions);
-        validatePKConstraints(schema);
+        validatePKConstraints(primaryKeyIndexes);
     }

     private static void validateSink(
-            ReadableConfig tableOptions, Format keyFormat, Format valueFormat, TableSchema schema) {
+            ReadableConfig tableOptions,
+            Format keyFormat,
+            Format valueFormat,
+            int[] primaryKeyIndexes) {
         validateTopic(tableOptions);
         validateFormat(keyFormat, valueFormat, tableOptions);
-        validatePKConstraints(schema);
+        validatePKConstraints(primaryKeyIndexes);
         validateSinkBufferFlush(tableOptions);
     }
@@ -256,8 +268,8 @@ public class UpsertKafkaDynamicTableFactory
         }
     }

-    private static void validatePKConstraints(TableSchema schema) {
-        if (!schema.getPrimaryKey().isPresent()) {
+    private static void validatePKConstraints(int[] primaryKeyIndexes) {
+        if (primaryKeyIndexes.length == 0) {
             throw new ValidationException(
                     "'upsert-kafka' tables require to define a PRIMARY KEY constraint. "
                             + "The PRIMARY KEY specifies which columns should be read from or write to the Kafka message key. "
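In the upsert factory, createKeyValueProjections now works on
ResolvedCatalogTable/ResolvedSchema, but the idea is unchanged: the key
projection is derived by matching the PRIMARY KEY columns against the fields
of the physical row type. A minimal standalone sketch of that index
derivation; the row type and key columns below are illustrative, not from the
commit:

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.DataType;

    import java.util.Arrays;
    import java.util.List;

    public class KeyProjectionSketch {
        public static void main(String[] args) {
            // Illustrative physical row type; the factory obtains it via
            // schema.toPhysicalRowDataType().
            final DataType physicalDataType =
                    DataTypes.ROW(
                            DataTypes.FIELD("id", DataTypes.INT()),
                            DataTypes.FIELD("name", DataTypes.STRING()),
                            DataTypes.FIELD("payload", DataTypes.STRING()));
            final List<String> physicalFields = DataType.getFieldNames(physicalDataType);

            // Hypothetical PRIMARY KEY columns, as returned by
            // ResolvedSchema#getPrimaryKey().get().getColumns().
            final List<String> keyFields = Arrays.asList("id", "name");

            final int[] keyProjection =
                    keyFields.stream().mapToInt(physicalFields::indexOf).toArray();
            System.out.println(Arrays.toString(keyProjection)); // prints: [0, 1]
        }
    }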
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtilTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtilTest.java
index cb338a6..4ea4072 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtilTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtilTest.java
@@ -20,8 +20,6 @@ package org.apache.flink.streaming.connectors.kafka.table;

 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.types.DataType;
@@ -32,6 +30,10 @@ import java.util.Map;

 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertThat;
@@ -43,25 +45,18 @@ public class KafkaConnectorOptionsUtilTest {

     @Test
     public void testFormatProjection() {
-        final TableSchema schema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("id", DataTypes.INT()))
-                        .add(TableColumn.metadata("timestamp", DataTypes.TIMESTAMP(3)))
-                        .add(
-                                TableColumn.computed(
-                                        "timestamp_converted",
-                                        DataTypes.STRING(),
-                                        "CAST(`timestamp` AS STRING)"))
-                        .add(TableColumn.physical("name", DataTypes.STRING()))
-                        .add(TableColumn.physical("age", DataTypes.INT()))
-                        .add(TableColumn.physical("address", DataTypes.STRING()))
-                        .build();
+        final DataType dataType =
+                DataTypes.ROW(
+                        FIELD("id", INT()),
+                        FIELD("name", STRING()),
+                        FIELD("age", INT()),
+                        FIELD("address", STRING()));
+
         final Map<String, String> options = createTestOptions();
         options.put("key.fields", "address; name");
         options.put("value.fields-include", "EXCEPT_KEY");

         final Configuration config = Configuration.fromMap(options);
-        final DataType dataType = schema.toPhysicalRowDataType();

         assertArrayEquals(new int[] {3, 1}, createKeyFormatProjection(config, dataType));
         assertArrayEquals(new int[] {0, 2}, createValueFormatProjection(config, dataType));
@@ -69,12 +64,10 @@ public class KafkaConnectorOptionsUtilTest {

     @Test
     public void testMissingKeyFormatProjection() {
-        final TableSchema schema =
-                TableSchema.builder().add(TableColumn.physical("id", DataTypes.INT())).build();
+        final DataType dataType = ROW(FIELD("id", INT()));
         final Map<String, String> options = createTestOptions();

         final Configuration config = Configuration.fromMap(options);
-        final DataType dataType = schema.toPhysicalRowDataType();

         try {
             createKeyFormatProjection(config, dataType);
@@ -91,16 +84,11 @@ public void testInvalidKeyFormatFieldProjection() {
-        final TableSchema schema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("id", DataTypes.INT()))
-                        .add(TableColumn.physical("name", DataTypes.STRING()))
-                        .build();
+        final DataType dataType = ROW(FIELD("id", INT()), FIELD("name", STRING()));
         final Map<String, String> options = createTestOptions();
         options.put("key.fields", "non_existing");

         final Configuration config = Configuration.fromMap(options);
-        final DataType dataType = schema.toPhysicalRowDataType();

         try {
             createKeyFormatProjection(config, dataType);
@@ -120,18 +108,13 @@ public void testInvalidKeyFormatPrefixProjection() {
-        final TableSchema schema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("k_part_1", DataTypes.INT()))
-                        .add(TableColumn.physical("part_2", DataTypes.STRING()))
-                        .add(TableColumn.physical("name", DataTypes.STRING()))
-                        .build();
+        final DataType dataType =
+                ROW(FIELD("k_part_1", INT()), FIELD("part_2", STRING()), FIELD("name", STRING()));
         final Map<String, String> options = createTestOptions();
         options.put("key.fields", "k_part_1;part_2");
         options.put("key.fields-prefix", "k_");

         final Configuration config = Configuration.fromMap(options);
-        final DataType dataType = schema.toPhysicalRowDataType();

         try {
             createKeyFormatProjection(config, dataType);
@@ -148,17 +131,12 @@ public void testInvalidValueFormatProjection() {
-        final TableSchema schema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("k_id", DataTypes.INT()))
-                        .add(TableColumn.physical("id", DataTypes.STRING()))
-                        .build();
+        final DataType dataType = ROW(FIELD("k_id", INT()), FIELD("id", STRING()));
         final Map<String, String> options = createTestOptions();
         options.put("key.fields", "k_id");
         options.put("key.fields-prefix", "k_");

         final Configuration config = Configuration.fromMap(options);
-        final DataType dataType = schema.toPhysicalRowDataType();

         try {
             createValueFormatProjection(config, dataType);
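Taken together, the migration pattern is the same in every file: wherever the
connector previously materialized a TableSchema via
context.getCatalogTable().getSchema(), it now asks the
DynamicTableFactory.Context directly. A minimal before/after sketch, assuming
the usual Context parameter of a factory method:

    import org.apache.flink.table.factories.DynamicTableFactory;
    import org.apache.flink.table.types.DataType;

    public class ContextAccessors {
        // Before (TableSchema-based):
        //   TableSchema schema = context.getCatalogTable().getSchema();
        //   DataType physicalDataType = schema.toPhysicalRowDataType();
        //   boolean hasPrimaryKey = schema.getPrimaryKey().isPresent();

        // After (schema-free Context accessors, as used throughout this commit):
        static DataType physicalType(DynamicTableFactory.Context context) {
            return context.getPhysicalRowDataType();
        }

        static boolean hasPrimaryKey(DynamicTableFactory.Context context) {
            return context.getPrimaryKeyIndexes().length > 0;
        }
    }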
