Repository: flink
Updated Branches:
  refs/heads/master 15df71ba9 -> 3d5bca0ab


[hotfix][kafka] Undo DataGenerators changes (use inline kafka producer again)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/744f8ebb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/744f8ebb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/744f8ebb

Branch: refs/heads/master
Commit: 744f8ebb66b2a7288942be139cd7a7e6d1170c80
Parents: 15df71b
Author: Robert Metzger <[email protected]>
Authored: Tue Oct 11 15:48:32 2016 +0200
Committer: Robert Metzger <[email protected]>
Committed: Wed Oct 12 14:03:14 2016 +0200

----------------------------------------------------------------------
 .../kafka/KafkaTestEnvironmentImpl.java         |   3 -
 .../connectors/kafka/Kafka09ITCase.java         |   9 -
 .../connectors/kafka/KafkaConsumerTestBase.java | 242 +------------------
 .../kafka/testutils/DataGenerators.java         | 165 ++++++-------
 4 files changed, 72 insertions(+), 347 deletions(-)
----------------------------------------------------------------------
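
Summary note: this hotfix takes the test-data generator off the cluster again. InfiniteStringsGenerator no longer submits a separate "String generator" job through a remote environment; it drives the Kafka producer function inline on its own thread. From the test side the generator is just a Thread. A minimal usage sketch, with names taken from the diffs below and the JUnit scaffolding elided:

    // Sketch: how KafkaConsumerTestBase drives the reverted generator.
    DataGenerators.InfiniteStringsGenerator generator =
            new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic);
    generator.start();

    // ... run and cancel the consumer job under test ...

    if (generator.isAlive()) {
        // no producer job to cancel anymore; stopping the thread suffices
        generator.shutdown();
        generator.join();
    }
    else {
        // the generator died on its own; surface the producer error
        Throwable t = generator.getError();
        if (t != null) {
            throw new Exception("Data generator failed", t);
        }
    }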


http://git-wip-us.apache.org/repos/asf/flink/blob/744f8ebb/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index af6d254..78fc1c6 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -115,9 +115,6 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);
 		return stream.addSink(prod);
-	/*	FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<T> sink = FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, topic, serSchema, props, partitioner);
-		sink.setFlushOnCheckpoint(true);
-		return sink; */
        }
 
        @Override
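
Note on the hunk above: the revert keeps the plain sink wiring, constructing FlinkKafkaProducer010 directly and attaching it with addSink(), instead of the commented-out writeToKafkaWithTimestamps() helper. For contrast, both variants as they appear in the hunk (stream, topic, serSchema, props and partitioner are the enclosing method's parameters):

    // Variant kept by this hotfix: the producer as an ordinary sink function.
    FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
    prod.setFlushOnCheckpoint(true);
    return stream.addSink(prod);

    // Variant removed above: the 0.10 helper that also writes record
    // timestamps and returns a configuration object rather than a plain sink.
    // FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<T> sink =
    //         FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, topic, serSchema, props, partitioner);
    // sink.setFlushOnCheckpoint(true);
    // return sink;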

http://git-wip-us.apache.org/repos/asf/flink/blob/744f8ebb/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index fd167a0..b9ec18a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -35,15 +35,6 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
                runSimpleConcurrentProducerConsumerTopology();
        }
 
-//     @Test(timeout = 60000)
-//     public void testPunctuatedExplicitWMConsumer() throws Exception {
-//             runExplicitPunctuatedWMgeneratingConsumerTest(false);
-//     }
-
-//     @Test(timeout = 60000)
-//	public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception {
-//             runExplicitPunctuatedWMgeneratingConsumerTest(true);
-//     }
 
        @Test(timeout = 60000)
        public void testKeyValueSupport() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/744f8ebb/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 3c967ba..0810a3e 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
@@ -31,7 +29,6 @@ import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -39,13 +36,10 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.table.StreamTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
@@ -68,7 +62,6 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -92,7 +85,6 @@ import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.testutils.junit.RetryOnException;
 import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.StringUtils;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.junit.Assert;
@@ -116,7 +108,6 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.test.util.TestUtils.tryExecute;
@@ -517,7 +508,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		// launch a producer thread
 		DataGenerators.InfiniteStringsGenerator generator =
-				new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic, flinkPort);
+				new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic);
                generator.start();
 
                // launch a consumer asynchronously
@@ -571,7 +562,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		assertTrue(failueCause.getMessage().contains("Job was cancelled"));
 
 		if (generator.isAlive()) {
-			JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout), "String generator");
                        generator.shutdown();
                        generator.join();
                }
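
Since the generator no longer runs as a Flink job, there is nothing for JobManagerCommunicationUtils to cancel; shutdown() only has to stop the thread. As the DataGenerators diff below shows, shutdown is flag-and-interrupt based:

    // From the generator (unchanged lines in the diff below): the produce
    // loop checks a volatile flag, and interrupt() unblocks a send in flight.
    public void shutdown() {
        this.running = false;
        this.interrupt();
    }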
@@ -1723,234 +1713,4 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                        this.numElementsTotal = state;
                }
        }
-
-	/////////////			Testing the Kafka consumer with embeded watermark generation functionality			///////////////
-
-//	@RetryOnException(times=0, exception=kafka.common.NotLeaderForPartitionException.class)
-//	public void runExplicitPunctuatedWMgeneratingConsumerTest(boolean emptyPartition) throws Exception {
-//
-//		final String topic1 = "wmExtractorTopic1_" + UUID.randomUUID().toString();
-//		final String topic2 = "wmExtractorTopic2_" + UUID.randomUUID().toString();
-//
-//		final Map<String, Boolean> topics = new HashMap<>();
-//		topics.put(topic1, false);
-//		topics.put(topic2, emptyPartition);
-//
-//		final int noOfTopcis = topics.size();
-//		final int partitionsPerTopic = 1;
-//		final int elementsPerPartition = 100 + 1;
-//
-//		final int totalElements = emptyPartition ?
-//			partitionsPerTopic * elementsPerPartition :
-//			noOfTopcis * partitionsPerTopic * elementsPerPartition;
-//
-//		createTestTopic(topic1, partitionsPerTopic, 1);
-//		createTestTopic(topic2, partitionsPerTopic, 1);
-//
-//		final StreamExecutionEnvironment env =
-//			StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-//		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-//		env.setParallelism(partitionsPerTopic);
-//		env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
-//		env.getConfig().disableSysoutLogging();
-//
-//		TypeInformation<Tuple2<Long, Integer>> longIntType = TypeInfoParser.parse("Tuple2<Long, Integer>");
-//
-//		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
-//		producerProperties.setProperty("retries", "0");
-//
-//		putDataInTopics(env, producerProperties, elementsPerPartition, topics, longIntType);
-//
-//		List<String> topicTitles = new ArrayList<>(topics.keySet());
-//		runPunctuatedComsumer(env, topicTitles, totalElements, longIntType);
-//
-//		executeAndCatchException(env, "runComsumerWithPunctuatedExplicitWMTest");
-//
-//		for(String topic: topicTitles) {
-//			deleteTestTopic(topic);
-//		}
-//	}
-//
-//	private void executeAndCatchException(StreamExecutionEnvironment env, String execName) throws Exception {
-//		try {
-//			tryExecutePropagateExceptions(env, execName);
-//		}
-//		catch (ProgramInvocationException | JobExecutionException e) {
-//			// look for NotLeaderForPartitionException
-//			Throwable cause = e.getCause();
-//
-//			// search for nested SuccessExceptions
-//			int depth = 0;
-//			while (cause != null && depth++ < 20) {
-//				if (cause instanceof kafka.common.NotLeaderForPartitionException) {
-//					throw (Exception) cause;
-//				}
-//				cause = cause.getCause();
-//			}
-//			throw e;
-//		}
-//	}
-//
-//	private void putDataInTopics(StreamExecutionEnvironment env,
-//							Properties producerProperties,
-//							final int elementsPerPartition,
-//							Map<String, Boolean> topics,
-//							TypeInformation<Tuple2<Long, Integer>> outputTypeInfo) {
-//		if(topics.size() != 2) {
-//			throw new RuntimeException("This method accepts two topics as arguments.");
-//		}
-//
-//		TypeInformationSerializationSchema<Tuple2<Long, Integer>> sinkSchema =
-//			new TypeInformationSerializationSchema<>(outputTypeInfo, env.getConfig());
-//
-//		DataStream<Tuple2<Long, Integer>> stream = env
-//			.addSource(new RichParallelSourceFunction<Tuple2<Long, Integer>>() {
-//				private boolean running = true;
-//
-//				@Override
-//				public void run(SourceContext<Tuple2<Long, Integer>> ctx) throws InterruptedException {
-//					int topic = 0;
-//					int currentTs = 1;
-//
-//					while (running && currentTs < elementsPerPartition) {
-//						long timestamp = (currentTs % 10 == 0) ? -1L : currentTs;
-//						ctx.collect(new Tuple2<Long, Integer>(timestamp, topic));
-//						currentTs++;
-//					}
-//
-//					Tuple2<Long, Integer> toWrite2 = new Tuple2<Long, Integer>(-1L, topic);
-//					ctx.collect(toWrite2);
-//				}
-//
-//				@Override
-//				public void cancel() {
-//				running = false;
-//			}
-//			}).setParallelism(1);
-//
-//		List<Map.Entry<String, Boolean>> topicsL = new ArrayList<>(topics.entrySet());
-//
-//		stream = stream.map(new MapFunction<Tuple2<Long,Integer>, Tuple2<Long,Integer>>() {
-//
-//			@Override
-//			public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
-//				return value;
-//			}
-//		}).setParallelism(1);
-//		kafkaServer.produceIntoKafka(stream, topicsL.get(0).getKey(),
-//			new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null).setParallelism(1);
-//
-//		if(!topicsL.get(1).getValue()) {
-//			stream.map(new MapFunction<Tuple2<Long,Integer>, Tuple2<Long,Integer>>() {
-//
-//				@Override
-//				public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
-//					long timestamp = (value.f0 == -1) ? -1L : 1000 + value.f0;
-//					return new Tuple2<>(timestamp, 1);
-//				}
-//			}).setParallelism(1).addSink(kafkaServer.produceIntoKafka(topicsL.get(1).getKey(),
-//				new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null)).setParallelism(1);
-//		}
-//	}
-
-	private DataStreamSink<Tuple2<Long, Integer>> runPunctuatedComsumer(StreamExecutionEnvironment env,
-																	List<String> topics,
-																	final int totalElementsToExpect,
-																	TypeInformation<Tuple2<Long, Integer>> inputTypeInfo) {
-
-		TypeInformationSerializationSchema<Tuple2<Long, Integer>> sourceSchema =
-			new TypeInformationSerializationSchema<>(inputTypeInfo, env.getConfig());
-
-		Properties props = new Properties();
-		props.putAll(standardProps);
-		props.putAll(secureProps);
-		FlinkKafkaConsumerBase<Tuple2<Long, Integer>> source = kafkaServer
-			.getConsumer(topics, sourceSchema, props)
-			.assignTimestampsAndWatermarks(new TestPunctuatedTSExtractor());
-
-		DataStreamSource<Tuple2<Long, Integer>> consuming = env.setParallelism(1).addSource(source);
-
-		return consuming
-			.transform("testingWatermarkOperator", inputTypeInfo, new WMTestingOperator())
-			.addSink(new RichSinkFunction<Tuple2<Long, Integer>>() {
-
-				private int elementCount = 0;
-
-				@Override
-				public void invoke(Tuple2<Long, Integer> value) throws Exception {
-					elementCount++;
-					if (elementCount == totalElementsToExpect) {
-						throw new SuccessException();
-					}
-				}
-
-				@Override
-				public void close() throws Exception {
-					super.close();
-				}
-			});
-	}
-
-	/** An extractor that emits a Watermark whenever the timestamp <b>in the record</b> is equal to {@code -1}. */
-	private static class TestPunctuatedTSExtractor implements AssignerWithPunctuatedWatermarks<Tuple2<Long, Integer>> {
-
-		@Override
-		public Watermark checkAndGetNextWatermark(Tuple2<Long, Integer> lastElement, long extractedTimestamp) {
-			return (lastElement.f0 == -1) ? new Watermark(extractedTimestamp) : null;
-		}
-
-		@Override
-		public long extractTimestamp(Tuple2<Long, Integer> element, long previousElementTimestamp) {
-			return element.f0;
-		}
-	}
-
-	private static class WMTestingOperator extends AbstractStreamOperator<Tuple2<Long, Integer>> implements OneInputStreamOperator<Tuple2<Long, Integer>, Tuple2<Long, Integer>> {
-
-               private long lastReceivedWatermark = Long.MIN_VALUE;
-
-               private Map<Integer, Boolean> isEligible = new HashMap<>();
-               private Map<Integer, Long> perPartitionMaxTs = new HashMap<>();
-
-               WMTestingOperator() {
-                       isEligible = new HashMap<>();
-                       perPartitionMaxTs = new HashMap<>();
-               }
-
-               @Override
-		public void processElement(StreamRecord<Tuple2<Long, Integer>> element) throws Exception {
-			int partition = element.getValue().f1;
-			Long maxTs = perPartitionMaxTs.get(partition);
-			if(maxTs == null || maxTs < element.getValue().f0) {
-				perPartitionMaxTs.put(partition, element.getValue().f0);
-				isEligible.put(partition, element.getValue().f0 > lastReceivedWatermark);
-                       }
-                       output.collect(element);
-               }
-
-               @Override
-               public void processWatermark(Watermark mark) throws Exception {
-                       int partition = -1;
-                       long minTS = Long.MAX_VALUE;
-                       for (Integer part : perPartitionMaxTs.keySet()) {
-                               Long ts = perPartitionMaxTs.get(part);
-                               if (ts < minTS && isEligible.get(part)) {
-                                       partition = part;
-                                       minTS = ts;
-                                       lastReceivedWatermark = ts;
-                               }
-                       }
-                       isEligible.put(partition, false);
-
-                       assertEquals(minTS, mark.getTimestamp());
-                       output.emitWatermark(mark);
-               }
-
-               @Override
-               public void close() throws Exception {
-                       super.close();
-                       perPartitionMaxTs.clear();
-                       isEligible.clear();
-               }
-       }
 }
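
The block removed above was the (already commented-out) punctuated-watermark test machinery. The pattern it exercised: an assigner that inspects every record and emits a Watermark only for marker records whose timestamp field is -1. A compact restatement of the deleted TestPunctuatedTSExtractor, renamed here for illustration:

    // Punctuated watermarks: the record itself says when event time advances.
    private static class PunctuatedExtractor implements AssignerWithPunctuatedWatermarks<Tuple2<Long, Integer>> {

        @Override
        public long extractTimestamp(Tuple2<Long, Integer> element, long previousElementTimestamp) {
            return element.f0; // each record carries its own timestamp
        }

        @Override
        public Watermark checkAndGetNextWatermark(Tuple2<Long, Integer> lastElement, long extractedTimestamp) {
            // marker record (f0 == -1) => emit a watermark, otherwise nothing
            return lastElement.f0 == -1 ? new Watermark(extractedTimestamp) : null;
        }
    }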

http://git-wip-us.apache.org/repos/asf/flink/blob/744f8ebb/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index 3f035fd..ba75212 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -18,74 +18,35 @@
 
 package org.apache.flink.streaming.connectors.kafka.testutils;
 
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
 import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 
-import java.io.Serializable;
+import java.util.Collection;
 import java.util.Properties;
 import java.util.Random;
 
 @SuppressWarnings("serial")
 public class DataGenerators {
-       
-	public static void generateLongStringTupleSequence(StreamExecutionEnvironment env,
-													   KafkaTestEnvironment testServer, String topic,
-													   int numPartitions,
-													   final int from, final int to) throws Exception {
 
-		TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
-
-		env.setParallelism(numPartitions);
-		env.getConfig().disableSysoutLogging();
-		env.setRestartStrategy(RestartStrategies.noRestart());
-		
-		DataStream<Tuple2<Integer, Integer>> stream =env.addSource(
-				new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
-
-					private volatile boolean running = true;
-
-					@Override
-					public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
-						int cnt = from;
-						int partition = getRuntimeContext().getIndexOfThisSubtask();
-
-						while (running && cnt <= to) {
-							ctx.collect(new Tuple2<>(partition, cnt));
-							cnt++;
-						}
-					}
-
-					@Override
-					public void cancel() {
-						running = false;
-					}
-				});
-
-		testServer.produceIntoKafka(stream, topic,
-				new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(resultType, env.getConfig())),
-				FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()),
-				new Tuple2Partitioner(numPartitions)
-		);
-
-		env.execute("Data generator (Int, Int) stream to topic " + topic);
-	}
-
-	// ------------------------------------------------------------------------
-	
 	public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment env,
 														 KafkaTestEnvironment testServer, String topic,
 														 final int numPartitions,
@@ -105,9 +66,9 @@ public class DataGenerators {
 						// create a sequence
 						int[] elements = new int[numElements];
 						for (int i = 0, val = getRuntimeContext().getIndexOfThisSubtask();
-								i < numElements;
-								i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) {
-							
+								 i < numElements;
+								 i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) {
+
+
                                                        elements[i] = val;
                                                }
 
@@ -116,7 +77,7 @@ public class DataGenerators {
 							Random rnd = new Random();
 							for (int i = 0; i < elements.length; i++) {
 								int otherPos = rnd.nextInt(elements.length);
-								
+
 								int tmp = elements[i];
 								elements[i] = elements[otherPos];
 								elements[otherPos] = tmp;
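
Side note on the context above: the scramble loop swaps each position with a uniformly random index, which is plenty for shuffling test input but does not yield an unbiased permutation. If uniformity ever mattered, a Fisher-Yates pass would be the drop-in fix; a sketch, not part of this commit:

    // Fisher-Yates: swap each position only with an index at or below it.
    Random rnd = new Random();
    for (int i = elements.length - 1; i > 0; i--) {
        int j = rnd.nextInt(i + 1);
        int tmp = elements[i];
        elements[i] = elements[j];
        elements[j] = tmp;
    }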
@@ -142,7 +103,7 @@ public class DataGenerators {
                if(secureProps != null) {
                        props.putAll(testServer.getSecureProperties());
                }
-               
+
                stream = stream.rebalance();
                testServer.produceIntoKafka(stream, topic,
 				new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())),
@@ -156,63 +117,55 @@ public class DataGenerators {
 
                env.execute("Scrambles int sequence generator");
        }
-       
+
 	// ------------------------------------------------------------------------
-	
-	public static class InfiniteStringsGenerator extends Thread implements Serializable {
 
-               private transient KafkaTestEnvironment server;
-               
-               private final String topic;
+       public static class InfiniteStringsGenerator extends Thread {
 
-               private final int flinkPort;
+               private final KafkaTestEnvironment server;
+
+               private final String topic;
 
                private volatile Throwable error;
-               
+
                private volatile boolean running = true;
 
-               
-		public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic, int flinkPort) {
+
+		public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic) {
                        this.server = server;
                        this.topic = topic;
-                       this.flinkPort = flinkPort;
                }
 
                @Override
                public void run() {
                        // we manually feed data into the Kafka sink
-                       FlinkKafkaProducerBase<String> producer = null;
+                       RichFunction producer = null;
                        try {
-				final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-				DataStream<String> stream = env.addSource(new SourceFunction<String>() {
-					@Override
-					public void run(SourceContext<String> ctx) throws Exception {
-						final StringBuilder bld = new StringBuilder();
-						final Random rnd = new Random();
-						while (running) {
-							bld.setLength(0);
-							int len = rnd.nextInt(100) + 1;
-							for (int i = 0; i < len; i++) {
-								bld.append((char) (rnd.nextInt(20) + 'a'));
-							}
+				Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(server.getBrokerConnectionString());
+                               producerProperties.setProperty("retries", "3");
+				StreamTransformation<String> mockTransform = new MockStreamTransformation();
+				DataStream<String> stream = new DataStream<>(new DummyStreamExecutionEnvironment(), mockTransform);
+				DataStreamSink<String> sink = server.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
+						producerProperties, new FixedPartitioner<String>());
+				StreamSink<String> producerOperator = sink.getTransformation().getOperator();
+				producer = (RichFunction) producerOperator.getUserFunction();
+				producer.setRuntimeContext(new MockRuntimeContext(1,0));
+                               producer.open(new Configuration());
 
-							String next = bld.toString();
-                                                       ctx.collect(next);
-                                               }
-                                       }
+                               final StringBuilder bld = new StringBuilder();
+                               final Random rnd = new Random();
 
-                                       @Override
-                                       public void cancel() {
-                                               running = false;
+                               while (running) {
+                                       bld.setLength(0);
+
+                                       int len = rnd.nextInt(100) + 1;
+                                       for (int i = 0; i < len; i++) {
+                                               bld.append((char) (rnd.nextInt(20) + 'a') );
                                        }
-                               });
 
-				Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(server.getBrokerConnectionString());
-				producerProperties.setProperty("retries", "3");
-				server.produceIntoKafka(stream, topic,
-						new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
-						producerProperties, new FixedPartitioner<String>());
-                               env.execute("String generator");
+                                       String next = bld.toString();
+					producerOperator.processElement(new StreamRecord<>(next));
+                               }
                        }
                        catch (Throwable t) {
                                this.error = t;
@@ -228,14 +181,38 @@ public class DataGenerators {
                                }
                        }
                }
-               
+
                public void shutdown() {
                        this.running = false;
                        this.interrupt();
                }
-               
+
                public Throwable getError() {
                        return this.error;
                }
+
+		private static class MockStreamTransformation extends StreamTransformation<String> {
+			public MockStreamTransformation() {
+				super("MockTransform", TypeInfoParser.<String>parse("String"), 1);
+                       }
+
+                       @Override
+			public void setChainingStrategy(ChainingStrategy strategy) {
+
+                       }
+
+                       @Override
+			public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+                               return null;
+                       }
+               }
+
+		public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
+
+                       @Override
+			public JobExecutionResult execute(String jobName) throws Exception {
+                               return null;
+                       }
+               }
        }
-}
+}
\ No newline at end of file
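
The last hunks carry the core of the revert: the generator fabricates just enough of a stream graph (DummyStreamExecutionEnvironment plus MockStreamTransformation) for KafkaTestEnvironment.produceIntoKafka() to attach its sink, then pulls the producer function out of the StreamSink operator and runs its lifecycle by hand. Condensed from the new run() body; nextRandomString() is a hypothetical stand-in for the StringBuilder loop, and error handling is elided:

    Properties producerProperties =
            FlinkKafkaProducerBase.getPropertiesFromBrokerList(server.getBrokerConnectionString());
    producerProperties.setProperty("retries", "3");

    // Dummy graph pieces exist only so produceIntoKafka() has a stream to sink.
    DataStream<String> stream =
            new DataStream<>(new DummyStreamExecutionEnvironment(), new MockStreamTransformation());
    DataStreamSink<String> sink = server.produceIntoKafka(stream, topic,
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
            producerProperties, new FixedPartitioner<String>());

    // Extract the producer function and drive it inline, without a Flink job.
    StreamSink<String> producerOperator = sink.getTransformation().getOperator();
    RichFunction producer = (RichFunction) producerOperator.getUserFunction();
    producer.setRuntimeContext(new MockRuntimeContext(1, 0)); // test util from the same package
    producer.open(new Configuration());

    while (running) {
        producerOperator.processElement(new StreamRecord<>(nextRandomString()));
    }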
