Repository: flink Updated Branches: refs/heads/release-1.6 702f77355 -> aa25b4b32
http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java deleted file mode 100644 index b976e14..0000000 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.streaming.connectors.kafka.config.StartupMode; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.descriptors.KafkaValidator; -import org.apache.flink.table.sources.RowtimeAttributeDescriptor; -import org.apache.flink.types.Row; - -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; - -/** - * Test for {@link Kafka09TableSource} created by {@link Kafka09TableSourceFactory}. - */ -public class Kafka09TableSourceFactoryTest extends KafkaTableSourceFactoryTestBase { - - @Override - protected String getKafkaVersion() { - return KafkaValidator.CONNECTOR_VERSION_VALUE_09; - } - - @Override - @SuppressWarnings("unchecked") - protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() { - return (Class) FlinkKafkaConsumer09.class; - } - - @Override - protected KafkaTableSource getExpectedKafkaTableSource( - TableSchema schema, - Optional<String> proctimeAttribute, - List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, - Map<String, String> fieldMapping, - String topic, - Properties properties, - DeserializationSchema<Row> deserializationSchema, - StartupMode startupMode, - Map<KafkaTopicPartition, Long> specificStartupOffsets) { - - return new Kafka09TableSource( - schema, - proctimeAttribute, - rowtimeAttributeDescriptors, - Optional.of(fieldMapping), - topic, - properties, - deserializationSchema, - startupMode, - specificStartupOffsets - ); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java new file mode 100644 index 0000000..a6c8bd4 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.KafkaValidator; +import org.apache.flink.table.sources.RowtimeAttributeDescriptor; +import org.apache.flink.types.Row; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +/** + * Test for {@link Kafka09TableSource} and {@link Kafka09TableSink} created + * by {@link Kafka09TableSourceSinkFactory}. + */ +public class Kafka09TableSourceSinkFactoryTest extends KafkaTableSourceSinkFactoryTestBase { + + @Override + protected String getKafkaVersion() { + return KafkaValidator.CONNECTOR_VERSION_VALUE_09; + } + + @Override + @SuppressWarnings("unchecked") + protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() { + return (Class) FlinkKafkaConsumer09.class; + } + + @Override + protected Class<?> getExpectedFlinkKafkaProducer() { + return FlinkKafkaProducer09.class; + } + + @Override + protected KafkaTableSource getExpectedKafkaTableSource( + TableSchema schema, + Optional<String> proctimeAttribute, + List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, + Map<String, String> fieldMapping, + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema, + StartupMode startupMode, + Map<KafkaTopicPartition, Long> specificStartupOffsets) { + + return new Kafka09TableSource( + schema, + proctimeAttribute, + rowtimeAttributeDescriptors, + Optional.of(fieldMapping), + topic, + properties, + deserializationSchema, + startupMode, + specificStartupOffsets + ); + } + + @Override + protected KafkaTableSink getExpectedKafkaTableSink( + TableSchema schema, + String topic, + Properties properties, + FlinkKafkaPartitioner<Row> partitioner, + SerializationSchema<Row> serializationSchema) { + + return new Kafka09TableSink( + schema, + topic, + properties, + partitioner, + serializationSchema + ); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java index ec27398..231eddd 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java @@ -29,7 +29,10 @@ import java.util.Properties; /** * Base class for {@link KafkaTableSink} that serializes data in JSON format. + * + * @deprecated Use table descriptors instead of implementation-specific classes. */ +@Deprecated @Internal public abstract class KafkaJsonTableSink extends KafkaTableSink { @@ -39,7 +42,9 @@ public abstract class KafkaJsonTableSink extends KafkaTableSink { * @param topic topic in Kafka to which table is written * @param properties properties to connect to Kafka * @param partitioner Kafka partitioner + * @deprecated Use table descriptors instead of implementation-specific classes. */ + @Deprecated public KafkaJsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) { super(topic, properties, partitioner); } http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java index 687df58..7853bb7 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java @@ -23,12 +23,17 @@ import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.sinks.AppendStreamTableSink; import org.apache.flink.table.util.TableConnectorUtil; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; +import java.util.Arrays; +import java.util.Objects; +import java.util.Optional; import java.util.Properties; /** @@ -40,27 +45,59 @@ import java.util.Properties; @Internal public abstract class KafkaTableSink implements AppendStreamTableSink<Row> { + // TODO make all attributes final and mandatory once we drop support for format-specific table sinks + + /** The schema of the table. */ + private final Optional<TableSchema> schema; + + /** The Kafka topic to write to. */ protected final String topic; + + /** Properties for the Kafka producer. */ protected final Properties properties; - protected SerializationSchema<Row> serializationSchema; + + /** Serialization schema for encoding records to Kafka. */ + protected Optional<SerializationSchema<Row>> serializationSchema; + + /** Partitioner to select Kafka partition for each item. */ protected final FlinkKafkaPartitioner<Row> partitioner; + + // legacy variables protected String[] fieldNames; protected TypeInformation[] fieldTypes; + protected KafkaTableSink( + TableSchema schema, + String topic, + Properties properties, + FlinkKafkaPartitioner<Row> partitioner, + SerializationSchema<Row> serializationSchema) { + this.schema = Optional.of(Preconditions.checkNotNull(schema, "Schema must not be null.")); + this.topic = Preconditions.checkNotNull(topic, "Topic must not be null."); + this.properties = Preconditions.checkNotNull(properties, "Properties must not be null."); + this.partitioner = Preconditions.checkNotNull(partitioner, "Partitioner must not be null."); + this.serializationSchema = Optional.of(Preconditions.checkNotNull( + serializationSchema, "Serialization schema must not be null.")); + } + /** * Creates KafkaTableSink. * * @param topic Kafka topic to write to. - * @param properties Properties for the Kafka consumer. + * @param properties Properties for the Kafka producer. * @param partitioner Partitioner to select Kafka partition for each item + * @deprecated Use table descriptors instead of implementation-specific classes. */ + @Deprecated public KafkaTableSink( String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) { + this.schema = Optional.empty(); this.topic = Preconditions.checkNotNull(topic, "topic"); this.properties = Preconditions.checkNotNull(properties, "properties"); this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner"); + this.serializationSchema = Optional.empty(); } /** @@ -72,8 +109,9 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> { * @param partitioner Partitioner to select Kafka partition. * @return The version-specific Kafka producer */ - protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer( - String topic, Properties properties, + protected abstract SinkFunction<Row> createKafkaProducer( + String topic, + Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner); @@ -82,40 +120,57 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> { * * @param rowSchema the schema of the row to serialize. * @return Instance of serialization schema + * @deprecated Use the constructor to pass a serialization schema instead. */ - protected abstract SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema); + @Deprecated + protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema) { + throw new UnsupportedOperationException("This method only exists for backwards compatibility."); + } /** * Create a deep copy of this sink. * * @return Deep copy of this sink */ - protected abstract KafkaTableSink createCopy(); + @Deprecated + protected KafkaTableSink createCopy() { + throw new UnsupportedOperationException("This method only exists for backwards compatibility."); + } @Override public void emitDataStream(DataStream<Row> dataStream) { - FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner); - // always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled. - kafkaProducer.setFlushOnCheckpoint(true); + SinkFunction<Row> kafkaProducer = createKafkaProducer( + topic, + properties, + serializationSchema.orElseThrow(() -> new IllegalStateException("No serialization schema defined.")), + partitioner); dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames)); } @Override public TypeInformation<Row> getOutputType() { - return new RowTypeInfo(getFieldTypes()); + return schema + .map(TableSchema::toRowType) + .orElseGet(() -> new RowTypeInfo(getFieldTypes())); } public String[] getFieldNames() { - return fieldNames; + return schema.map(TableSchema::getColumnNames).orElse(fieldNames); } @Override public TypeInformation<?>[] getFieldTypes() { - return fieldTypes; + return schema.map(TableSchema::getTypes).orElse(fieldTypes); } @Override public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { + if (schema.isPresent()) { + // a fixed schema is defined so reconfiguration is not supported + throw new UnsupportedOperationException("Reconfiguration of this sink is not supported."); + } + + // legacy code KafkaTableSink copy = createCopy(); copy.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames"); copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes"); @@ -123,8 +178,39 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> { "Number of provided field names and types does not match."); RowTypeInfo rowSchema = new RowTypeInfo(fieldTypes, fieldNames); - copy.serializationSchema = createSerializationSchema(rowSchema); + copy.serializationSchema = Optional.of(createSerializationSchema(rowSchema)); return copy; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + KafkaTableSink that = (KafkaTableSink) o; + return Objects.equals(schema, that.schema) && + Objects.equals(topic, that.topic) && + Objects.equals(properties, that.properties) && + Objects.equals(serializationSchema, that.serializationSchema) && + Objects.equals(partitioner, that.partitioner) && + Arrays.equals(fieldNames, that.fieldNames) && + Arrays.equals(fieldTypes, that.fieldTypes); + } + + @Override + public int hashCode() { + int result = Objects.hash( + schema, + topic, + properties, + serializationSchema, + partitioner); + result = 31 * result + Arrays.hashCode(fieldNames); + result = 31 * result + Arrays.hashCode(fieldTypes); + return result; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java deleted file mode 100644 index d7e42f5..0000000 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java +++ /dev/null @@ -1,251 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.streaming.connectors.kafka.config.StartupMode; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.table.api.TableException; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.descriptors.KafkaValidator; -import org.apache.flink.table.descriptors.SchemaValidator; -import org.apache.flink.table.factories.DeserializationSchemaFactory; -import org.apache.flink.table.factories.StreamTableSourceFactory; -import org.apache.flink.table.factories.TableFactoryService; -import org.apache.flink.table.sources.RowtimeAttributeDescriptor; -import org.apache.flink.table.sources.StreamTableSource; -import org.apache.flink.types.Row; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; - -import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; -import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; -import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION; -import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT; -import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES; -import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY; -import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE; -import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS; -import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET; -import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION; -import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_STARTUP_MODE; -import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TOPIC; -import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE_KAFKA; -import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_CLASS; -import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_FROM; -import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_SERIALIZED; -import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_TYPE; -import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_CLASS; -import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_DELAY; -import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_SERIALIZED; -import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_TYPE; -import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA; -import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_FROM; -import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME; -import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_PROCTIME; -import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE; -import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE; -import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND; - -/** - * Factory for creating configured instances of {@link KafkaTableSource}. - */ -public abstract class KafkaTableSourceFactory implements StreamTableSourceFactory<Row> { - - @Override - public Map<String, String> requiredContext() { - Map<String, String> context = new HashMap<>(); - context.put(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND()); // append mode - context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA); // kafka - context.put(CONNECTOR_VERSION(), kafkaVersion()); // version - context.put(CONNECTOR_PROPERTY_VERSION(), "1"); // backwards compatibility - return context; - } - - @Override - public List<String> supportedProperties() { - List<String> properties = new ArrayList<>(); - - // kafka - properties.add(CONNECTOR_TOPIC); - properties.add(CONNECTOR_PROPERTIES); - properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_KEY); - properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_VALUE); - properties.add(CONNECTOR_STARTUP_MODE); - properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_PARTITION); - properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_OFFSET); - - // schema - properties.add(SCHEMA() + ".#." + SCHEMA_TYPE()); - properties.add(SCHEMA() + ".#." + SCHEMA_NAME()); - properties.add(SCHEMA() + ".#." + SCHEMA_FROM()); - - // time attributes - properties.add(SCHEMA() + ".#." + SCHEMA_PROCTIME()); - properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_TYPE()); - properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_FROM()); - properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_CLASS()); - properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED()); - properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_TYPE()); - properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_CLASS()); - properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_SERIALIZED()); - properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_DELAY()); - - // format wildcard - properties.add(FORMAT() + ".*"); - - return properties; - } - - @Override - public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) { - final DescriptorProperties params = new DescriptorProperties(true); - params.putProperties(properties); - - // validate - // allow Kafka timestamps to be used, watermarks can not be received from source - new SchemaValidator(true, supportsKafkaTimestamps(), false).validate(params); - new KafkaValidator().validate(params); - - // deserialization schema using format discovery - final DeserializationSchemaFactory<?> formatFactory = TableFactoryService.find( - DeserializationSchemaFactory.class, - properties, - this.getClass().getClassLoader()); - @SuppressWarnings("unchecked") - final DeserializationSchema<Row> deserializationSchema = (DeserializationSchema<Row>) formatFactory - .createDeserializationSchema(properties); - - // schema - final TableSchema schema = params.getTableSchema(SCHEMA()); - - // proctime - final Optional<String> proctimeAttribute = SchemaValidator.deriveProctimeAttribute(params); - - // rowtime - final List<RowtimeAttributeDescriptor> rowtimeAttributes = SchemaValidator.deriveRowtimeAttributes(params); - - // field mapping - final Map<String, String> fieldMapping = SchemaValidator.deriveFieldMapping(params, Optional.of(schema)); - - // properties - final Properties kafkaProperties = new Properties(); - final List<Map<String, String>> propsList = params.getFixedIndexedProperties( - CONNECTOR_PROPERTIES, - Arrays.asList(CONNECTOR_PROPERTIES_KEY, CONNECTOR_PROPERTIES_VALUE)); - propsList.forEach(kv -> kafkaProperties.put( - params.getString(kv.get(CONNECTOR_PROPERTIES_KEY)), - params.getString(kv.get(CONNECTOR_PROPERTIES_VALUE)) - )); - - // topic - final String topic = params.getString(CONNECTOR_TOPIC); - - // startup mode - final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>(); - final StartupMode startupMode = params - .getOptionalString(CONNECTOR_STARTUP_MODE) - .map(modeString -> { - switch (modeString) { - case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_EARLIEST: - return StartupMode.EARLIEST; - - case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_LATEST: - return StartupMode.LATEST; - - case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS: - return StartupMode.GROUP_OFFSETS; - - case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS: - final List<Map<String, String>> offsetList = params.getFixedIndexedProperties( - CONNECTOR_SPECIFIC_OFFSETS, - Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, CONNECTOR_SPECIFIC_OFFSETS_OFFSET)); - offsetList.forEach(kv -> { - final int partition = params.getInt(kv.get(CONNECTOR_SPECIFIC_OFFSETS_PARTITION)); - final long offset = params.getLong(kv.get(CONNECTOR_SPECIFIC_OFFSETS_OFFSET)); - final KafkaTopicPartition topicPartition = new KafkaTopicPartition(topic, partition); - specificOffsets.put(topicPartition, offset); - }); - return StartupMode.SPECIFIC_OFFSETS; - default: - throw new TableException("Unsupported startup mode. Validator should have checked that."); - } - }).orElse(StartupMode.GROUP_OFFSETS); - - return createKafkaTableSource( - schema, - proctimeAttribute, - rowtimeAttributes, - fieldMapping, - topic, - kafkaProperties, - deserializationSchema, - startupMode, - specificOffsets); - } - - // -------------------------------------------------------------------------------------------- - // For version-specific factories - // -------------------------------------------------------------------------------------------- - - /** - * Returns the Kafka version. - */ - protected abstract String kafkaVersion(); - - /** - * True if the Kafka source supports Kafka timestamps, false otherwise. - * - * @return True if the Kafka source supports Kafka timestamps, false otherwise. - */ - protected abstract boolean supportsKafkaTimestamps(); - - /** - * Constructs the version-specific Kafka table source. - * - * @param schema Schema of the produced table. - * @param proctimeAttribute Field name of the processing time attribute. - * @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute - * @param fieldMapping Mapping for the fields of the table schema to - * fields of the physical returned type. - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param deserializationSchema Deserialization schema for decoding records from Kafka. - * @param startupMode Startup mode for the contained consumer. - * @param specificStartupOffsets Specific startup offsets; only relevant when startup - * mode is {@link StartupMode#SPECIFIC_OFFSETS}. - */ - protected abstract KafkaTableSource createKafkaTableSource( - TableSchema schema, - Optional<String> proctimeAttribute, - List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, - Map<String, String> fieldMapping, - String topic, Properties properties, - DeserializationSchema<Row> deserializationSchema, - StartupMode startupMode, - Map<KafkaTopicPartition, Long> specificStartupOffsets); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java new file mode 100644 index 0000000..3307994 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.KafkaValidator; +import org.apache.flink.table.descriptors.SchemaValidator; +import org.apache.flink.table.factories.DeserializationSchemaFactory; +import org.apache.flink.table.factories.SerializationSchemaFactory; +import org.apache.flink.table.factories.StreamTableSinkFactory; +import org.apache.flink.table.factories.StreamTableSourceFactory; +import org.apache.flink.table.factories.TableFactoryService; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sources.RowtimeAttributeDescriptor; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION; +import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_STARTUP_MODE; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TOPIC; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE_KAFKA; +import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_CLASS; +import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_FROM; +import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_SERIALIZED; +import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_TYPE; +import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_CLASS; +import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_DELAY; +import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_SERIALIZED; +import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_TYPE; +import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA; +import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_FROM; +import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME; +import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_PROCTIME; +import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE; +import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE; +import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND; + +/** + * Factory for creating configured instances of {@link KafkaTableSource}. + */ +public abstract class KafkaTableSourceSinkFactoryBase implements + StreamTableSourceFactory<Row>, + StreamTableSinkFactory<Row> { + + @Override + public Map<String, String> requiredContext() { + Map<String, String> context = new HashMap<>(); + context.put(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND()); // append mode + context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA); // kafka + context.put(CONNECTOR_VERSION(), kafkaVersion()); // version + context.put(CONNECTOR_PROPERTY_VERSION(), "1"); // backwards compatibility + return context; + } + + @Override + public List<String> supportedProperties() { + List<String> properties = new ArrayList<>(); + + // kafka + properties.add(CONNECTOR_TOPIC); + properties.add(CONNECTOR_PROPERTIES); + properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_KEY); + properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_VALUE); + properties.add(CONNECTOR_STARTUP_MODE); + properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_PARTITION); + properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_OFFSET); + + // schema + properties.add(SCHEMA() + ".#." + SCHEMA_TYPE()); + properties.add(SCHEMA() + ".#." + SCHEMA_NAME()); + properties.add(SCHEMA() + ".#." + SCHEMA_FROM()); + + // time attributes + properties.add(SCHEMA() + ".#." + SCHEMA_PROCTIME()); + properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_TYPE()); + properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_FROM()); + properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_CLASS()); + properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED()); + properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_TYPE()); + properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_CLASS()); + properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_SERIALIZED()); + properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_DELAY()); + + // format wildcard + properties.add(FORMAT() + ".*"); + + return properties; + } + + @Override + public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) { + final DescriptorProperties descriptorProperties = getValidatedProperties(properties); + + final TableSchema schema = descriptorProperties.getTableSchema(SCHEMA()); + final String topic = descriptorProperties.getString(CONNECTOR_TOPIC); + final StartupOptions startupOptions = getStartupOptions(descriptorProperties, topic); + + return createKafkaTableSource( + schema, + SchemaValidator.deriveProctimeAttribute(descriptorProperties), + SchemaValidator.deriveRowtimeAttributes(descriptorProperties), + SchemaValidator.deriveFieldMapping(descriptorProperties, Optional.of(schema)), + topic, + getKafkaProperties(descriptorProperties), + getDeserializationSchema(properties), + startupOptions.startupMode, + startupOptions.specificOffsets); + } + + @Override + public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) { + final DescriptorProperties descriptorProperties = getValidatedProperties(properties); + + final TableSchema schema = descriptorProperties.getTableSchema(SCHEMA()); + final String topic = descriptorProperties.getString(CONNECTOR_TOPIC); + final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(descriptorProperties); + final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors = + SchemaValidator.deriveRowtimeAttributes(descriptorProperties); + + // see also FLINK-9870 + if (proctime.isPresent() || !rowtimeAttributeDescriptors.isEmpty() || + checkForCustomFieldMapping(descriptorProperties, schema)) { + throw new TableException("Time attributes and custom field mappings are not supported yet."); + } + + return createKafkaTableSink( + schema, + topic, + getKafkaProperties(descriptorProperties), + getFlinkKafkaPartitioner(), + getSerializationSchema(properties)); + } + + // -------------------------------------------------------------------------------------------- + // For version-specific factories + // -------------------------------------------------------------------------------------------- + + /** + * Returns the Kafka version. + */ + protected abstract String kafkaVersion(); + + /** + * True if the Kafka source supports Kafka timestamps, false otherwise. + * + * @return True if the Kafka source supports Kafka timestamps, false otherwise. + */ + protected abstract boolean supportsKafkaTimestamps(); + + /** + * Constructs the version-specific Kafka table source. + * + * @param schema Schema of the produced table. + * @param proctimeAttribute Field name of the processing time attribute. + * @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute + * @param fieldMapping Mapping for the fields of the table schema to + * fields of the physical returned type. + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param deserializationSchema Deserialization schema for decoding records from Kafka. + * @param startupMode Startup mode for the contained consumer. + * @param specificStartupOffsets Specific startup offsets; only relevant when startup + * mode is {@link StartupMode#SPECIFIC_OFFSETS}. + */ + protected abstract KafkaTableSource createKafkaTableSource( + TableSchema schema, + Optional<String> proctimeAttribute, + List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, + Map<String, String> fieldMapping, + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema, + StartupMode startupMode, + Map<KafkaTopicPartition, Long> specificStartupOffsets); + + /** + * Constructs the version-specific Kafka table sink. + * + * @param schema Schema of the produced table. + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param partitioner Partitioner to select Kafka partition for each item. + */ + protected abstract KafkaTableSink createKafkaTableSink( + TableSchema schema, + String topic, + Properties properties, + FlinkKafkaPartitioner<Row> partitioner, + SerializationSchema<Row> serializationSchema); + + // -------------------------------------------------------------------------------------------- + // Helper methods + // -------------------------------------------------------------------------------------------- + + private DescriptorProperties getValidatedProperties(Map<String, String> properties) { + final DescriptorProperties descriptorProperties = new DescriptorProperties(true); + descriptorProperties.putProperties(properties); + + // allow Kafka timestamps to be used, watermarks can not be received from source + new SchemaValidator(true, supportsKafkaTimestamps(), false).validate(descriptorProperties); + new KafkaValidator().validate(descriptorProperties); + + return descriptorProperties; + } + + private DeserializationSchema<Row> getDeserializationSchema(Map<String, String> properties) { + @SuppressWarnings("unchecked") + final DeserializationSchemaFactory<Row> formatFactory = TableFactoryService.find( + DeserializationSchemaFactory.class, + properties, + this.getClass().getClassLoader()); + return formatFactory.createDeserializationSchema(properties); + } + + private SerializationSchema<Row> getSerializationSchema(Map<String, String> properties) { + @SuppressWarnings("unchecked") + final SerializationSchemaFactory<Row> formatFactory = TableFactoryService.find( + SerializationSchemaFactory.class, + properties, + this.getClass().getClassLoader()); + return formatFactory.createSerializationSchema(properties); + } + + private Properties getKafkaProperties(DescriptorProperties descriptorProperties) { + final Properties kafkaProperties = new Properties(); + final List<Map<String, String>> propsList = descriptorProperties.getFixedIndexedProperties( + CONNECTOR_PROPERTIES, + Arrays.asList(CONNECTOR_PROPERTIES_KEY, CONNECTOR_PROPERTIES_VALUE)); + propsList.forEach(kv -> kafkaProperties.put( + descriptorProperties.getString(kv.get(CONNECTOR_PROPERTIES_KEY)), + descriptorProperties.getString(kv.get(CONNECTOR_PROPERTIES_VALUE)) + )); + return kafkaProperties; + } + + private StartupOptions getStartupOptions( + DescriptorProperties descriptorProperties, + String topic) { + final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>(); + final StartupMode startupMode = descriptorProperties + .getOptionalString(CONNECTOR_STARTUP_MODE) + .map(modeString -> { + switch (modeString) { + case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_EARLIEST: + return StartupMode.EARLIEST; + + case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_LATEST: + return StartupMode.LATEST; + + case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS: + return StartupMode.GROUP_OFFSETS; + + case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS: + final List<Map<String, String>> offsetList = descriptorProperties.getFixedIndexedProperties( + CONNECTOR_SPECIFIC_OFFSETS, + Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, CONNECTOR_SPECIFIC_OFFSETS_OFFSET)); + offsetList.forEach(kv -> { + final int partition = descriptorProperties.getInt(kv.get(CONNECTOR_SPECIFIC_OFFSETS_PARTITION)); + final long offset = descriptorProperties.getLong(kv.get(CONNECTOR_SPECIFIC_OFFSETS_OFFSET)); + final KafkaTopicPartition topicPartition = new KafkaTopicPartition(topic, partition); + specificOffsets.put(topicPartition, offset); + }); + return StartupMode.SPECIFIC_OFFSETS; + default: + throw new TableException("Unsupported startup mode. Validator should have checked that."); + } + }).orElse(StartupMode.GROUP_OFFSETS); + final StartupOptions options = new StartupOptions(); + options.startupMode = startupMode; + options.specificOffsets = specificOffsets; + return options; + } + + private FlinkKafkaPartitioner<Row> getFlinkKafkaPartitioner() { + // we don't support custom partitioner so far + return new FlinkFixedPartitioner<>(); + } + + private boolean checkForCustomFieldMapping(DescriptorProperties descriptorProperties, TableSchema schema) { + final Map<String, String> fieldMapping = SchemaValidator.deriveFieldMapping(descriptorProperties, Optional.of(schema)); + return fieldMapping.size() != schema.getColumnNames().length || + !fieldMapping.entrySet().stream().allMatch(mapping -> mapping.getKey().equals(mapping.getValue())); + } + + private static class StartupOptions { + private StartupMode startupMode; + private Map<KafkaTopicPartition, Long> specificOffsets; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java index 6e83ddd..7e0d1fb 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java @@ -74,4 +74,14 @@ public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> { return partitions[parallelInstanceId % partitions.length]; } + + @Override + public boolean equals(Object o) { + return this == o || o instanceof FlinkFixedPartitioner; + } + + @Override + public int hashCode() { + return FlinkFixedPartitioner.class.hashCode(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java index a87c622..946b6eb5 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java @@ -44,7 +44,11 @@ import static org.mockito.Mockito.when; /** * Abstract test base for all Kafka table sink tests. + * + * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we + * drop support for format-specific table sinks. */ +@Deprecated public abstract class KafkaTableSinkTestBase { private static final String TOPIC = "testTopic"; @@ -94,7 +98,8 @@ public abstract class KafkaTableSinkTestBase { protected abstract Class<? extends FlinkKafkaProducerBase> getProducerClass(); private KafkaTableSink createTableSink() { - return createTableSink(TOPIC, PROPERTIES, PARTITIONER); + KafkaTableSink sink = createTableSink(TOPIC, PROPERTIES, PARTITIONER); + return sink.configure(FIELD_NAMES, FIELD_TYPES); } private static Properties createSinkProperties() { http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java deleted file mode 100644 index 96f1607..0000000 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.connectors.kafka.config.StartupMode; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.Types; -import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.descriptors.Kafka; -import org.apache.flink.table.descriptors.Rowtime; -import org.apache.flink.table.descriptors.Schema; -import org.apache.flink.table.descriptors.TestTableDescriptor; -import org.apache.flink.table.factories.StreamTableSourceFactory; -import org.apache.flink.table.factories.TableFactoryService; -import org.apache.flink.table.factories.utils.TestDeserializationSchema; -import org.apache.flink.table.factories.utils.TestTableFormat; -import org.apache.flink.table.sources.RowtimeAttributeDescriptor; -import org.apache.flink.table.sources.TableSource; -import org.apache.flink.table.sources.tsextractors.ExistingField; -import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps; -import org.apache.flink.types.Row; -import org.apache.flink.util.TestLogger; - -import org.junit.Test; - -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * Abstract test base for {@link KafkaTableSourceFactory}. - */ -public abstract class KafkaTableSourceFactoryTestBase extends TestLogger { - - private static final String TOPIC = "myTopic"; - private static final int PARTITION_0 = 0; - private static final long OFFSET_0 = 100L; - private static final int PARTITION_1 = 1; - private static final long OFFSET_1 = 123L; - private static final String FRUIT_NAME = "fruit-name"; - private static final String NAME = "name"; - private static final String COUNT = "count"; - private static final String TIME = "time"; - private static final String EVENT_TIME = "event-time"; - private static final String PROC_TIME = "proc-time"; - private static final Properties KAFKA_PROPERTIES = new Properties(); - static { - KAFKA_PROPERTIES.setProperty("zookeeper.connect", "dummy"); - KAFKA_PROPERTIES.setProperty("group.id", "dummy"); - } - - @Test - @SuppressWarnings("unchecked") - public void testTableSource() { - - // prepare parameters for Kafka table source - - final TableSchema schema = TableSchema.builder() - .field(FRUIT_NAME, Types.STRING()) - .field(COUNT, Types.DECIMAL()) - .field(EVENT_TIME, Types.SQL_TIMESTAMP()) - .field(PROC_TIME, Types.SQL_TIMESTAMP()) - .build(); - - final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors = Collections.singletonList( - new RowtimeAttributeDescriptor(EVENT_TIME, new ExistingField(TIME), new AscendingTimestamps())); - - final Map<String, String> fieldMapping = new HashMap<>(); - fieldMapping.put(FRUIT_NAME, NAME); - fieldMapping.put(COUNT, COUNT); - - final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>(); - specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0); - specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1); - - final TestDeserializationSchema deserializationSchema = new TestDeserializationSchema( - TableSchema.builder() - .field(NAME, Types.STRING()) - .field(COUNT, Types.DECIMAL()) - .field(TIME, Types.SQL_TIMESTAMP()) - .build() - .toRowType() - ); - - final StartupMode startupMode = StartupMode.SPECIFIC_OFFSETS; - - final KafkaTableSource expected = getExpectedKafkaTableSource( - schema, - Optional.of(PROC_TIME), - rowtimeAttributeDescriptors, - fieldMapping, - TOPIC, - KAFKA_PROPERTIES, - deserializationSchema, - startupMode, - specificOffsets); - - // construct table source using descriptors and table source factory - - final Map<Integer, Long> offsets = new HashMap<>(); - offsets.put(PARTITION_0, OFFSET_0); - offsets.put(PARTITION_1, OFFSET_1); - - final TestTableDescriptor testDesc = new TestTableDescriptor( - new Kafka() - .version(getKafkaVersion()) - .topic(TOPIC) - .properties(KAFKA_PROPERTIES) - .startFromSpecificOffsets(offsets)) - .withFormat(new TestTableFormat()) - .withSchema( - new Schema() - .field(FRUIT_NAME, Types.STRING()).from(NAME) - .field(COUNT, Types.DECIMAL()) // no from so it must match with the input - .field(EVENT_TIME, Types.SQL_TIMESTAMP()).rowtime( - new Rowtime().timestampsFromField(TIME).watermarksPeriodicAscending()) - .field(PROC_TIME, Types.SQL_TIMESTAMP()).proctime()) - .inAppendMode(); - final DescriptorProperties descriptorProperties = new DescriptorProperties(true); - testDesc.addProperties(descriptorProperties); - final Map<String, String> propertiesMap = descriptorProperties.asMap(); - - final TableSource<?> actualSource = TableFactoryService.find(StreamTableSourceFactory.class, testDesc) - .createStreamTableSource(propertiesMap); - - assertEquals(expected, actualSource); - - // test Kafka consumer - final KafkaTableSource actualKafkaSource = (KafkaTableSource) actualSource; - final StreamExecutionEnvironmentMock mock = new StreamExecutionEnvironmentMock(); - actualKafkaSource.getDataStream(mock); - assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.function.getClass())); - } - - private static class StreamExecutionEnvironmentMock extends StreamExecutionEnvironment { - - public SourceFunction<?> function; - - @Override - public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) { - this.function = function; - return super.addSource(function); - } - - @Override - public JobExecutionResult execute(String jobName) { - throw new UnsupportedOperationException(); - } - } - - // -------------------------------------------------------------------------------------------- - // For version-specific tests - // -------------------------------------------------------------------------------------------- - - protected abstract String getKafkaVersion(); - - protected abstract Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer(); - - protected abstract KafkaTableSource getExpectedKafkaTableSource( - TableSchema schema, - Optional<String> proctimeAttribute, - List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, - Map<String, String> fieldMapping, - String topic, Properties properties, - DeserializationSchema<Row> deserializationSchema, - StartupMode startupMode, - Map<KafkaTopicPartition, Long> specificStartupOffsets); -} http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java new file mode 100644 index 0000000..d8e8f7d --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.transformations.StreamTransformation; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.Types; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.Kafka; +import org.apache.flink.table.descriptors.Rowtime; +import org.apache.flink.table.descriptors.Schema; +import org.apache.flink.table.descriptors.TestTableDescriptor; +import org.apache.flink.table.factories.StreamTableSinkFactory; +import org.apache.flink.table.factories.StreamTableSourceFactory; +import org.apache.flink.table.factories.TableFactoryService; +import org.apache.flink.table.factories.utils.TestDeserializationSchema; +import org.apache.flink.table.factories.utils.TestSerializationSchema; +import org.apache.flink.table.factories.utils.TestTableFormat; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.RowtimeAttributeDescriptor; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.table.sources.tsextractors.ExistingField; +import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps; +import org.apache.flink.types.Row; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Abstract test base for {@link KafkaTableSourceSinkFactoryBase}. + */ +public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger { + + private static final String TOPIC = "myTopic"; + private static final int PARTITION_0 = 0; + private static final long OFFSET_0 = 100L; + private static final int PARTITION_1 = 1; + private static final long OFFSET_1 = 123L; + private static final String FRUIT_NAME = "fruit-name"; + private static final String NAME = "name"; + private static final String COUNT = "count"; + private static final String TIME = "time"; + private static final String EVENT_TIME = "event-time"; + private static final String PROC_TIME = "proc-time"; + private static final Properties KAFKA_PROPERTIES = new Properties(); + static { + KAFKA_PROPERTIES.setProperty("zookeeper.connect", "dummy"); + KAFKA_PROPERTIES.setProperty("group.id", "dummy"); + KAFKA_PROPERTIES.setProperty("bootstrap.servers", "dummy"); + } + + private static final Map<Integer, Long> OFFSETS = new HashMap<>(); + static { + OFFSETS.put(PARTITION_0, OFFSET_0); + OFFSETS.put(PARTITION_1, OFFSET_1); + } + + @Test + @SuppressWarnings("unchecked") + public void testTableSource() { + + // prepare parameters for Kafka table source + + final TableSchema schema = TableSchema.builder() + .field(FRUIT_NAME, Types.STRING()) + .field(COUNT, Types.DECIMAL()) + .field(EVENT_TIME, Types.SQL_TIMESTAMP()) + .field(PROC_TIME, Types.SQL_TIMESTAMP()) + .build(); + + final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors = Collections.singletonList( + new RowtimeAttributeDescriptor(EVENT_TIME, new ExistingField(TIME), new AscendingTimestamps())); + + final Map<String, String> fieldMapping = new HashMap<>(); + fieldMapping.put(FRUIT_NAME, NAME); + fieldMapping.put(COUNT, COUNT); + + final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>(); + specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0); + specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1); + + final TestDeserializationSchema deserializationSchema = new TestDeserializationSchema( + TableSchema.builder() + .field(NAME, Types.STRING()) + .field(COUNT, Types.DECIMAL()) + .field(TIME, Types.SQL_TIMESTAMP()) + .build() + .toRowType() + ); + + final KafkaTableSource expected = getExpectedKafkaTableSource( + schema, + Optional.of(PROC_TIME), + rowtimeAttributeDescriptors, + fieldMapping, + TOPIC, + KAFKA_PROPERTIES, + deserializationSchema, + StartupMode.SPECIFIC_OFFSETS, + specificOffsets); + + // construct table source using descriptors and table source factory + + final TestTableDescriptor testDesc = new TestTableDescriptor( + new Kafka() + .version(getKafkaVersion()) + .topic(TOPIC) + .properties(KAFKA_PROPERTIES) + .startFromSpecificOffsets(OFFSETS)) + .withFormat(new TestTableFormat()) + .withSchema( + new Schema() + .field(FRUIT_NAME, Types.STRING()).from(NAME) + .field(COUNT, Types.DECIMAL()) // no from so it must match with the input + .field(EVENT_TIME, Types.SQL_TIMESTAMP()).rowtime( + new Rowtime().timestampsFromField(TIME).watermarksPeriodicAscending()) + .field(PROC_TIME, Types.SQL_TIMESTAMP()).proctime()) + .inAppendMode(); + + final Map<String, String> propertiesMap = DescriptorProperties.toJavaMap(testDesc); + final TableSource<?> actualSource = TableFactoryService.find(StreamTableSourceFactory.class, propertiesMap) + .createStreamTableSource(propertiesMap); + + assertEquals(expected, actualSource); + + // test Kafka consumer + final KafkaTableSource actualKafkaSource = (KafkaTableSource) actualSource; + final StreamExecutionEnvironmentMock mock = new StreamExecutionEnvironmentMock(); + actualKafkaSource.getDataStream(mock); + assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.sourceFunction.getClass())); + } + + /** + * This test can be unified with the corresponding source test once we have fixed FLINK-9870. + */ + @Test + public void testTableSink() { + // prepare parameters for Kafka table sink + + final TableSchema schema = TableSchema.builder() + .field(FRUIT_NAME, Types.STRING()) + .field(COUNT, Types.DECIMAL()) + .field(EVENT_TIME, Types.SQL_TIMESTAMP()) + .build(); + + final KafkaTableSink expected = getExpectedKafkaTableSink( + schema, + TOPIC, + KAFKA_PROPERTIES, + new FlinkFixedPartitioner<>(), // a custom partitioner is not support yet + new TestSerializationSchema(schema.toRowType())); + + // construct table sink using descriptors and table sink factory + + final TestTableDescriptor testDesc = new TestTableDescriptor( + new Kafka() + .version(getKafkaVersion()) + .topic(TOPIC) + .properties(KAFKA_PROPERTIES) + .startFromSpecificOffsets(OFFSETS)) // test if they accepted although not needed + .withFormat(new TestTableFormat()) + .withSchema( + new Schema() + .field(FRUIT_NAME, Types.STRING()) + .field(COUNT, Types.DECIMAL()) + .field(EVENT_TIME, Types.SQL_TIMESTAMP())) + .inAppendMode(); + + final Map<String, String> propertiesMap = DescriptorProperties.toJavaMap(testDesc); + final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap) + .createStreamTableSink(propertiesMap); + + assertEquals(expected, actualSink); + + // test Kafka producer + final KafkaTableSink actualKafkaSink = (KafkaTableSink) actualSink; + final DataStreamMock streamMock = new DataStreamMock(new StreamExecutionEnvironmentMock(), schema.toRowType()); + actualKafkaSink.emitDataStream(streamMock); + assertTrue(getExpectedFlinkKafkaProducer().isAssignableFrom(streamMock.sinkFunction.getClass())); + } + + private static class StreamExecutionEnvironmentMock extends StreamExecutionEnvironment { + + public SourceFunction<?> sourceFunction; + + @Override + public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> sourceFunction) { + this.sourceFunction = sourceFunction; + return super.addSource(sourceFunction); + } + + @Override + public JobExecutionResult execute(String jobName) { + throw new UnsupportedOperationException(); + } + } + + private static class DataStreamMock extends DataStream<Row> { + + public SinkFunction<?> sinkFunction; + + public DataStreamMock(StreamExecutionEnvironment environment, TypeInformation<Row> outType) { + super(environment, new StreamTransformationMock("name", outType, 1)); + } + + @Override + public DataStreamSink<Row> addSink(SinkFunction<Row> sinkFunction) { + this.sinkFunction = sinkFunction; + return super.addSink(sinkFunction); + } + } + + private static class StreamTransformationMock extends StreamTransformation<Row> { + + public StreamTransformationMock(String name, TypeInformation<Row> outputType, int parallelism) { + super(name, outputType, parallelism); + } + + @Override + public void setChainingStrategy(ChainingStrategy strategy) { + // do nothing + } + + @Override + public Collection<StreamTransformation<?>> getTransitivePredecessors() { + return null; + } + } + + // -------------------------------------------------------------------------------------------- + // For version-specific tests + // -------------------------------------------------------------------------------------------- + + protected abstract String getKafkaVersion(); + + protected abstract Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer(); + + protected abstract Class<?> getExpectedFlinkKafkaProducer(); + + protected abstract KafkaTableSource getExpectedKafkaTableSource( + TableSchema schema, + Optional<String> proctimeAttribute, + List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, + Map<String, String> fieldMapping, + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema, + StartupMode startupMode, + Map<KafkaTopicPartition, Long> specificStartupOffsets); + + protected abstract KafkaTableSink getExpectedKafkaTableSink( + TableSchema schema, + String topic, + Properties properties, + FlinkKafkaPartitioner<Row> partitioner, + SerializationSchema<Row> serializationSchema); +} http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala index ab613a9..a7eaa48 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala @@ -19,12 +19,26 @@ package org.apache.flink.table.factories.utils import org.apache.flink.api.common.serialization.SerializationSchema +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.types.Row /** * Serialization schema for testing purposes. */ -class TestSerializationSchema extends SerializationSchema[Row] { +class TestSerializationSchema(val typeInfo: TypeInformation[Row]) extends SerializationSchema[Row] { override def serialize(element: Row): Array[Byte] = throw new UnsupportedOperationException() + + def canEqual(other: Any): Boolean = other.isInstanceOf[TestSerializationSchema] + + override def equals(other: Any): Boolean = other match { + case that: TestSerializationSchema => + (that canEqual this) && + typeInfo == that.typeInfo + case _ => false + } + + override def hashCode(): Int = { + 31 * typeInfo.hashCode() + } } http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala index 475cff9..39c268e 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala @@ -20,9 +20,9 @@ package org.apache.flink.table.factories.utils import java.util -import org.apache.flink.api.common.serialization.DeserializationSchema +import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema} import org.apache.flink.table.descriptors.{DescriptorProperties, FormatDescriptorValidator, SchemaValidator} -import org.apache.flink.table.factories.{DeserializationSchemaFactory, TableFormatFactoryServiceTest} +import org.apache.flink.table.factories.{DeserializationSchemaFactory, SerializationSchemaFactory, TableFormatFactoryServiceTest} import org.apache.flink.types.Row /** @@ -31,7 +31,9 @@ import org.apache.flink.types.Row * It has the same context as [[TestAmbiguousTableFormatFactory]] and both support COMMON_PATH. * This format does not support SPECIAL_PATH but supports schema derivation. */ -class TestTableFormatFactory extends DeserializationSchemaFactory[Row] { +class TestTableFormatFactory + extends DeserializationSchemaFactory[Row] + with SerializationSchemaFactory[Row] { override def requiredContext(): util.Map[String, String] = { val context = new util.HashMap[String, String]() @@ -62,4 +64,14 @@ class TestTableFormatFactory extends DeserializationSchemaFactory[Row] { val schema = SchemaValidator.deriveFormatFields(props) new TestDeserializationSchema(schema.toRowType) } + + override def createSerializationSchema( + properties: util.Map[String, String]) + : SerializationSchema[Row] = { + + val props = new DescriptorProperties(true) + props.putProperties(properties) + val schema = SchemaValidator.deriveFormatFields(props) + new TestSerializationSchema(schema.toRowType) + } }
