[FLINK-9846] [table] Add a Kafka table sink factory Adds a Kafka table sink factory with format discovery. Currently, this enables the SQL Client to write Avro and JSON data to Kafka. The functionality is limited due to FLINK-9870. Therefore, it is currently not possible to use time attributes in the output.
Changes: - Decouple Kafka sink from formats and deprecate old classes - Add a Kafka table sink factory - Existing tests for the KafkaTableSourceFactory have been generalized to support sinks as well. This closes #6387. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aa25b4b3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aa25b4b3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aa25b4b3 Branch: refs/heads/release-1.6 Commit: aa25b4b324b4c025fe9e58e081677faf0ab50a7d Parents: 702f773 Author: Timo Walther <[email protected]> Authored: Mon Jul 23 08:12:00 2018 +0200 Committer: Timo Walther <[email protected]> Committed: Mon Jul 23 18:19:53 2018 +0200 ---------------------------------------------------------------------- .../connectors/kafka/Kafka010JsonTableSink.java | 19 +- .../connectors/kafka/Kafka010TableSink.java | 61 ++++ .../kafka/Kafka010TableSourceFactory.java | 72 ---- .../kafka/Kafka010TableSourceSinkFactory.java | 90 +++++ ...rg.apache.flink.table.factories.TableFactory | 2 +- .../kafka/Kafka010JsonTableSinkTest.java | 4 + .../kafka/Kafka010TableSourceFactoryTest.java | 74 ----- .../Kafka010TableSourceSinkFactoryTest.java | 99 ++++++ .../connectors/kafka/Kafka011TableSink.java | 64 ++++ .../connectors/kafka/Kafka011TableSource.java | 3 +- .../kafka/Kafka011TableSourceFactory.java | 72 ---- .../kafka/Kafka011TableSourceSinkFactory.java | 90 +++++ ...rg.apache.flink.table.factories.TableFactory | 2 +- .../kafka/Kafka011TableSourceFactoryTest.java | 74 ----- .../Kafka011TableSourceSinkFactoryTest.java | 99 ++++++ .../connectors/kafka/Kafka08JsonTableSink.java | 19 +- .../connectors/kafka/Kafka08TableSink.java | 61 ++++ .../connectors/kafka/Kafka08TableSource.java | 3 +- .../kafka/Kafka08TableSourceFactory.java | 72 ---- .../kafka/Kafka08TableSourceSinkFactory.java | 90 +++++ ...rg.apache.flink.table.factories.TableFactory | 2 +- .../kafka/Kafka08JsonTableSinkTest.java | 4 + .../kafka/Kafka08TableSourceFactoryTest.java | 74 ----- .../Kafka08TableSourceSinkFactoryTest.java | 99 ++++++ .../connectors/kafka/Kafka09JsonTableSink.java | 19 +- .../connectors/kafka/Kafka09TableSink.java | 61 ++++ .../connectors/kafka/Kafka09TableSource.java | 3 +- .../kafka/Kafka09TableSourceFactory.java | 72 ---- .../kafka/Kafka09TableSourceSinkFactory.java | 90 +++++ ...rg.apache.flink.table.factories.TableFactory | 2 +- .../kafka/Kafka09JsonTableSinkTest.java | 4 + .../kafka/Kafka09TableSourceFactoryTest.java | 74 ----- .../Kafka09TableSourceSinkFactoryTest.java | 99 ++++++ .../connectors/kafka/KafkaJsonTableSink.java | 5 + .../connectors/kafka/KafkaTableSink.java | 112 ++++++- .../kafka/KafkaTableSourceFactory.java | 251 -------------- .../kafka/KafkaTableSourceSinkFactoryBase.java | 330 +++++++++++++++++++ .../partitioner/FlinkFixedPartitioner.java | 10 + .../kafka/KafkaTableSinkTestBase.java | 7 +- .../kafka/KafkaTableSourceFactoryTestBase.java | 196 ----------- .../KafkaTableSourceSinkFactoryTestBase.java | 299 +++++++++++++++++ .../utils/TestSerializationSchema.scala | 16 +- .../utils/TestTableFormatFactory.scala | 18 +- 43 files changed, 1852 insertions(+), 1065 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java index ef33cd5..2ad3142 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java @@ -18,18 +18,23 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.table.descriptors.ConnectorDescriptor; import org.apache.flink.types.Row; import java.util.Properties; /** * Kafka 0.10 {@link KafkaTableSink} that serializes data in JSON format. + * + * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together + * with descriptors for schema and format instead. Descriptors allow for + * implementation-agnostic definition of tables. See also + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ -@PublicEvolving +@Deprecated public class Kafka010JsonTableSink extends KafkaJsonTableSink { /** @@ -46,7 +51,9 @@ public class Kafka010JsonTableSink extends KafkaJsonTableSink { * * @param topic topic in Kafka to which table is written * @param properties properties to connect to Kafka + * @deprecated Use table descriptors instead of implementation-specific classes. */ + @Deprecated public Kafka010JsonTableSink(String topic, Properties properties) { super(topic, properties, new FlinkFixedPartitioner<>()); } @@ -58,14 +65,20 @@ public class Kafka010JsonTableSink extends KafkaJsonTableSink { * @param topic topic in Kafka to which table is written * @param properties properties to connect to Kafka * @param partitioner Kafka partitioner + * @deprecated Use table descriptors instead of implementation-specific classes. */ + @Deprecated public Kafka010JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) { super(topic, properties, partitioner); } @Override protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) { - return new FlinkKafkaProducer010<>(topic, serializationSchema, properties, partitioner); + return new FlinkKafkaProducer010<>( + topic, + serializationSchema, + properties, + partitioner); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java new file mode 100644 index 0000000..a8c6553 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +import java.util.Properties; + +/** + * Kafka 0.10 table sink for writing data into Kafka. + */ +@Internal +public class Kafka010TableSink extends KafkaTableSink { + + public Kafka010TableSink( + TableSchema schema, + String topic, + Properties properties, + FlinkKafkaPartitioner<Row> partitioner, + SerializationSchema<Row> serializationSchema) { + super( + schema, + topic, + properties, + partitioner, + serializationSchema); + } + + @Override + protected FlinkKafkaProducerBase<Row> createKafkaProducer( + String topic, + Properties properties, + SerializationSchema<Row> serializationSchema, + FlinkKafkaPartitioner<Row> partitioner) { + return new FlinkKafkaProducer010<>( + topic, + serializationSchema, + properties, + partitioner); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactory.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactory.java deleted file mode 100644 index 4a86016..0000000 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactory.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.streaming.connectors.kafka.config.StartupMode; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.descriptors.KafkaValidator; -import org.apache.flink.table.sources.RowtimeAttributeDescriptor; -import org.apache.flink.types.Row; - -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; - -/** - * Factory for creating configured instances of {@link Kafka010TableSource}. - */ -public class Kafka010TableSourceFactory extends KafkaTableSourceFactory { - - @Override - protected String kafkaVersion() { - return KafkaValidator.CONNECTOR_VERSION_VALUE_010; - } - - @Override - protected boolean supportsKafkaTimestamps() { - return true; - } - - @Override - protected KafkaTableSource createKafkaTableSource( - TableSchema schema, - Optional<String> proctimeAttribute, - List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, - Map<String, String> fieldMapping, - String topic, - Properties properties, - DeserializationSchema<Row> deserializationSchema, - StartupMode startupMode, - Map<KafkaTopicPartition, Long> specificStartupOffsets) { - - return new Kafka010TableSource( - schema, - proctimeAttribute, - rowtimeAttributeDescriptors, - Optional.of(fieldMapping), - topic, - properties, - deserializationSchema, - startupMode, - specificStartupOffsets); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java new file mode 100644 index 0000000..0cf9499 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.KafkaValidator; +import org.apache.flink.table.sources.RowtimeAttributeDescriptor; +import org.apache.flink.types.Row; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +/** + * Factory for creating configured instances of {@link Kafka010TableSource}. + */ +public class Kafka010TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase { + + @Override + protected String kafkaVersion() { + return KafkaValidator.CONNECTOR_VERSION_VALUE_010; + } + + @Override + protected boolean supportsKafkaTimestamps() { + return true; + } + + @Override + protected KafkaTableSource createKafkaTableSource( + TableSchema schema, + Optional<String> proctimeAttribute, + List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, + Map<String, String> fieldMapping, + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema, + StartupMode startupMode, + Map<KafkaTopicPartition, Long> specificStartupOffsets) { + + return new Kafka010TableSource( + schema, + proctimeAttribute, + rowtimeAttributeDescriptors, + Optional.of(fieldMapping), + topic, + properties, + deserializationSchema, + startupMode, + specificStartupOffsets); + } + + @Override + protected KafkaTableSink createKafkaTableSink( + TableSchema schema, + String topic, + Properties properties, + FlinkKafkaPartitioner<Row> partitioner, + SerializationSchema<Row> serializationSchema) { + + return new Kafka010TableSink( + schema, + topic, + properties, + partitioner, + serializationSchema); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory index 21f5707..9bb0363 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceFactory +org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java index af562c6..339420c 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java @@ -27,7 +27,11 @@ import java.util.Properties; /** * Tests for the {@link Kafka010JsonTableSink}. + * + * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we + * drop support for format-specific table sinks. */ +@Deprecated public class Kafka010JsonTableSinkTest extends KafkaTableSinkTestBase { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java deleted file mode 100644 index ff3b0b0..0000000 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.streaming.connectors.kafka.config.StartupMode; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.descriptors.KafkaValidator; -import org.apache.flink.table.sources.RowtimeAttributeDescriptor; -import org.apache.flink.types.Row; - -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; - -/** - * Test for {@link Kafka010TableSource} created by {@link Kafka010TableSourceFactory}. - */ -public class Kafka010TableSourceFactoryTest extends KafkaTableSourceFactoryTestBase { - - @Override - protected String getKafkaVersion() { - return KafkaValidator.CONNECTOR_VERSION_VALUE_010; - } - - @Override - @SuppressWarnings("unchecked") - protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() { - return (Class) FlinkKafkaConsumer010.class; - } - - @Override - protected KafkaTableSource getExpectedKafkaTableSource( - TableSchema schema, - Optional<String> proctimeAttribute, - List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, - Map<String, String> fieldMapping, - String topic, - Properties properties, - DeserializationSchema<Row> deserializationSchema, - StartupMode startupMode, - Map<KafkaTopicPartition, Long> specificStartupOffsets) { - - return new Kafka010TableSource( - schema, - proctimeAttribute, - rowtimeAttributeDescriptors, - Optional.of(fieldMapping), - topic, - properties, - deserializationSchema, - startupMode, - specificStartupOffsets - ); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java new file mode 100644 index 0000000..cc198c9 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.KafkaValidator; +import org.apache.flink.table.sources.RowtimeAttributeDescriptor; +import org.apache.flink.types.Row; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +/** + * Test for {@link Kafka010TableSource} and {@link Kafka010TableSink} created + * by {@link Kafka010TableSourceSinkFactory}. + */ +public class Kafka010TableSourceSinkFactoryTest extends KafkaTableSourceSinkFactoryTestBase { + + @Override + protected String getKafkaVersion() { + return KafkaValidator.CONNECTOR_VERSION_VALUE_010; + } + + @Override + @SuppressWarnings("unchecked") + protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() { + return (Class) FlinkKafkaConsumer010.class; + } + + @Override + protected Class<?> getExpectedFlinkKafkaProducer() { + return FlinkKafkaProducer010.class; + } + + @Override + protected KafkaTableSource getExpectedKafkaTableSource( + TableSchema schema, + Optional<String> proctimeAttribute, + List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, + Map<String, String> fieldMapping, + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema, + StartupMode startupMode, + Map<KafkaTopicPartition, Long> specificStartupOffsets) { + + return new Kafka010TableSource( + schema, + proctimeAttribute, + rowtimeAttributeDescriptors, + Optional.of(fieldMapping), + topic, + properties, + deserializationSchema, + startupMode, + specificStartupOffsets + ); + } + + @Override + protected KafkaTableSink getExpectedKafkaTableSink( + TableSchema schema, + String topic, + Properties properties, + FlinkKafkaPartitioner<Row> partitioner, + SerializationSchema<Row> serializationSchema) { + + return new Kafka010TableSink( + schema, + topic, + properties, + partitioner, + serializationSchema + ); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java new file mode 100644 index 0000000..22c6da1 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +import java.util.Optional; +import java.util.Properties; + +/** + * Kafka 0.11 table sink for writing data into Kafka. + */ +@Internal +public class Kafka011TableSink extends KafkaTableSink { + + public Kafka011TableSink( + TableSchema schema, + String topic, + Properties properties, + FlinkKafkaPartitioner<Row> partitioner, + SerializationSchema<Row> serializationSchema) { + super( + schema, + topic, + properties, + partitioner, + serializationSchema); + } + + @Override + protected SinkFunction<Row> createKafkaProducer( + String topic, + Properties properties, + SerializationSchema<Row> serializationSchema, + FlinkKafkaPartitioner<Row> partitioner) { + return new FlinkKafkaProducer011<>( + topic, + new KeyedSerializationSchemaWrapper<>(serializationSchema), + properties, + Optional.of(partitioner)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java index 85f5669..a646317 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java @@ -58,7 +58,8 @@ public class Kafka011TableSource extends KafkaTableSource { Optional<String> proctimeAttribute, List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, Optional<Map<String, String>> fieldMapping, - String topic, Properties properties, + String topic, + Properties properties, DeserializationSchema<Row> deserializationSchema, StartupMode startupMode, Map<KafkaTopicPartition, Long> specificStartupOffsets) { http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactory.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactory.java deleted file mode 100644 index b1e3929..0000000 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactory.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.streaming.connectors.kafka.config.StartupMode; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.descriptors.KafkaValidator; -import org.apache.flink.table.sources.RowtimeAttributeDescriptor; -import org.apache.flink.types.Row; - -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; - -/** - * Factory for creating configured instances of {@link Kafka011TableSource}. - */ -public class Kafka011TableSourceFactory extends KafkaTableSourceFactory { - - @Override - protected String kafkaVersion() { - return KafkaValidator.CONNECTOR_VERSION_VALUE_011; - } - - @Override - protected boolean supportsKafkaTimestamps() { - return true; - } - - @Override - protected KafkaTableSource createKafkaTableSource( - TableSchema schema, - Optional<String> proctimeAttribute, - List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, - Map<String, String> fieldMapping, - String topic, - Properties properties, - DeserializationSchema<Row> deserializationSchema, - StartupMode startupMode, - Map<KafkaTopicPartition, Long> specificStartupOffsets) { - - return new Kafka011TableSource( - schema, - proctimeAttribute, - rowtimeAttributeDescriptors, - Optional.of(fieldMapping), - topic, - properties, - deserializationSchema, - startupMode, - specificStartupOffsets); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java new file mode 100644 index 0000000..c26df42 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.KafkaValidator; +import org.apache.flink.table.sources.RowtimeAttributeDescriptor; +import org.apache.flink.types.Row; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +/** + * Factory for creating configured instances of {@link Kafka011TableSource}. + */ +public class Kafka011TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase { + + @Override + protected String kafkaVersion() { + return KafkaValidator.CONNECTOR_VERSION_VALUE_011; + } + + @Override + protected boolean supportsKafkaTimestamps() { + return true; + } + + @Override + protected KafkaTableSource createKafkaTableSource( + TableSchema schema, + Optional<String> proctimeAttribute, + List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, + Map<String, String> fieldMapping, + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema, + StartupMode startupMode, + Map<KafkaTopicPartition, Long> specificStartupOffsets) { + + return new Kafka011TableSource( + schema, + proctimeAttribute, + rowtimeAttributeDescriptors, + Optional.of(fieldMapping), + topic, + properties, + deserializationSchema, + startupMode, + specificStartupOffsets); + } + + @Override + protected KafkaTableSink createKafkaTableSink( + TableSchema schema, + String topic, + Properties properties, + FlinkKafkaPartitioner<Row> partitioner, + SerializationSchema<Row> serializationSchema) { + + return new Kafka011TableSink( + schema, + topic, + properties, + partitioner, + serializationSchema); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory index c056097..b59b4a7 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceFactory +org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java deleted file mode 100644 index abaa490..0000000 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.streaming.connectors.kafka.config.StartupMode; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.descriptors.KafkaValidator; -import org.apache.flink.table.sources.RowtimeAttributeDescriptor; -import org.apache.flink.types.Row; - -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; - -/** - * Test for {@link Kafka011TableSource} created by {@link Kafka011TableSourceFactory}. - */ -public class Kafka011TableSourceFactoryTest extends KafkaTableSourceFactoryTestBase { - - @Override - protected String getKafkaVersion() { - return KafkaValidator.CONNECTOR_VERSION_VALUE_011; - } - - @Override - @SuppressWarnings("unchecked") - protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() { - return (Class) FlinkKafkaConsumer011.class; - } - - @Override - protected KafkaTableSource getExpectedKafkaTableSource( - TableSchema schema, - Optional<String> proctimeAttribute, - List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, - Map<String, String> fieldMapping, - String topic, - Properties properties, - DeserializationSchema<Row> deserializationSchema, - StartupMode startupMode, - Map<KafkaTopicPartition, Long> specificStartupOffsets) { - - return new Kafka011TableSource( - schema, - proctimeAttribute, - rowtimeAttributeDescriptors, - Optional.of(fieldMapping), - topic, - properties, - deserializationSchema, - startupMode, - specificStartupOffsets - ); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java new file mode 100644 index 0000000..996c508 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.KafkaValidator; +import org.apache.flink.table.sources.RowtimeAttributeDescriptor; +import org.apache.flink.types.Row; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +/** + * Test for {@link Kafka011TableSource} and {@link Kafka011TableSink} created + * by {@link Kafka011TableSourceSinkFactory}. + */ +public class Kafka011TableSourceSinkFactoryTest extends KafkaTableSourceSinkFactoryTestBase { + + @Override + protected String getKafkaVersion() { + return KafkaValidator.CONNECTOR_VERSION_VALUE_011; + } + + @Override + @SuppressWarnings("unchecked") + protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() { + return (Class) FlinkKafkaConsumer011.class; + } + + @Override + protected Class<?> getExpectedFlinkKafkaProducer() { + return FlinkKafkaProducer011.class; + } + + @Override + protected KafkaTableSource getExpectedKafkaTableSource( + TableSchema schema, + Optional<String> proctimeAttribute, + List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, + Map<String, String> fieldMapping, + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema, + StartupMode startupMode, + Map<KafkaTopicPartition, Long> specificStartupOffsets) { + + return new Kafka011TableSource( + schema, + proctimeAttribute, + rowtimeAttributeDescriptors, + Optional.of(fieldMapping), + topic, + properties, + deserializationSchema, + startupMode, + specificStartupOffsets + ); + } + + @Override + protected KafkaTableSink getExpectedKafkaTableSink( + TableSchema schema, + String topic, + Properties properties, + FlinkKafkaPartitioner<Row> partitioner, + SerializationSchema<Row> serializationSchema) { + + return new Kafka011TableSink( + schema, + topic, + properties, + partitioner, + serializationSchema + ); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java index c60288d..45588cd 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java @@ -18,20 +18,25 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.table.descriptors.ConnectorDescriptor; import org.apache.flink.types.Row; import java.util.Properties; /** * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format. + * + * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together + * with descriptors for schema and format instead. Descriptors allow for + * implementation-agnostic definition of tables. See also + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ -@PublicEvolving +@Deprecated public class Kafka08JsonTableSink extends KafkaJsonTableSink { /** @@ -48,7 +53,9 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink { * * @param topic topic in Kafka to which table is written * @param properties properties to connect to Kafka + * @deprecated Use table descriptors instead of implementation-specific classes. */ + @Deprecated public Kafka08JsonTableSink(String topic, Properties properties) { super(topic, properties, new FlinkFixedPartitioner<>()); } @@ -60,7 +67,9 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink { * @param topic topic in Kafka to which table is written * @param properties properties to connect to Kafka * @param partitioner Kafka partitioner + * @deprecated Use table descriptors instead of implementation-specific classes. */ + @Deprecated public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) { super(topic, properties, partitioner); } @@ -84,7 +93,11 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink { @Override protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) { - return new FlinkKafkaProducer08<>(topic, serializationSchema, properties, partitioner); + return new FlinkKafkaProducer08<>( + topic, + serializationSchema, + properties, + partitioner); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java new file mode 100644 index 0000000..c34de13 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +import java.util.Properties; + +/** + * Kafka 0.8 table sink for writing data into Kafka. + */ +@Internal +public class Kafka08TableSink extends KafkaTableSink { + + public Kafka08TableSink( + TableSchema schema, + String topic, + Properties properties, + FlinkKafkaPartitioner<Row> partitioner, + SerializationSchema<Row> serializationSchema) { + super( + schema, + topic, + properties, + partitioner, + serializationSchema); + } + + @Override + protected FlinkKafkaProducerBase<Row> createKafkaProducer( + String topic, + Properties properties, + SerializationSchema<Row> serializationSchema, + FlinkKafkaPartitioner<Row> partitioner) { + return new FlinkKafkaProducer08<>( + topic, + serializationSchema, + properties, + partitioner); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java index 1a025b8..97c293e 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java @@ -58,7 +58,8 @@ public class Kafka08TableSource extends KafkaTableSource { Optional<String> proctimeAttribute, List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, Optional<Map<String, String>> fieldMapping, - String topic, Properties properties, + String topic, + Properties properties, DeserializationSchema<Row> deserializationSchema, StartupMode startupMode, Map<KafkaTopicPartition, Long> specificStartupOffsets) { http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactory.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactory.java deleted file mode 100644 index cd33751..0000000 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactory.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.streaming.connectors.kafka.config.StartupMode; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.descriptors.KafkaValidator; -import org.apache.flink.table.sources.RowtimeAttributeDescriptor; -import org.apache.flink.types.Row; - -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; - -/** - * Factory for creating configured instances of {@link Kafka08TableSource}. - */ -public class Kafka08TableSourceFactory extends KafkaTableSourceFactory { - - @Override - protected String kafkaVersion() { - return KafkaValidator.CONNECTOR_VERSION_VALUE_08; - } - - @Override - protected boolean supportsKafkaTimestamps() { - return false; - } - - @Override - protected KafkaTableSource createKafkaTableSource( - TableSchema schema, - Optional<String> proctimeAttribute, - List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, - Map<String, String> fieldMapping, - String topic, - Properties properties, - DeserializationSchema<Row> deserializationSchema, - StartupMode startupMode, - Map<KafkaTopicPartition, Long> specificStartupOffsets) { - - return new Kafka08TableSource( - schema, - proctimeAttribute, - rowtimeAttributeDescriptors, - Optional.of(fieldMapping), - topic, - properties, - deserializationSchema, - startupMode, - specificStartupOffsets); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java new file mode 100644 index 0000000..3e93b6f --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.KafkaValidator; +import org.apache.flink.table.sources.RowtimeAttributeDescriptor; +import org.apache.flink.types.Row; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +/** + * Factory for creating configured instances of {@link Kafka08TableSource}. + */ +public class Kafka08TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase { + + @Override + protected String kafkaVersion() { + return KafkaValidator.CONNECTOR_VERSION_VALUE_08; + } + + @Override + protected boolean supportsKafkaTimestamps() { + return false; + } + + @Override + protected KafkaTableSource createKafkaTableSource( + TableSchema schema, + Optional<String> proctimeAttribute, + List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, + Map<String, String> fieldMapping, + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema, + StartupMode startupMode, + Map<KafkaTopicPartition, Long> specificStartupOffsets) { + + return new Kafka08TableSource( + schema, + proctimeAttribute, + rowtimeAttributeDescriptors, + Optional.of(fieldMapping), + topic, + properties, + deserializationSchema, + startupMode, + specificStartupOffsets); + } + + @Override + protected KafkaTableSink createKafkaTableSink( + TableSchema schema, + String topic, + Properties properties, + FlinkKafkaPartitioner<Row> partitioner, + SerializationSchema<Row> serializationSchema) { + + return new Kafka08TableSink( + schema, + topic, + properties, + partitioner, + serializationSchema); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory index b83bb3f..f2e1c3f 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceFactory +org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java index 53da9f6..32bd3b6 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java @@ -27,7 +27,11 @@ import java.util.Properties; /** * Tests for the {@link Kafka08JsonTableSink}. + * + * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we + * drop support for format-specific table sinks. */ +@Deprecated public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java deleted file mode 100644 index d939d88..0000000 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.streaming.connectors.kafka.config.StartupMode; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.descriptors.KafkaValidator; -import org.apache.flink.table.sources.RowtimeAttributeDescriptor; -import org.apache.flink.types.Row; - -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; - -/** - * Test for {@link Kafka08TableSource} created by {@link Kafka08TableSourceFactory}. - */ -public class Kafka08TableSourceFactoryTest extends KafkaTableSourceFactoryTestBase { - - @Override - protected String getKafkaVersion() { - return KafkaValidator.CONNECTOR_VERSION_VALUE_08; - } - - @Override - @SuppressWarnings("unchecked") - protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() { - return (Class) FlinkKafkaConsumer08.class; - } - - @Override - protected KafkaTableSource getExpectedKafkaTableSource( - TableSchema schema, - Optional<String> proctimeAttribute, - List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, - Map<String, String> fieldMapping, - String topic, - Properties properties, - DeserializationSchema<Row> deserializationSchema, - StartupMode startupMode, - Map<KafkaTopicPartition, Long> specificStartupOffsets) { - - return new Kafka08TableSource( - schema, - proctimeAttribute, - rowtimeAttributeDescriptors, - Optional.of(fieldMapping), - topic, - properties, - deserializationSchema, - startupMode, - specificStartupOffsets - ); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java new file mode 100644 index 0000000..b67501e --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.KafkaValidator; +import org.apache.flink.table.sources.RowtimeAttributeDescriptor; +import org.apache.flink.types.Row; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +/** + * Test for {@link Kafka08TableSource} and {@link Kafka08TableSink} created + * by {@link Kafka08TableSourceSinkFactory}. + */ +public class Kafka08TableSourceSinkFactoryTest extends KafkaTableSourceSinkFactoryTestBase { + + @Override + protected String getKafkaVersion() { + return KafkaValidator.CONNECTOR_VERSION_VALUE_08; + } + + @Override + @SuppressWarnings("unchecked") + protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() { + return (Class) FlinkKafkaConsumer08.class; + } + + @Override + protected Class<?> getExpectedFlinkKafkaProducer() { + return FlinkKafkaProducer08.class; + } + + @Override + protected KafkaTableSource getExpectedKafkaTableSource( + TableSchema schema, + Optional<String> proctimeAttribute, + List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, + Map<String, String> fieldMapping, + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema, + StartupMode startupMode, + Map<KafkaTopicPartition, Long> specificStartupOffsets) { + + return new Kafka08TableSource( + schema, + proctimeAttribute, + rowtimeAttributeDescriptors, + Optional.of(fieldMapping), + topic, + properties, + deserializationSchema, + startupMode, + specificStartupOffsets + ); + } + + @Override + protected KafkaTableSink getExpectedKafkaTableSink( + TableSchema schema, + String topic, + Properties properties, + FlinkKafkaPartitioner<Row> partitioner, + SerializationSchema<Row> serializationSchema) { + + return new Kafka08TableSink( + schema, + topic, + properties, + partitioner, + serializationSchema + ); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java index 95ce4e6..b3cc0aa 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java @@ -18,20 +18,25 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.table.descriptors.ConnectorDescriptor; import org.apache.flink.types.Row; import java.util.Properties; /** * Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format. + * + * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together + * with descriptors for schema and format instead. Descriptors allow for + * implementation-agnostic definition of tables. See also + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ -@PublicEvolving +@Deprecated public class Kafka09JsonTableSink extends KafkaJsonTableSink { /** @@ -48,7 +53,9 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink { * * @param topic topic in Kafka to which table is written * @param properties properties to connect to Kafka + * @deprecated Use table descriptors instead of implementation-specific classes. */ + @Deprecated public Kafka09JsonTableSink(String topic, Properties properties) { super(topic, properties, new FlinkFixedPartitioner<>()); } @@ -60,7 +67,9 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink { * @param topic topic in Kafka to which table is written * @param properties properties to connect to Kafka * @param partitioner Kafka partitioner + * @deprecated Use table descriptors instead of implementation-specific classes. */ + @Deprecated public Kafka09JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) { super(topic, properties, partitioner); } @@ -84,7 +93,11 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink { @Override protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) { - return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner); + return new FlinkKafkaProducer09<>( + topic, + serializationSchema, + properties, + partitioner); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java new file mode 100644 index 0000000..8c349d7 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +import java.util.Properties; + +/** + * Kafka 0.9 table sink for writing data into Kafka. + */ +@Internal +public class Kafka09TableSink extends KafkaTableSink { + + public Kafka09TableSink( + TableSchema schema, + String topic, + Properties properties, + FlinkKafkaPartitioner<Row> partitioner, + SerializationSchema<Row> serializationSchema) { + super( + schema, + topic, + properties, + partitioner, + serializationSchema); + } + + @Override + protected FlinkKafkaProducerBase<Row> createKafkaProducer( + String topic, + Properties properties, + SerializationSchema<Row> serializationSchema, + FlinkKafkaPartitioner<Row> partitioner) { + return new FlinkKafkaProducer09<>( + topic, + serializationSchema, + properties, + partitioner); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java index 18bc1c4..8f9e799 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java @@ -58,7 +58,8 @@ public class Kafka09TableSource extends KafkaTableSource { Optional<String> proctimeAttribute, List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, Optional<Map<String, String>> fieldMapping, - String topic, Properties properties, + String topic, + Properties properties, DeserializationSchema<Row> deserializationSchema, StartupMode startupMode, Map<KafkaTopicPartition, Long> specificStartupOffsets) { http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactory.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactory.java deleted file mode 100644 index 14c52fd..0000000 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactory.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.streaming.connectors.kafka.config.StartupMode; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.descriptors.KafkaValidator; -import org.apache.flink.table.sources.RowtimeAttributeDescriptor; -import org.apache.flink.types.Row; - -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; - -/** - * Factory for creating configured instances of {@link Kafka09TableSource}. - */ -public class Kafka09TableSourceFactory extends KafkaTableSourceFactory { - - @Override - protected String kafkaVersion() { - return KafkaValidator.CONNECTOR_VERSION_VALUE_09; - } - - @Override - protected boolean supportsKafkaTimestamps() { - return false; - } - - @Override - protected KafkaTableSource createKafkaTableSource( - TableSchema schema, - Optional<String> proctimeAttribute, - List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, - Map<String, String> fieldMapping, - String topic, - Properties properties, - DeserializationSchema<Row> deserializationSchema, - StartupMode startupMode, - Map<KafkaTopicPartition, Long> specificStartupOffsets) { - - return new Kafka09TableSource( - schema, - proctimeAttribute, - rowtimeAttributeDescriptors, - Optional.of(fieldMapping), - topic, - properties, - deserializationSchema, - startupMode, - specificStartupOffsets); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java new file mode 100644 index 0000000..9958b4e --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.KafkaValidator; +import org.apache.flink.table.sources.RowtimeAttributeDescriptor; +import org.apache.flink.types.Row; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +/** + * Factory for creating configured instances of {@link Kafka09TableSource}. + */ +public class Kafka09TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase { + + @Override + protected String kafkaVersion() { + return KafkaValidator.CONNECTOR_VERSION_VALUE_09; + } + + @Override + protected boolean supportsKafkaTimestamps() { + return false; + } + + @Override + protected KafkaTableSource createKafkaTableSource( + TableSchema schema, + Optional<String> proctimeAttribute, + List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, + Map<String, String> fieldMapping, + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema, + StartupMode startupMode, + Map<KafkaTopicPartition, Long> specificStartupOffsets) { + + return new Kafka09TableSource( + schema, + proctimeAttribute, + rowtimeAttributeDescriptors, + Optional.of(fieldMapping), + topic, + properties, + deserializationSchema, + startupMode, + specificStartupOffsets); + } + + @Override + protected KafkaTableSink createKafkaTableSink( + TableSchema schema, + String topic, + Properties properties, + FlinkKafkaPartitioner<Row> partitioner, + SerializationSchema<Row> serializationSchema) { + + return new Kafka09TableSink( + schema, + topic, + properties, + partitioner, + serializationSchema); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory index fb14ddb..2625873 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceFactory +org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java index 610e048..79f251b 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java @@ -27,7 +27,11 @@ import java.util.Properties; /** * Tests for the {@link Kafka09JsonTableSink}. + * + * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we + * drop support for format-specific table sinks. */ +@Deprecated public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase { @Override
