Repository: flink Updated Branches: refs/heads/master 481091043 -> abb449678
[FLINK-3874] [tableApi] Add KafkaJsonTableSink This closes #2244 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/abb44967 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/abb44967 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/abb44967 Branch: refs/heads/master Commit: abb4496781883937a935113c1e33ae1174aafa73 Parents: 4810910 Author: Ivan Mushketyk <[email protected]> Authored: Tue Jul 5 22:00:18 2016 +0100 Committer: Fabian Hueske <[email protected]> Committed: Thu Aug 25 22:14:32 2016 +0200 ---------------------------------------------------------------------- .../flink/api/table/sinks/CsvTableSink.scala | 4 +- .../flink/api/table/sinks/TableSink.scala | 40 +----- .../flink/api/table/sinks/TableSinkBase.scala | 67 +++++++++ .../connectors/kafka/Kafka08JsonTableSink.java | 51 +++++++ .../kafka/Kafka08JsonTableSinkITCase.java | 40 ++++++ .../connectors/kafka/Kafka09JsonTableSink.java | 50 +++++++ .../kafka/Kafka09JsonTableSinkITCase.java | 39 +++++ .../connectors/kafka/KafkaJsonTableSink.java | 47 ++++++ .../connectors/kafka/KafkaTableSink.java | 126 ++++++++++++++++ .../connectors/kafka/KafkaTableSource.java | 15 +- .../connectors/kafka/internals/TypeUtil.java | 38 +++++ .../JsonRowDeserializationSchema.java | 4 +- .../JsonRowSerializationSchema.java | 70 +++++++++ .../kafka/JsonRowSerializationSchemaTest.java | 98 +++++++++++++ .../kafka/KafkaTableSinkTestBase.java | 142 +++++++++++++++++++ 15 files changed, 780 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala index ed05caf..7567ba8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala @@ -34,7 +34,7 @@ import org.apache.flink.streaming.api.datastream.DataStream class CsvTableSink( path: String, fieldDelim: String = ",") - extends BatchTableSink[Row] with StreamTableSink[Row] { + extends TableSinkBase[Row] with BatchTableSink[Row] with StreamTableSink[Row] { override def emitDataSet(dataSet: DataSet[Row]): Unit = { dataSet @@ -48,7 +48,7 @@ class CsvTableSink( .writeAsText(path) } - override protected def copy: TableSink[Row] = { + override protected def copy: TableSinkBase[Row] = { new CsvTableSink(path, fieldDelim) } http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala index 12e57de..3dfc6f1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala @@ -29,9 +29,6 @@ import 
org.apache.flink.api.common.typeinfo.TypeInformation */ trait TableSink[T] { - private var fieldNames: Option[Array[String]] = None - private var fieldTypes: Option[Array[TypeInformation[_]]] = None - /** * Return the type expected by this [[TableSink]]. * @@ -41,27 +38,11 @@ trait TableSink[T] { */ def getOutputType: TypeInformation[T] - /** Return a deep copy of the [[TableSink]]. */ - protected def copy: TableSink[T] - - /** - * Return the field names of the [[org.apache.flink.api.table.Table]] to emit. */ - protected final def getFieldNames: Array[String] = { - fieldNames match { - case Some(n) => n - case None => throw new IllegalStateException( - "TableSink must be configured to retrieve field names.") - } - } + /** Returns the names of the table fields. */ + def getFieldNames: Array[String] - /** Return the field types of the [[org.apache.flink.api.table.Table]] to emit. */ - protected final def getFieldTypes: Array[TypeInformation[_]] = { - fieldTypes match { - case Some(t) => t - case None => throw new IllegalStateException( - "TableSink must be configured to retrieve field types.") - } - } + /** Returns the types of the table fields. */ + def getFieldTypes: Array[TypeInformation[_]] /** * Return a copy of this [[TableSink]] configured with the field names and types of the @@ -72,15 +53,6 @@ trait TableSink[T] { * @return A copy of this [[TableSink]] configured with the field names and types of the * [[org.apache.flink.api.table.Table]] to emit. */ - private[flink] final def configure( - fieldNames: Array[String], - fieldTypes: Array[TypeInformation[_]]): TableSink[T] = { - - val configuredSink = this.copy - configuredSink.fieldNames = Some(fieldNames) - configuredSink.fieldTypes = Some(fieldTypes) - - configuredSink - } - + def configure(fieldNames: Array[String], + fieldTypes: Array[TypeInformation[_]]): TableSink[T] } http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala new file mode 100644 index 0000000..612ee0a --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.sinks + +import org.apache.flink.api.common.typeinfo.TypeInformation + +trait TableSinkBase[T] extends TableSink[T] { + + private var fieldNames: Option[Array[String]] = None + private var fieldTypes: Option[Array[TypeInformation[_]]] = None + + /** Return a deep copy of the [[TableSink]]. */ + protected def copy: TableSinkBase[T] + + /** + * Return the field names of the [[org.apache.flink.api.table.Table]] to emit. */ + def getFieldNames: Array[String] = { + fieldNames match { + case Some(n) => n + case None => throw new IllegalStateException( + "TableSink must be configured to retrieve field names.") + } + } + + /** Return the field types of the [[org.apache.flink.api.table.Table]] to emit. */ + def getFieldTypes: Array[TypeInformation[_]] = { + fieldTypes match { + case Some(t) => t + case None => throw new IllegalStateException( + "TableSink must be configured to retrieve field types.") + } + } + + /** + * Return a copy of this [[TableSink]] configured with the field names and types of the + * [[org.apache.flink.api.table.Table]] to emit. + * + * @param fieldNames The field names of the table to emit. + * @param fieldTypes The field types of the table to emit. + * @return A copy of this [[TableSink]] configured with the field names and types of the + * [[org.apache.flink.api.table.Table]] to emit. + */ + final def configure(fieldNames: Array[String], + fieldTypes: Array[TypeInformation[_]]): TableSink[T] = { + + val configuredSink = this.copy + configuredSink.fieldNames = Some(fieldNames) + configuredSink.fieldTypes = Some(fieldTypes) + + configuredSink + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java new file mode 100644 index 0000000..5f869ec --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.SerializationSchema; + +import java.util.Properties; + +/** + * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format. + */ +public class Kafka08JsonTableSink extends KafkaJsonTableSink { + /** + * Creates a {@link KafkaTableSink} for Kafka 0.8. + * + * @param topic topic in Kafka to which table is written + * @param properties properties to connect to Kafka + * @param partitioner Kafka partitioner + */ + public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) { + super(topic, properties, partitioner); + } + + @Override + protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) { + return new FlinkKafkaProducer08<>(topic, serializationSchema, properties, partitioner); + } + + @Override + protected Kafka08JsonTableSink createCopy() { + return new Kafka08JsonTableSink(topic, properties, partitioner); + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java new file mode 100644 index 0000000..f870adf --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; + +public class Kafka08JsonTableSinkITCase extends KafkaTableSinkTestBase { + + @Override + protected KafkaTableSink createTableSink() { + Kafka08JsonTableSink sink = new Kafka08JsonTableSink( + TOPIC, + createSinkProperties(), + createPartitioner()); + return sink.configure(FIELD_NAMES, FIELD_TYPES); + } + + protected DeserializationSchema<Row> createRowDeserializationSchema() { + return new JsonRowDeserializationSchema( + FIELD_NAMES, FIELD_TYPES); + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java new file mode 100644 index 0000000..38ea47c --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.SerializationSchema; + +import java.util.Properties; + +/** + * Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format. 
+ */ +public class Kafka09JsonTableSink extends KafkaJsonTableSink { + /** + * Creates a {@link KafkaTableSink} for Kafka 0.9. + * + * @param topic topic in Kafka to which table is written + * @param properties properties to connect to Kafka + * @param partitioner Kafka partitioner + */ + public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) { + super(topic, properties, partitioner); + } + + @Override + protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) { + return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner); + } + + @Override + protected Kafka09JsonTableSink createCopy() { + return new Kafka09JsonTableSink(topic, properties, partitioner); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java new file mode 100644 index 0000000..74415f8 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; + +public class Kafka09JsonTableSinkITCase extends KafkaTableSinkTestBase { + + @Override + protected KafkaTableSink createTableSink() { + Kafka09JsonTableSink sink = new Kafka09JsonTableSink( + TOPIC, + createSinkProperties(), + createPartitioner()); + return sink.configure(FIELD_NAMES, FIELD_TYPES); + } + + protected DeserializationSchema<Row> createRowDeserializationSchema() { + return new JsonRowDeserializationSchema( + FIELD_NAMES, FIELD_TYPES); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java new file mode 100644 index 0000000..ee98783 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; +import org.apache.flink.streaming.util.serialization.SerializationSchema; + +import java.util.Properties; + +/** + * Base class for {@link KafkaTableSink} that serializes data in JSON format. + */ +public abstract class KafkaJsonTableSink extends KafkaTableSink { + + /** + * Creates a KafkaJsonTableSink. + * + * @param topic topic in Kafka to which table is written + * @param properties properties to connect to Kafka + * @param partitioner Kafka partitioner + */ + public KafkaJsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) { + super(topic, properties, partitioner); + } + + @Override + protected SerializationSchema<Row> createSerializationSchema(String[] fieldNames) { + return new JsonRowSerializationSchema(fieldNames); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java new file mode 100644 index 0000000..8f5e811 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.sinks.StreamTableSink; +import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.apache.flink.util.Preconditions; + +import java.util.Properties; + +/** + * A version-agnostic Kafka {@link StreamTableSink}. + * + * <p>The version-specific Kafka producers need to extend this class and + * override {@link #createKafkaProducer(String, Properties, SerializationSchema, KafkaPartitioner)}.
+ */ +public abstract class KafkaTableSink implements StreamTableSink<Row> { + + protected final String topic; + protected final Properties properties; + protected SerializationSchema<Row> serializationSchema; + protected final KafkaPartitioner<Row> partitioner; + protected String[] fieldNames; + protected TypeInformation[] fieldTypes; + + /** + * Creates a KafkaTableSink. + * + * @param topic Kafka topic to write to. + * @param properties Properties for the Kafka producer. + * @param partitioner Partitioner to select Kafka partition for each item. + */ + public KafkaTableSink( + String topic, + Properties properties, + KafkaPartitioner<Row> partitioner) { + + this.topic = Preconditions.checkNotNull(topic, "topic"); + this.properties = Preconditions.checkNotNull(properties, "properties"); + this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner"); + } + + /** + * Returns the version-specific Kafka producer. + * + * @param topic Kafka topic to produce to. + * @param properties Properties for the Kafka producer. + * @param serializationSchema Serialization schema to use to create Kafka records. + * @param partitioner Partitioner to select Kafka partition. + * @return The version-specific Kafka producer + */ + protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer( + String topic, Properties properties, + SerializationSchema<Row> serializationSchema, + KafkaPartitioner<Row> partitioner); + + /** + * Create serialization schema for converting table rows into bytes. + * + * @param fieldNames Field names in table rows. + * @return Instance of serialization schema + */ + protected abstract SerializationSchema<Row> createSerializationSchema(String[] fieldNames); + + /** + * Create a deep copy of this sink. + * + * @return Deep copy of this sink + */ + protected abstract KafkaTableSink createCopy(); + + @Override + public void emitDataStream(DataStream<Row> dataStream) { + FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner); + dataStream.addSink(kafkaProducer); + } + + @Override + public TypeInformation<Row> getOutputType() { + return new RowTypeInfo(getFieldTypes()); + } + + @Override + public String[] getFieldNames() { + return fieldNames; + } + + @Override + public TypeInformation<?>[] getFieldTypes() { + return fieldTypes; + } + + @Override + public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { + KafkaTableSink copy = createCopy(); + copy.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames"); + copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes"); + Preconditions.checkArgument(fieldNames.length == fieldTypes.length, + "Number of provided field names and types does not match."); + copy.serializationSchema = createSerializationSchema(fieldNames); + + return copy; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java index c6904fe..fc6bf44 100644 ---
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.table.Row; import org.apache.flink.api.table.sources.StreamTableSource; import org.apache.flink.api.table.typeutils.RowTypeInfo; @@ -30,6 +29,8 @@ import org.apache.flink.util.Preconditions; import java.util.Properties; +import static org.apache.flink.streaming.connectors.kafka.internals.TypeUtil.toTypeInfo; + /** * A version-agnostic Kafka {@link StreamTableSource}. * @@ -147,16 +148,4 @@ abstract class KafkaTableSource implements StreamTableSource<Row> { protected DeserializationSchema<Row> getDeserializationSchema() { return deserializationSchema; } - - /** - * Creates TypeInformation array for an array of Classes. - */ - private static TypeInformation<?>[] toTypeInfo(Class<?>[] fieldTypes) { - TypeInformation<?>[] typeInfos = new TypeInformation[fieldTypes.length]; - for (int i = 0; i < fieldTypes.length; i++) { - typeInfos[i] = TypeExtractor.getForClass(fieldTypes[i]); - } - return typeInfos; - } - } http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java new file mode 100644 index 0000000..7a41ade --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka.internals; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +public class TypeUtil { + private TypeUtil() {} + + /** + * Creates TypeInformation array for an array of Classes. 
+ * @param fieldTypes classes to extract type information from + * @return type information + */ + public static TypeInformation<?>[] toTypeInfo(Class<?>[] fieldTypes) { + TypeInformation<?>[] typeInfos = new TypeInformation[fieldTypes.length]; + for (int i = 0; i < fieldTypes.length; i++) { + typeInfos[i] = TypeExtractor.getForClass(fieldTypes[i]); + } + return typeInfos; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java index 970c73e..4344810 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java @@ -51,7 +51,7 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row> private boolean failOnMissingField; /** - * Creates a JSON deserializtion schema for the given fields and type classes. + * Creates a JSON deserialization schema for the given fields and type classes. * * @param fieldNames Names of JSON fields to parse. * @param fieldTypes Type classes to parse JSON fields as. @@ -69,7 +69,7 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row> } /** - * Creates a JSON deserializtion schema for the given fields and types. + * Creates a JSON deserialization schema for the given fields and types. * * @param fieldNames Names of JSON fields to parse. * @param fieldTypes Types to parse JSON fields as. http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java new file mode 100644 index 0000000..077ff13 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util.serialization; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.api.table.Row; +import org.apache.flink.util.Preconditions; + + +/** + * Serialization schema that serializes an object into JSON bytes. + * + * <p>Serializes the input {@link Row} object into a JSON string and + * converts it into <code>byte[]</code>. + * + * <p>Resulting <code>byte[]</code> messages can be deserialized using + * {@link JsonRowDeserializationSchema}. + */ +public class JsonRowSerializationSchema implements SerializationSchema<Row> { + /** Field names in the input Row object */ + private final String[] fieldNames; + /** Object mapper that is used to create output JSON objects */ + private static ObjectMapper mapper = new ObjectMapper(); + + /** + * Creates a JSON serialization schema for the given fields. + * + * @param fieldNames Names of the fields to write to JSON. + */ + public JsonRowSerializationSchema(String[] fieldNames) { + this.fieldNames = Preconditions.checkNotNull(fieldNames); + } + + @Override + public byte[] serialize(Row row) { + if (row.productArity() != fieldNames.length) { + throw new IllegalStateException(String.format( + "Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length)); + } + + ObjectNode objectNode = mapper.createObjectNode(); + + for (int i = 0; i < row.productArity(); i++) { + JsonNode node = mapper.valueToTree(row.productElement(i)); + objectNode.set(fieldNames[i], node); + } + + try { + return mapper.writeValueAsBytes(objectNode); + } catch (Exception e) { + throw new RuntimeException("Failed to serialize row", e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java new file mode 100644 index 0000000..92af15d --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; +import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + +public class JsonRowSerializationSchemaTest { + @Test + public void testRowSerialization() throws IOException { + String[] fieldNames = new String[] {"f1", "f2", "f3"}; + Class[] fieldTypes = new Class[] {Integer.class, Boolean.class, String.class}; + Row row = new Row(3); + row.setField(0, 1); + row.setField(1, true); + row.setField(2, "str"); + + Row resultRow = serializeAndDeserialize(fieldNames, fieldTypes, row); + assertEqualRows(row, resultRow); + } + + @Test + public void testSerializationOfTwoRows() throws IOException { + String[] fieldNames = new String[] {"f1", "f2", "f3"}; + Class[] fieldTypes = new Class[] {Integer.class, Boolean.class, String.class}; + Row row1 = new Row(3); + row1.setField(0, 1); + row1.setField(1, true); + row1.setField(2, "str"); + + JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames); + JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(fieldNames, fieldTypes); + + byte[] bytes = serializationSchema.serialize(row1); + assertEqualRows(row1, deserializationSchema.deserialize(bytes)); + + Row row2 = new Row(3); + row2.setField(0, 10); + row2.setField(1, false); + row2.setField(2, "newStr"); + + bytes = serializationSchema.serialize(row2); + assertEqualRows(row2, deserializationSchema.deserialize(bytes)); + } + + @Test(expected = NullPointerException.class) + public void testInputValidation() { + new JsonRowSerializationSchema(null); + } + + @Test(expected = IllegalStateException.class) + public void testSerializeRowWithInvalidNumberOfFields() { + String[] fieldNames = new String[] {"f1", "f2", "f3"}; + Row row = new Row(1); + row.setField(0, 1); + + JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames); + serializationSchema.serialize(row); + } + + private Row serializeAndDeserialize(String[] fieldNames, Class[] fieldTypes, Row row) throws IOException { + JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames); + JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(fieldNames, fieldTypes); + + byte[] bytes = serializationSchema.serialize(row); + return deserializationSchema.deserialize(bytes); + } + + private void assertEqualRows(Row expectedRow, Row resultRow) { + assertEquals("Deserialized row should have expected number of fields", + expectedRow.productArity(), resultRow.productArity()); + for (int i = 0; i < expectedRow.productArity(); i++) { + assertEquals(String.format("Field number %d should be as in the original row", i), + expectedRow.productElement(i), resultRow.productElement(i)); + } + } + +} 
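As a quick standalone illustration of the round trip exercised by the test above (the class and main method are illustrative only; the two schema classes and their constructors are the ones added in this commit): given field names f1, f2, f3, the serializer emits a single JSON object keyed by field name, e.g. {"f1":1,"f2":true,"f3":"str"}.

import org.apache.flink.api.table.Row;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;

public class JsonRowRoundTrip {
    public static void main(String[] args) throws Exception {
        String[] fieldNames = new String[] {"f1", "f2", "f3"};
        Class[] fieldTypes = new Class[] {Integer.class, Boolean.class, String.class};

        Row row = new Row(3);
        row.setField(0, 1);
        row.setField(1, true);
        row.setField(2, "str");

        // Serialize to JSON bytes: {"f1":1,"f2":true,"f3":"str"}
        byte[] json = new JsonRowSerializationSchema(fieldNames).serialize(row);

        // Deserialize; field values are parsed back according to fieldTypes
        Row restored = new JsonRowDeserializationSchema(fieldNames, fieldTypes).deserialize(json);
        System.out.println(restored.productElement(2)); // prints: str
    }
}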
http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java new file mode 100644 index 0000000..5e55b0a --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.test.util.SuccessException; +import org.junit.Test; + +import java.io.Serializable; +import java.util.HashSet; +import java.util.Properties; + +import static org.apache.flink.test.util.TestUtils.tryExecute; + +public abstract class KafkaTableSinkTestBase extends KafkaTestBase implements Serializable { + + protected final static String TOPIC = "customPartitioningTestTopic"; + protected final static int PARALLELISM = 1; + protected final static String[] FIELD_NAMES = new String[] {"field1", "field2"}; + protected final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class}); + + @Test + public void testKafkaTableSink() throws Exception { + LOG.info("Starting KafkaTableSinkTestBase.testKafkaTableSink()"); + + createTestTopic(TOPIC, PARALLELISM, 1); + StreamExecutionEnvironment env = createEnvironment(); + + createProducingTopology(env); + createConsumingTopology(env); + + tryExecute(env, "custom partitioning test"); + deleteTestTopic(TOPIC); + LOG.info("Finished KafkaTableSinkTestBase.testKafkaTableSink()"); + } + + private StreamExecutionEnvironment 
createEnvironment() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.setRestartStrategy(RestartStrategies.noRestart()); + env.getConfig().disableSysoutLogging(); + return env; + } + + private void createProducingTopology(StreamExecutionEnvironment env) { + DataStream<Row> stream = env.addSource(new SourceFunction<Row>() { + private boolean running = true; + + @Override + public void run(SourceContext<Row> ctx) throws Exception { + long cnt = 0; + while (running) { + Row row = new Row(2); + row.setField(0, cnt); + row.setField(1, "kafka-" + cnt); + ctx.collect(row); + cnt++; + } + } + + @Override + public void cancel() { + running = false; + } + }) + .setParallelism(1); + + KafkaTableSink kafkaTableSinkBase = createTableSink(); + + kafkaTableSinkBase.emitDataStream(stream); + } + + private void createConsumingTopology(StreamExecutionEnvironment env) { + DeserializationSchema<Row> deserializationSchema = createRowDeserializationSchema(); + + FlinkKafkaConsumerBase<Row> source = kafkaServer.getConsumer(TOPIC, deserializationSchema, standardProps); + + env.addSource(source).setParallelism(PARALLELISM) + .map(new RichMapFunction<Row, Integer>() { + @Override + public Integer map(Row value) { + return (Integer) value.productElement(0); + } + }).setParallelism(PARALLELISM) + + .addSink(new SinkFunction<Integer>() { + HashSet<Integer> ids = new HashSet<>(); + @Override + public void invoke(Integer value) throws Exception { + ids.add(value); + + if (ids.size() == 100) { + throw new SuccessException(); + } + } + }).setParallelism(1); + } + + protected KafkaPartitioner<Row> createPartitioner() { + return new CustomPartitioner(); + } + + protected Properties createSinkProperties() { + return FlinkKafkaProducerBase.getPropertiesFromBrokerList(KafkaTestBase.brokerConnectionStrings); + } + + protected abstract KafkaTableSink createTableSink(); + + protected abstract DeserializationSchema<Row> createRowDeserializationSchema(); + + + public static class CustomPartitioner extends KafkaPartitioner<Row> implements Serializable { + @Override + public int partition(Row next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { + return 0; + } + } +}
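To put the new sink in context, a minimal usage sketch follows. The topic, broker address, query, and registered table name are illustrative, and the Table API entry points (TableEnvironment.getTableEnvironment, Table.writeToSink) and the connector's FixedPartitioner are assumed from this release rather than being part of this commit.

import java.util.Properties;

import org.apache.flink.api.java.table.StreamTableEnvironment;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.Table;
import org.apache.flink.api.table.TableEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;

public class KafkaJsonTableSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");

        // assumes a table named "orders" was registered with tableEnv beforehand
        Table result = tableEnv.sql("SELECT field1, field2 FROM orders");

        // writeToSink() invokes configure() with the table's field names and types;
        // the configured copy then writes each Row to Kafka as a JSON object
        result.writeToSink(new Kafka09JsonTableSink("orders-json", props, new FixedPartitioner<Row>()));

        env.execute();
    }
}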
