Repository: flink Updated Branches: refs/heads/master 15df71ba9 -> 3d5bca0ab
[hotfix][kafka] Undo DataGenerators changes (use inline kafka producer again Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/744f8ebb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/744f8ebb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/744f8ebb Branch: refs/heads/master Commit: 744f8ebb66b2a7288942be139cd7a7e6d1170c80 Parents: 15df71b Author: Robert Metzger <[email protected]> Authored: Tue Oct 11 15:48:32 2016 +0200 Committer: Robert Metzger <[email protected]> Committed: Wed Oct 12 14:03:14 2016 +0200 ---------------------------------------------------------------------- .../kafka/KafkaTestEnvironmentImpl.java | 3 - .../connectors/kafka/Kafka09ITCase.java | 9 - .../connectors/kafka/KafkaConsumerTestBase.java | 242 +------------------ .../kafka/testutils/DataGenerators.java | 165 ++++++------- 4 files changed, 72 insertions(+), 347 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/744f8ebb/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index af6d254..78fc1c6 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -115,9 +115,6 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner); prod.setFlushOnCheckpoint(true); return stream.addSink(prod); - /* FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<T> sink = FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, topic, serSchema, props, partitioner); - sink.setFlushOnCheckpoint(true); - return sink; */ } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/744f8ebb/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java index fd167a0..b9ec18a 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java @@ -35,15 +35,6 @@ public class Kafka09ITCase extends KafkaConsumerTestBase { runSimpleConcurrentProducerConsumerTopology(); } -// @Test(timeout = 60000) -// public void testPunctuatedExplicitWMConsumer() throws Exception { -// runExplicitPunctuatedWMgeneratingConsumerTest(false); -// } - -// @Test(timeout = 60000) -// public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception { -// runExplicitPunctuatedWMgeneratingConsumerTest(true); -// } @Test(timeout = 60000) public void testKeyValueSupport() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/744f8ebb/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 3c967ba..0810a3e 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -18,8 +18,6 @@ package org.apache.flink.streaming.connectors.kafka; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; @@ -31,7 +29,6 @@ import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; @@ -39,13 +36,10 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.table.StreamTableEnvironment; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TypeInfoParser; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.table.Table; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.DataInputView; @@ -68,7 +62,6 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; @@ -92,7 +85,6 @@ import org.apache.flink.test.util.SuccessException; import org.apache.flink.testutils.junit.RetryOnException; import org.apache.flink.testutils.junit.RetryRule; import org.apache.flink.util.Collector; -import org.apache.flink.util.StringUtils; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.errors.TimeoutException; import org.junit.Assert; @@ -116,7 +108,6 @@ import java.util.Properties; import java.util.Random; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.test.util.TestUtils.tryExecute; @@ -517,7 +508,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { // launch a producer thread DataGenerators.InfiniteStringsGenerator generator = - new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic, flinkPort); + new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic); generator.start(); // launch a consumer asynchronously @@ -571,7 +562,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { assertTrue(failueCause.getMessage().contains("Job was cancelled")); if (generator.isAlive()) { - JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout), "String generator"); generator.shutdown(); generator.join(); } @@ -1723,234 +1713,4 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { this.numElementsTotal = state; } } - - ///////////// Testing the Kafka consumer with embeded watermark generation functionality /////////////// - -// @RetryOnException(times=0, exception=kafka.common.NotLeaderForPartitionException.class) -// public void runExplicitPunctuatedWMgeneratingConsumerTest(boolean emptyPartition) throws Exception { -// -// final String topic1 = "wmExtractorTopic1_" + UUID.randomUUID().toString(); -// final String topic2 = "wmExtractorTopic2_" + UUID.randomUUID().toString(); -// -// final Map<String, Boolean> topics = new HashMap<>(); -// topics.put(topic1, false); -// topics.put(topic2, emptyPartition); -// -// final int noOfTopcis = topics.size(); -// final int partitionsPerTopic = 1; -// final int elementsPerPartition = 100 + 1; -// -// final int totalElements = emptyPartition ? -// partitionsPerTopic * elementsPerPartition : -// noOfTopcis * partitionsPerTopic * elementsPerPartition; -// -// createTestTopic(topic1, partitionsPerTopic, 1); -// createTestTopic(topic2, partitionsPerTopic, 1); -// -// final StreamExecutionEnvironment env = -// StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); -// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); -// env.setParallelism(partitionsPerTopic); -// env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately -// env.getConfig().disableSysoutLogging(); -// -// TypeInformation<Tuple2<Long, Integer>> longIntType = TypeInfoParser.parse("Tuple2<Long, Integer>"); -// -// Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings); -// producerProperties.setProperty("retries", "0"); -// -// putDataInTopics(env, producerProperties, elementsPerPartition, topics, longIntType); -// -// List<String> topicTitles = new ArrayList<>(topics.keySet()); -// runPunctuatedComsumer(env, topicTitles, totalElements, longIntType); -// -// executeAndCatchException(env, "runComsumerWithPunctuatedExplicitWMTest"); -// -// for(String topic: topicTitles) { -// deleteTestTopic(topic); -// } -// } -// -// private void executeAndCatchException(StreamExecutionEnvironment env, String execName) throws Exception { -// try { -// tryExecutePropagateExceptions(env, execName); -// } -// catch (ProgramInvocationException | JobExecutionException e) { -// // look for NotLeaderForPartitionException -// Throwable cause = e.getCause(); -// -// // search for nested SuccessExceptions -// int depth = 0; -// while (cause != null && depth++ < 20) { -// if (cause instanceof kafka.common.NotLeaderForPartitionException) { -// throw (Exception) cause; -// } -// cause = cause.getCause(); -// } -// throw e; -// } -// } -// -// private void putDataInTopics(StreamExecutionEnvironment env, -// Properties producerProperties, -// final int elementsPerPartition, -// Map<String, Boolean> topics, -// TypeInformation<Tuple2<Long, Integer>> outputTypeInfo) { -// if(topics.size() != 2) { -// throw new RuntimeException("This method accepts two topics as arguments."); -// } -// -// TypeInformationSerializationSchema<Tuple2<Long, Integer>> sinkSchema = -// new TypeInformationSerializationSchema<>(outputTypeInfo, env.getConfig()); -// -// DataStream<Tuple2<Long, Integer>> stream = env -// .addSource(new RichParallelSourceFunction<Tuple2<Long, Integer>>() { -// private boolean running = true; -// -// @Override -// public void run(SourceContext<Tuple2<Long, Integer>> ctx) throws InterruptedException { -// int topic = 0; -// int currentTs = 1; -// -// while (running && currentTs < elementsPerPartition) { -// long timestamp = (currentTs % 10 == 0) ? -1L : currentTs; -// ctx.collect(new Tuple2<Long, Integer>(timestamp, topic)); -// currentTs++; -// } -// -// Tuple2<Long, Integer> toWrite2 = new Tuple2<Long, Integer>(-1L, topic); -// ctx.collect(toWrite2); -// } -// -// @Override -// public void cancel() { -// running = false; -// } -// }).setParallelism(1); -// -// List<Map.Entry<String, Boolean>> topicsL = new ArrayList<>(topics.entrySet()); -// -// stream = stream.map(new MapFunction<Tuple2<Long,Integer>, Tuple2<Long,Integer>>() { -// -// @Override -// public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception { -// return value; -// } -// }).setParallelism(1); -// kafkaServer.produceIntoKafka(stream, topicsL.get(0).getKey(), -// new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null).setParallelism(1); -// -// if(!topicsL.get(1).getValue()) { -// stream.map(new MapFunction<Tuple2<Long,Integer>, Tuple2<Long,Integer>>() { -// -// @Override -// public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception { -// long timestamp = (value.f0 == -1) ? -1L : 1000 + value.f0; -// return new Tuple2<>(timestamp, 1); -// } -// }).setParallelism(1).addSink(kafkaServer.produceIntoKafka(topicsL.get(1).getKey(), -// new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null)).setParallelism(1); -// } -// } - - private DataStreamSink<Tuple2<Long, Integer>> runPunctuatedComsumer(StreamExecutionEnvironment env, - List<String> topics, - final int totalElementsToExpect, - TypeInformation<Tuple2<Long, Integer>> inputTypeInfo) { - - TypeInformationSerializationSchema<Tuple2<Long, Integer>> sourceSchema = - new TypeInformationSerializationSchema<>(inputTypeInfo, env.getConfig()); - - Properties props = new Properties(); - props.putAll(standardProps); - props.putAll(secureProps); - FlinkKafkaConsumerBase<Tuple2<Long, Integer>> source = kafkaServer - .getConsumer(topics, sourceSchema, props) - .assignTimestampsAndWatermarks(new TestPunctuatedTSExtractor()); - - DataStreamSource<Tuple2<Long, Integer>> consuming = env.setParallelism(1).addSource(source); - - return consuming - .transform("testingWatermarkOperator", inputTypeInfo, new WMTestingOperator()) - .addSink(new RichSinkFunction<Tuple2<Long, Integer>>() { - - private int elementCount = 0; - - @Override - public void invoke(Tuple2<Long, Integer> value) throws Exception { - elementCount++; - if (elementCount == totalElementsToExpect) { - throw new SuccessException(); - } - } - - @Override - public void close() throws Exception { - super.close(); - } - }); - } - - /** An extractor that emits a Watermark whenever the timestamp <b>in the record</b> is equal to {@code -1}. */ - private static class TestPunctuatedTSExtractor implements AssignerWithPunctuatedWatermarks<Tuple2<Long, Integer>> { - - @Override - public Watermark checkAndGetNextWatermark(Tuple2<Long, Integer> lastElement, long extractedTimestamp) { - return (lastElement.f0 == -1) ? new Watermark(extractedTimestamp) : null; - } - - @Override - public long extractTimestamp(Tuple2<Long, Integer> element, long previousElementTimestamp) { - return element.f0; - } - } - - private static class WMTestingOperator extends AbstractStreamOperator<Tuple2<Long, Integer>> implements OneInputStreamOperator<Tuple2<Long, Integer>, Tuple2<Long, Integer>> { - - private long lastReceivedWatermark = Long.MIN_VALUE; - - private Map<Integer, Boolean> isEligible = new HashMap<>(); - private Map<Integer, Long> perPartitionMaxTs = new HashMap<>(); - - WMTestingOperator() { - isEligible = new HashMap<>(); - perPartitionMaxTs = new HashMap<>(); - } - - @Override - public void processElement(StreamRecord<Tuple2<Long, Integer>> element) throws Exception { - int partition = element.getValue().f1; - Long maxTs = perPartitionMaxTs.get(partition); - if(maxTs == null || maxTs < element.getValue().f0) { - perPartitionMaxTs.put(partition, element.getValue().f0); - isEligible.put(partition, element.getValue().f0 > lastReceivedWatermark); - } - output.collect(element); - } - - @Override - public void processWatermark(Watermark mark) throws Exception { - int partition = -1; - long minTS = Long.MAX_VALUE; - for (Integer part : perPartitionMaxTs.keySet()) { - Long ts = perPartitionMaxTs.get(part); - if (ts < minTS && isEligible.get(part)) { - partition = part; - minTS = ts; - lastReceivedWatermark = ts; - } - } - isEligible.put(partition, false); - - assertEquals(minTS, mark.getTimestamp()); - output.emitWatermark(mark); - } - - @Override - public void close() throws Exception { - super.close(); - perPartitionMaxTs.clear(); - isEligible.clear(); - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/744f8ebb/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java index 3f035fd..ba75212 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java @@ -18,74 +18,35 @@ package org.apache.flink.streaming.connectors.kafka.testutils; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeInfoParser; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.api.transformations.StreamTransformation; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase; import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment; import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; -import java.io.Serializable; +import java.util.Collection; import java.util.Properties; import java.util.Random; @SuppressWarnings("serial") public class DataGenerators { - - public static void generateLongStringTupleSequence(StreamExecutionEnvironment env, - KafkaTestEnvironment testServer, String topic, - int numPartitions, - final int from, final int to) throws Exception { - TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>"); - - env.setParallelism(numPartitions); - env.getConfig().disableSysoutLogging(); - env.setRestartStrategy(RestartStrategies.noRestart()); - - DataStream<Tuple2<Integer, Integer>> stream =env.addSource( - new RichParallelSourceFunction<Tuple2<Integer, Integer>>() { - - private volatile boolean running = true; - - @Override - public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { - int cnt = from; - int partition = getRuntimeContext().getIndexOfThisSubtask(); - - while (running && cnt <= to) { - ctx.collect(new Tuple2<>(partition, cnt)); - cnt++; - } - } - - @Override - public void cancel() { - running = false; - } - }); - - testServer.produceIntoKafka(stream, topic, - new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(resultType, env.getConfig())), - FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()), - new Tuple2Partitioner(numPartitions) - ); - - env.execute("Data generator (Int, Int) stream to topic " + topic); - } - - // ------------------------------------------------------------------------ - public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment env, KafkaTestEnvironment testServer, String topic, final int numPartitions, @@ -105,9 +66,9 @@ public class DataGenerators { // create a sequence int[] elements = new int[numElements]; for (int i = 0, val = getRuntimeContext().getIndexOfThisSubtask(); - i < numElements; - i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) { - + i < numElements; + i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) { + elements[i] = val; } @@ -116,7 +77,7 @@ public class DataGenerators { Random rnd = new Random(); for (int i = 0; i < elements.length; i++) { int otherPos = rnd.nextInt(elements.length); - + int tmp = elements[i]; elements[i] = elements[otherPos]; elements[otherPos] = tmp; @@ -142,7 +103,7 @@ public class DataGenerators { if(secureProps != null) { props.putAll(testServer.getSecureProperties()); } - + stream = stream.rebalance(); testServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())), @@ -156,63 +117,55 @@ public class DataGenerators { env.execute("Scrambles int sequence generator"); } - + // ------------------------------------------------------------------------ - - public static class InfiniteStringsGenerator extends Thread implements Serializable { - private transient KafkaTestEnvironment server; - - private final String topic; + public static class InfiniteStringsGenerator extends Thread { - private final int flinkPort; + private final KafkaTestEnvironment server; + + private final String topic; private volatile Throwable error; - + private volatile boolean running = true; - - public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic, int flinkPort) { + + public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic) { this.server = server; this.topic = topic; - this.flinkPort = flinkPort; } @Override public void run() { // we manually feed data into the Kafka sink - FlinkKafkaProducerBase<String> producer = null; + RichFunction producer = null; try { - final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - DataStream<String> stream = env.addSource(new SourceFunction<String>() { - @Override - public void run(SourceContext<String> ctx) throws Exception { - final StringBuilder bld = new StringBuilder(); - final Random rnd = new Random(); - while (running) { - bld.setLength(0); - int len = rnd.nextInt(100) + 1; - for (int i = 0; i < len; i++) { - bld.append((char) (rnd.nextInt(20) + 'a')); - } + Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(server.getBrokerConnectionString()); + producerProperties.setProperty("retries", "3"); + StreamTransformation<String> mockTransform = new MockStreamTransformation(); + DataStream<String> stream = new DataStream<>(new DummyStreamExecutionEnvironment(), mockTransform); + DataStreamSink<String> sink = server.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), + producerProperties, new FixedPartitioner<String>()); + StreamSink<String> producerOperator = sink.getTransformation().getOperator(); + producer = (RichFunction) producerOperator.getUserFunction(); + producer.setRuntimeContext(new MockRuntimeContext(1,0)); + producer.open(new Configuration()); - String next = bld.toString(); - ctx.collect(next); - } - } + final StringBuilder bld = new StringBuilder(); + final Random rnd = new Random(); - @Override - public void cancel() { - running = false; + while (running) { + bld.setLength(0); + + int len = rnd.nextInt(100) + 1; + for (int i = 0; i < len; i++) { + bld.append((char) (rnd.nextInt(20) + 'a') ); } - }); - Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(server.getBrokerConnectionString()); - producerProperties.setProperty("retries", "3"); - server.produceIntoKafka(stream, topic, - new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), - producerProperties, new FixedPartitioner<String>()); - env.execute("String generator"); + String next = bld.toString(); + producerOperator.processElement(new StreamRecord<>(next)); + } } catch (Throwable t) { this.error = t; @@ -228,14 +181,38 @@ public class DataGenerators { } } } - + public void shutdown() { this.running = false; this.interrupt(); } - + public Throwable getError() { return this.error; } + + private static class MockStreamTransformation extends StreamTransformation<String> { + public MockStreamTransformation() { + super("MockTransform", TypeInfoParser.<String>parse("String"), 1); + } + + @Override + public void setChainingStrategy(ChainingStrategy strategy) { + + } + + @Override + public Collection<StreamTransformation<?>> getTransitivePredecessors() { + return null; + } + } + + public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment { + + @Override + public JobExecutionResult execute(String jobName) throws Exception { + return null; + } + } } -} +} \ No newline at end of file
