Repository: flink
Updated Branches:
  refs/heads/master efb40cfc5 -> 825ef3be3


[FLINK-3874] Rewrite Kafka JSON Table sink tests

This closes #2430.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/825ef3be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/825ef3be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/825ef3be

Branch: refs/heads/master
Commit: 825ef3be35ec9a85e800c5db5b8d3bbf5fa188a0
Parents: efb40cf
Author: Ivan Mushketyk <[email protected]>
Authored: Sat Aug 27 23:24:21 2016 +0100
Committer: twalthr <[email protected]>
Committed: Wed Oct 5 14:36:17 2016 +0200

----------------------------------------------------------------------
 .../connectors/kafka/Kafka08JsonTableSink.java  |   1 +
 .../kafka/Kafka08JsonTableSinkITCase.java       |  40 -----
 .../kafka/Kafka08JsonTableSinkTest.java         |  48 ++++++
 .../kafka/Kafka09JsonTableSinkITCase.java       |  39 -----
 .../kafka/Kafka09JsonTableSinkTest.java         |  47 ++++++
 .../connectors/kafka/KafkaTableSink.java        |   1 +
 .../kafka/KafkaTableSinkTestBase.java           | 146 +++++++------------
 7 files changed, 153 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/825ef3be/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index 5f869ec..b155576 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -27,6 +27,7 @@ import java.util.Properties;
  * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
  */
 public class Kafka08JsonTableSink extends KafkaJsonTableSink {
+
        /**
         * Creates {@link KafkaTableSink} for Kafka 0.8
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/825ef3be/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java
deleted file mode 100644
index f870adf..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.table.Row;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
-
-public class Kafka08JsonTableSinkITCase extends KafkaTableSinkTestBase {
-
-       @Override
-       protected KafkaTableSink createTableSink() {
-               Kafka08JsonTableSink sink = new Kafka08JsonTableSink(
-                       TOPIC,
-                       createSinkProperties(),
-                       createPartitioner());
-               return sink.configure(FIELD_NAMES, FIELD_TYPES);
-       }
-
-       protected DeserializationSchema<Row> createRowDeserializationSchema() {
-               return new JsonRowDeserializationSchema(
-                       FIELD_NAMES, FIELD_TYPES);
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/825ef3be/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
new file mode 100644
index 0000000..b1e6db9
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
+
+       @Override
+       protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
+                               final FlinkKafkaProducerBase<Row> kafkaProducer) {
+
+               return new Kafka08JsonTableSink(topic, properties, partitioner) {
+                       @Override
+                       protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
+                                       SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
+                               return kafkaProducer;
+                       }
+               };
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<SerializationSchema<Row>> getSerializationSchema() {
+               return (Class) JsonRowSerializationSchema.class;
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/825ef3be/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java
deleted file mode 100644
index 74415f8..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.table.Row;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
-
-public class Kafka09JsonTableSinkITCase extends KafkaTableSinkTestBase {
-
-       @Override
-       protected KafkaTableSink createTableSink() {
-               Kafka09JsonTableSink sink = new Kafka09JsonTableSink(
-                       TOPIC,
-                       createSinkProperties(),
-                       createPartitioner());
-               return sink.configure(FIELD_NAMES, FIELD_TYPES);
-       }
-
-       protected DeserializationSchema<Row> createRowDeserializationSchema() {
-               return new JsonRowDeserializationSchema(
-                       FIELD_NAMES, FIELD_TYPES);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/825ef3be/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
new file mode 100644
index 0000000..bfdcf68
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
+
+       @Override
+       protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
+                       final FlinkKafkaProducerBase<Row> kafkaProducer) {
+               return new Kafka09JsonTableSink(topic, properties, partitioner) {
+                       @Override
+                       protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
+                                       SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
+                               return kafkaProducer;
+                       }
+               };
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<SerializationSchema<Row>> getSerializationSchema() {
+               return (Class) JsonRowSerializationSchema.class;
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/825ef3be/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 8f5e811..714d9cd 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -42,6 +42,7 @@ public abstract class KafkaTableSink implements StreamTableSink<Row> {
        protected final KafkaPartitioner<Row> partitioner;
        protected String[] fieldNames;
        protected TypeInformation[] fieldTypes;
+
        /**
         * Creates KafkaTableSink
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/825ef3be/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index 5e55b0a..e46ca08 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -17,123 +17,89 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.junit.Test;
 
 import java.io.Serializable;
-import java.util.HashSet;
 import java.util.Properties;
 
-import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
-public abstract class KafkaTableSinkTestBase extends KafkaTestBase implements Serializable {
+public abstract class KafkaTableSinkTestBase implements Serializable {
 
-       protected final static String TOPIC = "customPartitioningTestTopic";
-       protected final static int PARALLELISM = 1;
-       protected final static String[] FIELD_NAMES = new String[] {"field1", "field2"};
-       protected final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
+       private final static String TOPIC = "testTopic";
+       private final static String[] FIELD_NAMES = new String[] {"field1", "field2"};
+       private final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
+
+       private final KafkaPartitioner<Row> partitioner = new CustomPartitioner();
+       private final Properties properties = createSinkProperties();
+       @SuppressWarnings("unchecked")
+       private final FlinkKafkaProducerBase<Row> kafkaProducer = mock(FlinkKafkaProducerBase.class);
 
        @Test
+       @SuppressWarnings("unchecked")
        public void testKafkaTableSink() throws Exception {
-               LOG.info("Starting KafkaTableSinkTestBase.testKafkaTableSink()");
-
-               createTestTopic(TOPIC, PARALLELISM, 1);
-               StreamExecutionEnvironment env = createEnvironment();
-
-               createProducingTopology(env);
-               createConsumingTopology(env);
+               DataStream dataStream = mock(DataStream.class);
+               KafkaTableSink kafkaTableSink = createTableSink();
+               kafkaTableSink.emitDataStream(dataStream);
 
-               tryExecute(env, "custom partitioning test");
-               deleteTestTopic(TOPIC);
-               LOG.info("Finished KafkaTableSinkTestBase.testKafkaTableSink()");
+               verify(dataStream).addSink(kafkaProducer);
        }
 
-       private StreamExecutionEnvironment createEnvironment() {
-               StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-               env.setRestartStrategy(RestartStrategies.noRestart());
-               env.getConfig().disableSysoutLogging();
-               return env;
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testCreatedProducer() throws Exception {
+               DataStream dataStream = mock(DataStream.class);
+               KafkaTableSink kafkaTableSink = spy(createTableSink());
+               kafkaTableSink.emitDataStream(dataStream);
+
+               verify(kafkaTableSink).createKafkaProducer(
+                       eq(TOPIC),
+                       eq(properties),
+                       any(getSerializationSchema()),
+                       eq(partitioner));
        }
 
-       private void createProducingTopology(StreamExecutionEnvironment env) {
-               DataStream<Row> stream = env.addSource(new SourceFunction<Row>() {
-                       private boolean running = true;
-
-                       @Override
-                       public void run(SourceContext<Row> ctx) throws Exception {
-                               long cnt = 0;
-                               while (running) {
-                                       Row row = new Row(2);
-                                       row.setField(0, cnt);
-                                       row.setField(1, "kafka-" + cnt);
-                                       ctx.collect(row);
-                                       cnt++;
-                               }
-                       }
-
-                       @Override
-                       public void cancel() {
-                               running = false;
-                       }
-               })
-               .setParallelism(1);
-
-               KafkaTableSink kafkaTableSinkBase = createTableSink();
-
-               kafkaTableSinkBase.emitDataStream(stream);
+       @Test
+       public void testConfiguration() {
+               KafkaTableSink kafkaTableSink = createTableSink();
+               KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
+               assertNotSame(kafkaTableSink, newKafkaTableSink);
+
+               assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames());
+               assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes());
+               assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
        }
 
-       private void createConsumingTopology(StreamExecutionEnvironment env) {
-               DeserializationSchema<Row> deserializationSchema = createRowDeserializationSchema();
-
-               FlinkKafkaConsumerBase<Row> source = kafkaServer.getConsumer(TOPIC, deserializationSchema, standardProps);
-
-               env.addSource(source).setParallelism(PARALLELISM)
-                       .map(new RichMapFunction<Row, Integer>() {
-                               @Override
-                               public Integer map(Row value) {
-                                       return (Integer) value.productElement(0);
-                               }
-                       }).setParallelism(PARALLELISM)
-
-                       .addSink(new SinkFunction<Integer>() {
-                               HashSet<Integer> ids = new HashSet<>();
-                               @Override
-                               public void invoke(Integer value) throws Exception {
-                                       ids.add(value);
-
-                                       if (ids.size() == 100) {
-                                               throw new SuccessException();
-                                       }
-                               }
-                       }).setParallelism(1);
-       }
+       protected abstract KafkaTableSink createTableSink(String topic, Properties properties,
+                       KafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
 
-       protected KafkaPartitioner<Row> createPartitioner() {
-               return new CustomPartitioner();
-       }
+       protected abstract Class<SerializationSchema<Row>> getSerializationSchema();
 
-       protected Properties createSinkProperties() {
-               return FlinkKafkaProducerBase.getPropertiesFromBrokerList(KafkaTestBase.brokerConnectionStrings);
+       private KafkaTableSink createTableSink() {
+               return createTableSink(TOPIC, properties, partitioner, kafkaProducer);
        }
 
-       protected abstract KafkaTableSink createTableSink();
-
-       protected abstract DeserializationSchema<Row> createRowDeserializationSchema();
-
+       private static Properties createSinkProperties() {
+               Properties properties = new Properties();
+               properties.setProperty("testKey", "testValue");
+               return properties;
+       }
 
-       public static class CustomPartitioner extends KafkaPartitioner<Row> implements Serializable {
+       private static class CustomPartitioner extends KafkaPartitioner<Row> implements Serializable {
                @Override
                public int partition(Row next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
                        return 0;

Reply via email to