[FLINK-8014] [table] Add Kafka010JsonTableSink and refactor KafkaTableSink tests.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50fba9aa Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50fba9aa Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50fba9aa Branch: refs/heads/master Commit: 50fba9aa4e96632f7b32cf98d704683364196cbd Parents: fc3eebd Author: Fabian Hueske <fhue...@apache.org> Authored: Tue Nov 7 17:59:43 2017 +0100 Committer: Fabian Hueske <fhue...@apache.org> Committed: Thu Nov 16 11:32:12 2017 +0100 ---------------------------------------------------------------------- .../connectors/kafka/Kafka010JsonTableSink.java | 73 ++++++++++++++++++++ .../kafka/Kafka010JsonTableSinkTest.java | 53 ++++++++++++++ .../connectors/kafka/Kafka08JsonTableSink.java | 26 ++++++- .../kafka/Kafka08JsonTableSinkTest.java | 27 +++----- .../connectors/kafka/Kafka09JsonTableSink.java | 26 ++++++- .../kafka/Kafka09JsonTableSinkTest.java | 27 +++----- .../connectors/kafka/KafkaJsonTableSink.java | 5 +- .../connectors/kafka/KafkaTableSink.java | 10 ++- .../JsonRowSerializationSchema.java | 22 +++++- .../kafka/JsonRowSerializationSchemaTest.java | 46 ++++++++---- .../kafka/KafkaTableSinkTestBase.java | 30 ++++---- 11 files changed, 269 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java new file mode 100644 index 0000000..431ace0 --- /dev/null +++ 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.types.Row; + +import java.util.Properties; + +/** + * Kafka 0.10 {@link KafkaTableSink} that serializes data in JSON format. + */ +public class Kafka010JsonTableSink extends KafkaJsonTableSink { + + /** + * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10 + * topic with fixed partition assignment. 
+ * + * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p> + * <ul> + * <li>If the number of Kafka partitions is less than the number of sink instances, different + * sink instances will write to the same partition.</li> + * <li>If the number of Kafka partitions is higher than the number of sink instance, some + * Kafka partitions won't receive data.</li> + * </ul> + * + * @param topic topic in Kafka to which table is written + * @param properties properties to connect to Kafka + */ + public Kafka010JsonTableSink(String topic, Properties properties) { + super(topic, properties, new FlinkFixedPartitioner<>()); + } + + /** + * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10 + * topic with custom partition assignment. + * + * @param topic topic in Kafka to which table is written + * @param properties properties to connect to Kafka + * @param partitioner Kafka partitioner + */ + public Kafka010JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) { + super(topic, properties, partitioner); + } + + @Override + protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) { + return new FlinkKafkaProducer010<>(topic, serializationSchema, properties, partitioner); + } + + @Override + protected Kafka010JsonTableSink createCopy() { + return new Kafka010JsonTableSink(topic, properties, partitioner); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java new file mode 100644 index 0000000..4d805d5 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; +import org.apache.flink.types.Row; + +import java.util.Properties; + +/** + * Tests for the {@link Kafka010JsonTableSink}. + */ +public class Kafka010JsonTableSinkTest extends KafkaTableSinkTestBase { + + @Override + protected KafkaTableSink createTableSink( + String topic, + Properties properties, + FlinkKafkaPartitioner<Row> partitioner) { + + return new Kafka010JsonTableSink(topic, properties, partitioner); + } + + @Override + protected Class<? 
extends SerializationSchema<Row>> getSerializationSchemaClass() { + return JsonRowSerializationSchema.class; + } + + @Override + protected Class<? extends FlinkKafkaProducerBase> getProducerClass() { + return FlinkKafkaProducer010.class; + } + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java index a887048..39d5cb2 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; @@ -32,7 +33,27 @@ import java.util.Properties; public class Kafka08JsonTableSink extends KafkaJsonTableSink { /** - * Creates {@link KafkaTableSink} for Kafka 0.8. + * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8 + * topic with fixed partition assignment. 
+ * + * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p> + * <ul> + * <li>If the number of Kafka partitions is less than the number of sink instances, different + * sink instances will write to the same partition.</li> + * <li>If the number of Kafka partitions is higher than the number of sink instance, some + * Kafka partitions won't receive data.</li> + * </ul> + * + * @param topic topic in Kafka to which table is written + * @param properties properties to connect to Kafka + */ + public Kafka08JsonTableSink(String topic, Properties properties) { + super(topic, properties, new FlinkFixedPartitioner<>()); + } + + /** + * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8 + * topic with custom partition assignment. * * @param topic topic in Kafka to which table is written * @param properties properties to connect to Kafka @@ -43,7 +64,8 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink { } /** - * Creates {@link KafkaTableSink} for Kafka 0.8. + * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8 + * topic with custom partition assignment. 
* * @param topic topic in Kafka to which table is written * @param properties properties to connect to Kafka http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java index 890fc3a..d7bb683 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java @@ -34,26 +34,19 @@ public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase { protected KafkaTableSink createTableSink( String topic, Properties properties, - FlinkKafkaPartitioner<Row> partitioner, - final FlinkKafkaProducerBase<Row> kafkaProducer) { - - return new Kafka08JsonTableSink(topic, properties, partitioner) { - @Override - protected FlinkKafkaProducerBase<Row> createKafkaProducer( - String topic, - Properties properties, - SerializationSchema<Row> serializationSchema, - FlinkKafkaPartitioner<Row> partitioner) { - - return kafkaProducer; - } - }; + FlinkKafkaPartitioner<Row> partitioner) { + + return new Kafka08JsonTableSink(topic, properties, partitioner); + } + + @Override + protected Class<? extends SerializationSchema<Row>> getSerializationSchemaClass() { + return JsonRowSerializationSchema.class; } @Override - @SuppressWarnings("unchecked") - protected SerializationSchema<Row> getSerializationSchema() { - return new JsonRowSerializationSchema(FIELD_NAMES); + protected Class<? 
extends FlinkKafkaProducerBase> getProducerClass() { + return FlinkKafkaProducer08.class; } } http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java index f65a02d..a4d2661 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; @@ -32,7 +33,27 @@ import java.util.Properties; public class Kafka09JsonTableSink extends KafkaJsonTableSink { /** - * Creates {@link KafkaTableSink} for Kafka 0.9 . + * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9 + * topic with fixed partition assignment. 
+ * + * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p> + * <ul> + * <li>If the number of Kafka partitions is less than the number of sink instances, different + * sink instances will write to the same partition.</li> + * <li>If the number of Kafka partitions is higher than the number of sink instance, some + * Kafka partitions won't receive data.</li> + * </ul> + * + * @param topic topic in Kafka to which table is written + * @param properties properties to connect to Kafka + */ + public Kafka09JsonTableSink(String topic, Properties properties) { + super(topic, properties, new FlinkFixedPartitioner<>()); + } + + /** + * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9 + * topic with custom partition assignment. * * @param topic topic in Kafka to which table is written * @param properties properties to connect to Kafka @@ -43,7 +64,8 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink { } /** - * Creates {@link KafkaTableSink} for Kafka 0.9 . + * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9 + * topic with custom partition assignment. 
* * @param topic topic in Kafka to which table is written * @param properties properties to connect to Kafka http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java index c52b4ca..58f2b05 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java @@ -34,26 +34,19 @@ public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase { protected KafkaTableSink createTableSink( String topic, Properties properties, - FlinkKafkaPartitioner<Row> partitioner, - final FlinkKafkaProducerBase<Row> kafkaProducer) { - - return new Kafka09JsonTableSink(topic, properties, partitioner) { - @Override - protected FlinkKafkaProducerBase<Row> createKafkaProducer( - String topic, - Properties properties, - SerializationSchema<Row> serializationSchema, - FlinkKafkaPartitioner<Row> partitioner) { - - return kafkaProducer; - } - }; + FlinkKafkaPartitioner<Row> partitioner) { + + return new Kafka09JsonTableSink(topic, properties, partitioner); + } + + @Override + protected Class<? extends SerializationSchema<Row>> getSerializationSchemaClass() { + return JsonRowSerializationSchema.class; } @Override - @SuppressWarnings("unchecked") - protected SerializationSchema<Row> getSerializationSchema() { - return new JsonRowSerializationSchema(FIELD_NAMES); + protected Class<? 
extends FlinkKafkaProducerBase> getProducerClass() { + return FlinkKafkaProducer09.class; } } http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java index f354dad..6665dbd 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; import org.apache.flink.types.Row; @@ -42,7 +43,7 @@ public abstract class KafkaJsonTableSink extends KafkaTableSink { } @Override - protected SerializationSchema<Row> createSerializationSchema(String[] fieldNames) { - return new JsonRowSerializationSchema(fieldNames); + protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema) { + return new JsonRowSerializationSchema(rowSchema); } } http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java ---------------------------------------------------------------------- diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java index cac71dc..f42827e 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java @@ -77,10 +77,10 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> { /** * Create serialization schema for converting table rows into bytes. * - * @param fieldNames Field names in table rows. + * @param rowSchema the schema of the row to serialize. * @return Instance of serialization schema */ - protected abstract SerializationSchema<Row> createSerializationSchema(String[] fieldNames); + protected abstract SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema); /** * Create a deep copy of this sink. @@ -92,6 +92,8 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> { @Override public void emitDataStream(DataStream<Row> dataStream) { FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner); + // always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled. 
+ kafkaProducer.setFlushOnCheckpoint(true); dataStream.addSink(kafkaProducer); } @@ -116,7 +118,9 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> { copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes"); Preconditions.checkArgument(fieldNames.length == fieldTypes.length, "Number of provided field names and types does not match."); - copy.serializationSchema = createSerializationSchema(fieldNames); + + RowTypeInfo rowSchema = new RowTypeInfo(fieldTypes, fieldNames); + copy.serializationSchema = createSerializationSchema(rowSchema); return copy; } http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java index 5ece193..36d3137 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java @@ -18,6 +18,9 @@ package org.apache.flink.streaming.util.serialization; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; @@ -43,10 +46,23 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> { /** * Creates a JSON serialization 
schema for the given fields and types. * - * @param fieldNames Names of JSON fields to parse. + * @param rowSchema The schema of the rows to encode. */ - public JsonRowSerializationSchema(String[] fieldNames) { - this.fieldNames = Preconditions.checkNotNull(fieldNames); + public JsonRowSerializationSchema(RowTypeInfo rowSchema) { + + Preconditions.checkNotNull(rowSchema); + String[] fieldNames = rowSchema.getFieldNames(); + TypeInformation[] fieldTypes = rowSchema.getFieldTypes(); + + // check that no field is composite + for (int i = 0; i < fieldTypes.length; i++) { + if (fieldTypes[i] instanceof CompositeType) { + throw new IllegalArgumentException("JsonRowSerializationSchema cannot encode rows with nested schema, " + + "but field '" + fieldNames[i] + "' is nested: " + fieldTypes[i].toString()); + } + } + + this.fieldNames = fieldNames; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java index 43bde35..70140a6 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import 
org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; import org.apache.flink.table.api.Types; @@ -36,31 +37,34 @@ public class JsonRowSerializationSchemaTest { @Test public void testRowSerialization() throws IOException { - String[] fieldNames = new String[] {"f1", "f2", "f3"}; - TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] { Types.INT(), Types.BOOLEAN(), Types.STRING() }; + RowTypeInfo rowSchema = new RowTypeInfo( + new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.STRING()}, + new String[] {"f1", "f2", "f3"} + ); + Row row = new Row(3); row.setField(0, 1); row.setField(1, true); row.setField(2, "str"); - Row resultRow = serializeAndDeserialize(fieldNames, fieldTypes, row); + Row resultRow = serializeAndDeserialize(rowSchema, row); assertEqualRows(row, resultRow); } @Test public void testSerializationOfTwoRows() throws IOException { - String[] fieldNames = new String[] {"f1", "f2", "f3"}; - TypeInformation<Row> row = Types.ROW( - fieldNames, - new TypeInformation<?>[] { Types.INT(), Types.BOOLEAN(), Types.STRING() } + RowTypeInfo rowSchema = new RowTypeInfo( + new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.STRING()}, + new String[] {"f1", "f2", "f3"} ); + Row row1 = new Row(3); row1.setField(0, 1); row1.setField(1, true); row1.setField(2, "str"); - JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames); - JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(row); + JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(rowSchema); + JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(rowSchema); byte[] bytes = serializationSchema.serialize(row1); assertEqualRows(row1, deserializationSchema.deserialize(bytes)); @@ -79,19 +83,33 @@ public class JsonRowSerializationSchemaTest { new 
JsonRowSerializationSchema(null); } + @Test(expected = IllegalArgumentException.class) + public void testRejectNestedSchema() { + RowTypeInfo rowSchema = new RowTypeInfo( + new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.ROW(Types.INT(), Types.DOUBLE())}, + new String[] {"f1", "f2", "f3"} + ); + + new JsonRowSerializationSchema(rowSchema); + } + @Test(expected = IllegalStateException.class) public void testSerializeRowWithInvalidNumberOfFields() { - String[] fieldNames = new String[] {"f1", "f2", "f3"}; + RowTypeInfo rowSchema = new RowTypeInfo( + new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.STRING()}, + new String[] {"f1", "f2", "f3"} + ); + Row row = new Row(1); row.setField(0, 1); - JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames); + JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(rowSchema); serializationSchema.serialize(row); } - private Row serializeAndDeserialize(String[] fieldNames, TypeInformation<?>[] fieldTypes, Row row) throws IOException { - JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames); - JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(Types.ROW(fieldNames, fieldTypes)); + private Row serializeAndDeserialize(RowTypeInfo rowSchema, Row row) throws IOException { + JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(rowSchema); + JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(rowSchema); byte[] bytes = serializationSchema.serialize(row); return deserializationSchema.deserialize(bytes); http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java ---------------------------------------------------------------------- diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java index 3138152..ac5259e 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; import org.apache.flink.table.api.Types; import org.apache.flink.types.Row; @@ -46,32 +45,27 @@ import static org.mockito.Mockito.verify; public abstract class KafkaTableSinkTestBase { private static final String TOPIC = "testTopic"; - protected static final String[] FIELD_NAMES = new String[] {"field1", "field2"}; + private static final String[] FIELD_NAMES = new String[] {"field1", "field2"}; private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] { Types.INT(), Types.STRING() }; private static final FlinkKafkaPartitioner<Row> PARTITIONER = new CustomPartitioner(); private static final Properties PROPERTIES = createSinkProperties(); - @SuppressWarnings("unchecked") - private final FlinkKafkaProducerBase<Row> producer = new FlinkKafkaProducerBase<Row>( - TOPIC, new KeyedSerializationSchemaWrapper(getSerializationSchema()), PROPERTIES, PARTITIONER) { - - @Override - protected void flush() {} - }; - @Test @SuppressWarnings("unchecked") + @Test public void testKafkaTableSink() throws Exception { DataStream dataStream = 
mock(DataStream.class); KafkaTableSink kafkaTableSink = spy(createTableSink()); kafkaTableSink.emitDataStream(dataStream); - verify(dataStream).addSink(eq(producer)); + // verify correct producer class + verify(dataStream).addSink(any(getProducerClass())); + // verify correctly configured producer verify(kafkaTableSink).createKafkaProducer( eq(TOPIC), eq(PROPERTIES), - any(getSerializationSchema().getClass()), + any(getSerializationSchemaClass()), eq(PARTITIONER)); } @@ -86,13 +80,17 @@ public abstract class KafkaTableSinkTestBase { assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType()); } - protected abstract KafkaTableSink createTableSink(String topic, Properties properties, - FlinkKafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer); + protected abstract KafkaTableSink createTableSink( + String topic, + Properties properties, + FlinkKafkaPartitioner<Row> partitioner); + + protected abstract Class<? extends SerializationSchema<Row>> getSerializationSchemaClass(); - protected abstract SerializationSchema<Row> getSerializationSchema(); + protected abstract Class<? extends FlinkKafkaProducerBase> getProducerClass(); private KafkaTableSink createTableSink() { - return createTableSink(TOPIC, PROPERTIES, PARTITIONER, producer); + return createTableSink(TOPIC, PROPERTIES, PARTITIONER); } private static Properties createSinkProperties() {