http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java new file mode 100644 index 0000000..1bd01a2 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -0,0 +1,1371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import kafka.consumer.Consumer; +import kafka.consumer.ConsumerConfig; +import kafka.consumer.ConsumerIterator; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; +import kafka.message.MessageAndMetadata; +import kafka.server.KafkaServer; + +import org.apache.commons.collections.map.LinkedMap; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.TypeInfoParser; +import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import 
org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators; +import org.apache.flink.streaming.connectors.kafka.testutils.DiscardingSink; +import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper; +import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils; +import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper; +import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper; +import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner; +import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; +import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; +import org.apache.flink.test.util.SuccessException; +import org.apache.flink.testutils.junit.RetryOnException; +import org.apache.flink.testutils.junit.RetryRule; +import org.apache.flink.util.Collector; + +import org.apache.flink.util.StringUtils; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.junit.Assert; + +import org.junit.Rule; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.test.util.TestUtils.tryExecute; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + + +@SuppressWarnings("serial") +public abstract class KafkaConsumerTestBase extends KafkaTestBase { + + @Rule + public RetryRule retryRule = new RetryRule(); + + + // ------------------------------------------------------------------------ + // Suite of Tests + // + // The tests here are all not activated (by an @Test tag), but need + // to be invoked from the extending classes. That way, the classes can + // select which tests to run. 
+	// ------------------------------------------------------------------------
+
+
+	/**
+	 * Test that ensures the KafkaConsumer fails properly if the topic doesn't exist
+	 * and a wrong broker is specified.
+	 *
+	 * @throws Exception
+	 */
+	public void runFailOnNoBrokerTest() throws Exception {
+		try {
+			Properties properties = new Properties();
+
+			StreamExecutionEnvironment see = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+			see.getConfig().disableSysoutLogging();
+			see.setNumberOfExecutionRetries(0);
+			see.setParallelism(1);
+
+			// use wrong ports for the consumers
+			properties.setProperty("bootstrap.servers", "localhost:80");
+			properties.setProperty("zookeeper.connect", "localhost:80");
+			properties.setProperty("group.id", "test");
+			FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer("doesntexist", new SimpleStringSchema(), properties);
+			DataStream<String> stream = see.addSource(source);
+			stream.print();
+			see.execute("No broker test");
+		} catch (RuntimeException re) {
+			Assert.assertTrue("Wrong RuntimeException thrown: " + StringUtils.stringifyException(re),
+					re.getClass().equals(RuntimeException.class) &&
+					re.getMessage().contains("Unable to retrieve any partitions for the requested topics [doesntexist]"));
+		}
+	}
+
+	/**
+	 * Test that validates that checkpointing and checkpoint notification work properly.
+	 */
+	public void runCheckpointingTest() throws Exception {
+		createTestTopic("testCheckpointing", 1, 1);
+
+		FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer("testCheckpointing", new SimpleStringSchema(), standardProps);
+		Field pendingCheckpointsField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingCheckpoints");
+		pendingCheckpointsField.setAccessible(true);
+		LinkedMap pendingCheckpoints = (LinkedMap) pendingCheckpointsField.get(source);
+
+		Assert.assertEquals(0, pendingCheckpoints.size());
+		source.setRuntimeContext(new MockRuntimeContext(1, 0));
+
+		final HashMap<KafkaTopicPartition, Long> initialOffsets = new HashMap<>();
+		initialOffsets.put(new KafkaTopicPartition("testCheckpointing", 0), 1337L);
+
+		// first restore
+		source.restoreState(initialOffsets);
+
+		// then open
+		source.open(new Configuration());
+		HashMap<KafkaTopicPartition, Long> state1 = source.snapshotState(1, 15);
+
+		assertEquals(initialOffsets, state1);
+
+		HashMap<KafkaTopicPartition, Long> state2 = source.snapshotState(2, 30);
+		Assert.assertEquals(initialOffsets, state2);
+
+		Assert.assertEquals(2, pendingCheckpoints.size());
+
+		source.notifyCheckpointComplete(1);
+		Assert.assertEquals(1, pendingCheckpoints.size());
+
+		source.notifyCheckpointComplete(2);
+		Assert.assertEquals(0, pendingCheckpoints.size());
+
+		source.notifyCheckpointComplete(666); // invalid checkpoint
+		Assert.assertEquals(0, pendingCheckpoints.size());
+
+		// create 500 snapshots
+		for (int i = 100; i < 600; i++) {
+			source.snapshotState(i, 15 * i);
+		}
+		Assert.assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, pendingCheckpoints.size());
+
+		// commit only the second last
+		source.notifyCheckpointComplete(598);
+		Assert.assertEquals(1, pendingCheckpoints.size());
+
+		// access invalid checkpoint
+		source.notifyCheckpointComplete(590);
+
+		// and the last
+		source.notifyCheckpointComplete(599);
+		Assert.assertEquals(0, pendingCheckpoints.size());
+
+		source.close();
+
+		deleteTestTopic("testCheckpointing");
+	}
+
+
+
+	/**
+	 * Ensure Kafka is working on both producer and consumer side.
+ * This executes a job that contains two Flink pipelines. + * + * <pre> + * (generator source) --> (kafka sink)-[KAFKA-TOPIC]-(kafka source) --> (validating sink) + * </pre> + * + * We need to externally retry this test. We cannot let Flink's retry mechanism do it, because the Kafka producer + * does not guarantee exactly-once output. Hence a recovery would introduce duplicates that + * cause the test to fail. + * + * This test also ensures that FLINK-3156 doesn't happen again: + * + * The following situation caused a NPE in the FlinkKafkaConsumer + * + * topic-1 <-- elements are only produced into topic1. + * topic-2 + * + * Therefore, this test is consuming as well from an empty topic. + * + */ + @RetryOnException(times=2, exception=kafka.common.NotLeaderForPartitionException.class) + public void runSimpleConcurrentProducerConsumerTopology() throws Exception { + final String topic = "concurrentProducerConsumerTopic_" + UUID.randomUUID().toString(); + final String additionalEmptyTopic = "additionalEmptyTopic_" + UUID.randomUUID().toString(); + + final int parallelism = 3; + final int elementsPerPartition = 100; + final int totalElements = parallelism * elementsPerPartition; + + createTestTopic(topic, parallelism, 2); + createTestTopic(additionalEmptyTopic, parallelism, 1); // create an empty topic which will remain empty all the time + + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.setParallelism(parallelism); + env.enableCheckpointing(500); + env.setNumberOfExecutionRetries(0); // fail immediately + env.getConfig().disableSysoutLogging(); + + TypeInformation<Tuple2<Long, String>> longStringType = TypeInfoParser.parse("Tuple2<Long, String>"); + + TypeInformationSerializationSchema<Tuple2<Long, String>> sourceSchema = + new TypeInformationSerializationSchema<>(longStringType, env.getConfig()); + + TypeInformationSerializationSchema<Tuple2<Long, String>> sinkSchema = + new TypeInformationSerializationSchema<>(longStringType, env.getConfig()); + + // ----------- add producer dataflow ---------- + + DataStream<Tuple2<Long, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Long,String>>() { + + private boolean running = true; + + @Override + public void run(SourceContext<Tuple2<Long, String>> ctx) throws InterruptedException { + int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition; + int limit = cnt + elementsPerPartition; + + + while (running && cnt < limit) { + ctx.collect(new Tuple2<>(1000L + cnt, "kafka-" + cnt)); + cnt++; + // we delay data generation a bit so that we are sure that some checkpoints are + // triggered (for FLINK-3156) + Thread.sleep(50); + } + } + + @Override + public void cancel() { + running = false; + } + }); + stream.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<Tuple2<Long, String>>(sinkSchema), FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings), null)); + + // ----------- add consumer dataflow ---------- + + List<String> topics = new ArrayList<>(); + topics.add(topic); + topics.add(additionalEmptyTopic); + FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topics, sourceSchema, standardProps); + + DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(source).setParallelism(parallelism); + + consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() { + + private int elCnt = 0; + private BitSet validator = new BitSet(totalElements); + + @Override + 
public void invoke(Tuple2<Long, String> value) throws Exception { + String[] sp = value.f1.split("-"); + int v = Integer.parseInt(sp[1]); + + assertEquals(value.f0 - 1000, (long) v); + + assertFalse("Received tuple twice", validator.get(v)); + validator.set(v); + elCnt++; + + if (elCnt == totalElements) { + // check if everything in the bitset is set to true + int nc; + if ((nc = validator.nextClearBit(0)) != totalElements) { + fail("The bitset was not set to 1 on all elements. Next clear:" + + nc + " Set: " + validator); + } + throw new SuccessException(); + } + } + + @Override + public void close() throws Exception { + super.close(); + } + }).setParallelism(1); + + try { + tryExecutePropagateExceptions(env, "runSimpleConcurrentProducerConsumerTopology"); + } + catch (ProgramInvocationException | JobExecutionException e) { + // look for NotLeaderForPartitionException + Throwable cause = e.getCause(); + + // search for nested SuccessExceptions + int depth = 0; + while (cause != null && depth++ < 20) { + if (cause instanceof kafka.common.NotLeaderForPartitionException) { + throw (Exception) cause; + } + cause = cause.getCause(); + } + throw e; + } + + deleteTestTopic(topic); + } + + + /** + * Tests the proper consumption when having a 1:1 correspondence between kafka partitions and + * Flink sources. + */ + public void runOneToOneExactlyOnceTest() throws Exception { + + final String topic = "oneToOneTopic"; + final int parallelism = 5; + final int numElementsPerPartition = 1000; + final int totalElements = parallelism * numElementsPerPartition; + final int failAfterElements = numElementsPerPartition / 3; + + createTestTopic(topic, parallelism, 1); + + DataGenerators.generateRandomizedIntegerSequence( + StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort), + kafkaServer, + topic, parallelism, numElementsPerPartition, true); + + // run the topology that fails and recovers + + DeserializationSchema<Integer> schema = + new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.enableCheckpointing(500); + env.setParallelism(parallelism); + env.setNumberOfExecutionRetries(3); + env.getConfig().disableSysoutLogging(); + + FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps); + + env + .addSource(kafkaSource) + .map(new PartitionValidatingMapper(parallelism, 1)) + .map(new FailingIdentityMapper<Integer>(failAfterElements)) + .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1); + + FailingIdentityMapper.failedBefore = false; + tryExecute(env, "One-to-one exactly once test"); + + deleteTestTopic(topic); + } + + /** + * Tests the proper consumption when having fewer Flink sources than Kafka partitions, so + * one Flink source will read multiple Kafka partitions. 
+ */ + public void runOneSourceMultiplePartitionsExactlyOnceTest() throws Exception { + final String topic = "oneToManyTopic"; + final int numPartitions = 5; + final int numElementsPerPartition = 1000; + final int totalElements = numPartitions * numElementsPerPartition; + final int failAfterElements = numElementsPerPartition / 3; + + final int parallelism = 2; + + createTestTopic(topic, numPartitions, 1); + + DataGenerators.generateRandomizedIntegerSequence( + StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort), + kafkaServer, + topic, numPartitions, numElementsPerPartition, true); + + // run the topology that fails and recovers + + DeserializationSchema<Integer> schema = + new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.enableCheckpointing(500); + env.setParallelism(parallelism); + env.setNumberOfExecutionRetries(3); + env.getConfig().disableSysoutLogging(); + + FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps); + + env + .addSource(kafkaSource) + .map(new PartitionValidatingMapper(numPartitions, 3)) + .map(new FailingIdentityMapper<Integer>(failAfterElements)) + .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1); + + FailingIdentityMapper.failedBefore = false; + tryExecute(env, "One-source-multi-partitions exactly once test"); + + deleteTestTopic(topic); + } + + /** + * Tests the proper consumption when having more Flink sources than Kafka partitions, which means + * that some Flink sources will read no partitions. + */ + public void runMultipleSourcesOnePartitionExactlyOnceTest() throws Exception { + final String topic = "manyToOneTopic"; + final int numPartitions = 5; + final int numElementsPerPartition = 1000; + final int totalElements = numPartitions * numElementsPerPartition; + final int failAfterElements = numElementsPerPartition / 3; + + final int parallelism = 8; + + createTestTopic(topic, numPartitions, 1); + + DataGenerators.generateRandomizedIntegerSequence( + StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort), + kafkaServer, + topic, numPartitions, numElementsPerPartition, true); + + // run the topology that fails and recovers + + DeserializationSchema<Integer> schema = + new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.enableCheckpointing(500); + env.setParallelism(parallelism); + env.setNumberOfExecutionRetries(3); + env.getConfig().disableSysoutLogging(); + env.setBufferTimeout(0); + + FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps); + + env + .addSource(kafkaSource) + .map(new PartitionValidatingMapper(numPartitions, 1)) + .map(new FailingIdentityMapper<Integer>(failAfterElements)) + .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1); + + FailingIdentityMapper.failedBefore = false; + tryExecute(env, "multi-source-one-partitions exactly once test"); + + + deleteTestTopic(topic); + } + + + /** + * Tests that the source can be properly canceled when reading full partitions. 
+	 */
+	public void runCancelingOnFullInputTest() throws Exception {
+		final String topic = "cancelingOnFullTopic";
+
+		final int parallelism = 3;
+		createTestTopic(topic, parallelism, 1);
+
+		// launch a producer thread
+		DataGenerators.InfiniteStringsGenerator generator =
+				new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic);
+		generator.start();
+
+		// launch a consumer asynchronously
+
+		final AtomicReference<Throwable> jobError = new AtomicReference<>();
+
+		final Runnable jobRunner = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+					env.setParallelism(parallelism);
+					env.enableCheckpointing(100);
+					env.getConfig().disableSysoutLogging();
+
+					FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), standardProps);
+
+					env.addSource(source).addSink(new DiscardingSink<String>());
+
+					env.execute();
+				}
+				catch (Throwable t) {
+					jobError.set(t);
+				}
+			}
+		};
+
+		Thread runnerThread = new Thread(jobRunner, "program runner thread");
+		runnerThread.start();
+
+		// wait a bit before canceling
+		Thread.sleep(8000);
+
+		Throwable failureCause = jobError.get();
+		if (failureCause != null) {
+			failureCause.printStackTrace();
+			Assert.fail("Test failed prematurely with: " + failureCause.getMessage());
+		}
+
+		// cancel
+		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+
+		// wait for the program to be done and validate that we failed with the right exception
+		runnerThread.join();
+
+		failureCause = jobError.get();
+		assertNotNull("program did not fail properly due to canceling", failureCause);
+		assertTrue(failureCause.getMessage().contains("Job was cancelled"));
+
+		if (generator.isAlive()) {
+			generator.shutdown();
+			generator.join();
+		}
+		else {
+			Throwable t = generator.getError();
+			if (t != null) {
+				t.printStackTrace();
+				fail("Generator failed: " + t.getMessage());
+			} else {
+				fail("Generator failed with no exception");
+			}
+		}
+
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * Tests that the source can be properly canceled when reading empty partitions.
+	 */
+	public void runCancelingOnEmptyInputTest() throws Exception {
+		final String topic = "cancelingOnEmptyInputTopic";
+
+		final int parallelism = 3;
+		createTestTopic(topic, parallelism, 1);
+
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+
+		final Runnable jobRunner = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+					env.setParallelism(parallelism);
+					env.enableCheckpointing(100);
+					env.getConfig().disableSysoutLogging();
+
+					FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), standardProps);
+
+					env.addSource(source).addSink(new DiscardingSink<String>());
+
+					env.execute();
+				}
+				catch (Throwable t) {
+					LOG.error("Job Runner failed with exception", t);
+					error.set(t);
+				}
+			}
+		};
+
+		Thread runnerThread = new Thread(jobRunner, "program runner thread");
+		runnerThread.start();
+
+		// wait a bit before canceling
+		Thread.sleep(8000);
+
+		Throwable failureCause = error.get();
+		if (failureCause != null) {
+			failureCause.printStackTrace();
+			Assert.fail("Test failed prematurely with: " + failureCause.getMessage());
+		}
+		// cancel
+		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+
+		// wait for the program to be done and validate that we failed with the right exception
+		runnerThread.join();
+
+		failureCause = error.get();
+		assertNotNull("program did not fail properly due to canceling", failureCause);
+		assertTrue(failureCause.getMessage().contains("Job was cancelled"));
+
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * Tests that the job fails properly when requesting more task slots than the mini cluster provides.
+	 */
+	public void runFailOnDeployTest() throws Exception {
+		final String topic = "failOnDeployTopic";
+
+		createTestTopic(topic, 2, 1);
+
+		DeserializationSchema<Integer> schema =
+				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.setParallelism(12); // needs to be more than the mini cluster has slots
+		env.getConfig().disableSysoutLogging();
+
+		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+
+		env
+				.addSource(kafkaSource)
+				.addSink(new DiscardingSink<Integer>());
+
+		try {
+			env.execute();
+			fail("this test should fail with an exception");
+		}
+		catch (ProgramInvocationException e) {
+
+			// validate that we failed due to a NoResourceAvailableException
+			Throwable cause = e.getCause();
+			int depth = 0;
+			boolean foundResourceException = false;
+
+			while (cause != null && depth++ < 20) {
+				if (cause instanceof NoResourceAvailableException) {
+					foundResourceException = true;
+					break;
+				}
+				cause = cause.getCause();
+			}
+
+			assertTrue("Wrong exception", foundResourceException);
+		}
+
+		deleteTestTopic(topic);
+	}
+
+	public void runConsumeMultipleTopics() throws java.lang.Exception {
+		final int NUM_TOPICS = 5;
+		final int NUM_ELEMENTS = 20;
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+
+		// create topics with content
+		final List<String> topics = new ArrayList<>();
+		for (int i = 0; i < NUM_TOPICS; i++) {
+			final String topic = "topic-" + i;
+			topics.add(topic);
+			// create topic
+			createTestTopic(topic, i + 1 /*partitions*/, 1);
+
+			// write something
+			writeSequence(env, topic, NUM_ELEMENTS,
i + 1); + } + + KeyedDeserializationSchema<Tuple3<Integer, Integer, String>> readSchema = new Tuple2WithTopicDeserializationSchema(env.getConfig()); + DataStreamSource<Tuple3<Integer, Integer, String>> stream = env.addSource(kafkaServer.getConsumer(topics, readSchema, standardProps)); + + stream.flatMap(new FlatMapFunction<Tuple3<Integer, Integer, String>, Integer>() { + Map<String, Integer> countPerTopic = new HashMap<>(NUM_TOPICS); + @Override + public void flatMap(Tuple3<Integer, Integer, String> value, Collector<Integer> out) throws Exception { + Integer count = countPerTopic.get(value.f2); + if (count == null) { + count = 1; + } else { + count++; + } + countPerTopic.put(value.f2, count); + + // check map: + for (Map.Entry<String, Integer> el: countPerTopic.entrySet()) { + if (el.getValue() < NUM_ELEMENTS) { + break; // not enough yet + } + if (el.getValue() > NUM_ELEMENTS) { + throw new RuntimeException("There is a failure in the test. I've read " + + el.getValue() + " from topic " + el.getKey()); + } + } + // we've seen messages from all topics + throw new SuccessException(); + } + }).setParallelism(1); + + tryExecute(env, "Count elements from the topics"); + + + // delete all topics again + for (int i = 0; i < NUM_TOPICS; i++) { + final String topic = "topic-" + i; + deleteTestTopic(topic); + } + } + + private static class Tuple2WithTopicDeserializationSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>> { + + TypeSerializer ts; + public Tuple2WithTopicDeserializationSchema(ExecutionConfig ec) { + ts = TypeInfoParser.parse("Tuple2<Integer, Integer>").createSerializer(ec); + } + + @Override + public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { + Tuple2<Integer, Integer> t2 = (Tuple2<Integer, Integer>) ts.deserialize(new ByteArrayInputView(message)); + return new Tuple3<>(t2.f0, t2.f1, topic); + } + + @Override + public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) { + return false; + } + + @Override + public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() { + return TypeInfoParser.parse("Tuple3<Integer, Integer, String>"); + } + } + + /** + * Test Flink's Kafka integration also with very big records (30MB) + * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message + * + */ + public void runBigRecordTestTopology() throws Exception { + + final String topic = "bigRecordTestTopic"; + final int parallelism = 1; // otherwise, the kafka mini clusters may run out of heap space + + createTestTopic(topic, parallelism, 1); + + final TypeInformation<Tuple2<Long, byte[]>> longBytesInfo = TypeInfoParser.parse("Tuple2<Long, byte[]>"); + + final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema = + new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig()); + + final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> deserSchema = + new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig()); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.setNumberOfExecutionRetries(0); + env.getConfig().disableSysoutLogging(); + env.enableCheckpointing(100); + env.setParallelism(parallelism); + + // add consuming topology: + Properties consumerProps = new Properties(); + consumerProps.putAll(standardProps); + consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 
40)); + consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 40)); // for the new fetcher + consumerProps.setProperty("queued.max.message.chunks", "1"); + + FlinkKafkaConsumerBase<Tuple2<Long, byte[]>> source = kafkaServer.getConsumer(topic, serSchema, consumerProps); + DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(source); + + consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() { + + private int elCnt = 0; + + @Override + public void invoke(Tuple2<Long, byte[]> value) throws Exception { + elCnt++; + if (value.f0 == -1) { + // we should have seen 11 elements now. + if (elCnt == 11) { + throw new SuccessException(); + } else { + throw new RuntimeException("There have been "+elCnt+" elements"); + } + } + if (elCnt > 10) { + throw new RuntimeException("More than 10 elements seen: "+elCnt); + } + } + }); + + // add producing topology + Properties producerProps = new Properties(); + producerProps.setProperty("max.request.size", Integer.toString(1024 * 1024 * 30)); + producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionStrings); + + DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() { + + private boolean running; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + running = true; + } + + @Override + public void run(SourceContext<Tuple2<Long, byte[]>> ctx) throws Exception { + Random rnd = new Random(); + long cnt = 0; + int fifteenMb = 1024 * 1024 * 15; + + while (running) { + byte[] wl = new byte[fifteenMb + rnd.nextInt(fifteenMb)]; + ctx.collect(new Tuple2<>(cnt++, wl)); + + Thread.sleep(100); + + if (cnt == 10) { + // signal end + ctx.collect(new Tuple2<>(-1L, new byte[]{1})); + break; + } + } + } + + @Override + public void cancel() { + running = false; + } + }); + + stream.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(serSchema), producerProps, null)); + + tryExecute(env, "big topology test"); + deleteTestTopic(topic); + } + + + public void runBrokerFailureTest() throws Exception { + final String topic = "brokerFailureTestTopic"; + + final int parallelism = 2; + final int numElementsPerPartition = 1000; + final int totalElements = parallelism * numElementsPerPartition; + final int failAfterElements = numElementsPerPartition / 3; + + + createTestTopic(topic, parallelism, 2); + + DataGenerators.generateRandomizedIntegerSequence( + StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort), + kafkaServer, + topic, parallelism, numElementsPerPartition, true); + + // find leader to shut down + int leaderId = kafkaServer.getLeaderToShutDown(topic); + + LOG.info("Leader to shutdown {}", leaderId); + + + // run the topology that fails and recovers + + DeserializationSchema<Integer> schema = + new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.setParallelism(parallelism); + env.enableCheckpointing(500); + env.setNumberOfExecutionRetries(3); + env.getConfig().disableSysoutLogging(); + + + FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps); + + env + .addSource(kafkaSource) + .map(new PartitionValidatingMapper(parallelism, 1)) + .map(new BrokerKillingMapper<Integer>(leaderId, failAfterElements)) + .addSink(new 
ValidatingExactlyOnceSink(totalElements)).setParallelism(1); + + BrokerKillingMapper.killedLeaderBefore = false; + tryExecute(env, "One-to-one exactly once test"); + + // start a new broker: + kafkaServer.restartBroker(leaderId); + } + + public void runKeyValueTest() throws Exception { + final String topic = "keyvaluetest"; + createTestTopic(topic, 1, 1); + final int ELEMENT_COUNT = 5000; + + // ----------- Write some data into Kafka ------------------- + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.setParallelism(1); + env.setNumberOfExecutionRetries(0); + env.getConfig().disableSysoutLogging(); + + DataStream<Tuple2<Long, PojoValue>> kvStream = env.addSource(new SourceFunction<Tuple2<Long, PojoValue>>() { + @Override + public void run(SourceContext<Tuple2<Long, PojoValue>> ctx) throws Exception { + Random rnd = new Random(1337); + for (long i = 0; i < ELEMENT_COUNT; i++) { + PojoValue pojo = new PojoValue(); + pojo.when = new Date(rnd.nextLong()); + pojo.lon = rnd.nextLong(); + pojo.lat = i; + // make every second key null to ensure proper "null" serialization + Long key = (i % 2 == 0) ? null : i; + ctx.collect(new Tuple2<>(key, pojo)); + } + } + @Override + public void cancel() { + } + }); + + KeyedSerializationSchema<Tuple2<Long, PojoValue>> schema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig()); + kvStream.addSink(kafkaServer.getProducer(topic, schema, FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings), null)); + env.execute("Write KV to Kafka"); + + // ----------- Read the data again ------------------- + + env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.setParallelism(1); + env.setNumberOfExecutionRetries(0); + env.getConfig().disableSysoutLogging(); + + + KeyedDeserializationSchema<Tuple2<Long, PojoValue>> readSchema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig()); + + DataStream<Tuple2<Long, PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, readSchema, standardProps)); + fromKafka.flatMap(new RichFlatMapFunction<Tuple2<Long,PojoValue>, Object>() { + long counter = 0; + @Override + public void flatMap(Tuple2<Long, PojoValue> value, Collector<Object> out) throws Exception { + // the elements should be in order. 
+ Assert.assertTrue("Wrong value " + value.f1.lat, value.f1.lat == counter ); + if (value.f1.lat % 2 == 0) { + assertNull("key was not null", value.f0); + } else { + Assert.assertTrue("Wrong value " + value.f0, value.f0 == counter); + } + counter++; + if (counter == ELEMENT_COUNT) { + // we got the right number of elements + throw new SuccessException(); + } + } + }); + + tryExecute(env, "Read KV from Kafka"); + + deleteTestTopic(topic); + } + + public static class PojoValue { + public Date when; + public long lon; + public long lat; + public PojoValue() {} + } + + + /** + * Test delete behavior and metrics for producer + * @throws Exception + */ + public void runAllDeletesTest() throws Exception { + final String topic = "alldeletestest"; + createTestTopic(topic, 1, 1); + final int ELEMENT_COUNT = 300; + + // ----------- Write some data into Kafka ------------------- + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.setParallelism(1); + env.setNumberOfExecutionRetries(0); + env.getConfig().disableSysoutLogging(); + + DataStream<Tuple2<byte[], PojoValue>> kvStream = env.addSource(new SourceFunction<Tuple2<byte[], PojoValue>>() { + @Override + public void run(SourceContext<Tuple2<byte[], PojoValue>> ctx) throws Exception { + Random rnd = new Random(1337); + for (long i = 0; i < ELEMENT_COUNT; i++) { + final byte[] key = new byte[200]; + rnd.nextBytes(key); + ctx.collect(new Tuple2<>(key, (PojoValue) null)); + } + } + @Override + public void cancel() { + } + }); + + TypeInformationKeyValueSerializationSchema<byte[], PojoValue> schema = new TypeInformationKeyValueSerializationSchema<>(byte[].class, PojoValue.class, env.getConfig()); + + kvStream.addSink(kafkaServer.getProducer(topic, schema, FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings), null)); + + JobExecutionResult result = env.execute("Write deletes to Kafka"); + + Map<String, Object> accuResults = result.getAllAccumulatorResults(); + // there are 37 accumulator results in Kafka 0.9 + // and 34 in Kafka 0.8 + Assert.assertTrue("Not enough accumulators from Kafka Producer: " + accuResults.size(), accuResults.size() > 33); + + // ----------- Read the data again ------------------- + + env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.setParallelism(1); + env.setNumberOfExecutionRetries(0); + env.getConfig().disableSysoutLogging(); + + DataStream<Tuple2<byte[], PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, schema, standardProps)); + + fromKafka.flatMap(new RichFlatMapFunction<Tuple2<byte[], PojoValue>, Object>() { + long counter = 0; + @Override + public void flatMap(Tuple2<byte[], PojoValue> value, Collector<Object> out) throws Exception { + // ensure that deleted messages are passed as nulls + assertNull(value.f1); + counter++; + if (counter == ELEMENT_COUNT) { + // we got the right number of elements + throw new SuccessException(); + } + } + }); + + tryExecute(env, "Read deletes from Kafka"); + + deleteTestTopic(topic); + } + + /** + * Test that ensures that DeserializationSchema.isEndOfStream() is properly evaluated + * and that the metrics for the consumer are properly reported. 
+	 *
+	 * @throws Exception
+	 */
+	public void runMetricsAndEndOfStreamTest() throws Exception {
+		final String topic = "testEndOfStream";
+		createTestTopic(topic, 1, 1);
+		final int ELEMENT_COUNT = 300;
+
+		// write some data
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.setParallelism(1);
+		env.setNumberOfExecutionRetries(0);
+		env.getConfig().disableSysoutLogging();
+
+		writeSequence(env, topic, ELEMENT_COUNT, 1);
+
+		// read using custom schema
+		final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env1.setParallelism(1);
+		env1.setNumberOfExecutionRetries(0);
+		env1.getConfig().disableSysoutLogging();
+
+		// note: the consumer must be attached to env1 (the read environment), not to env,
+		// which already executed the write job inside writeSequence()
+		DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, new FixedNumberDeserializationSchema(ELEMENT_COUNT), standardProps));
+		fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
+			@Override
+			public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {
+				// noop ;)
+			}
+		});
+
+		JobExecutionResult result = tryExecute(env1, "Consume " + ELEMENT_COUNT + " elements from Kafka");
+
+		Map<String, Object> accuResults = result.getAllAccumulatorResults();
+		// kafka 0.9 consumer: 39 results
+		if (kafkaServer.getVersion().equals("0.9")) {
+			Assert.assertTrue("Not enough accumulators from Kafka Consumer: " + accuResults.size(), accuResults.size() > 38);
+		}
+
+		deleteTestTopic(topic);
+	}
+
+	public static class FixedNumberDeserializationSchema implements DeserializationSchema<Tuple2<Integer, Integer>> {
+		final int finalCount;
+		int count = 0;
+		TypeInformation<Tuple2<Integer, Integer>> ti = TypeInfoParser.parse("Tuple2<Integer, Integer>");
+		TypeSerializer<Tuple2<Integer, Integer>> ser = ti.createSerializer(new ExecutionConfig());
+
+		public FixedNumberDeserializationSchema(int finalCount) {
+			this.finalCount = finalCount;
+		}
+
+		@Override
+		public Tuple2<Integer, Integer> deserialize(byte[] message) throws IOException {
+			return ser.deserialize(new ByteArrayInputView(message));
+		}
+
+		@Override
+		public boolean isEndOfStream(Tuple2<Integer, Integer> nextElement) {
+			return ++count >= finalCount;
+		}
+
+		@Override
+		public TypeInformation<Tuple2<Integer, Integer>> getProducedType() {
+			return ti;
+		}
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  Reading / writing test data sets
+	// ------------------------------------------------------------------------
+
+	protected void readSequence(StreamExecutionEnvironment env, Properties cc,
+								final int sourceParallelism,
+								final String topicName,
+								final int valuesCount, final int startFrom) throws Exception {
+		env.getCheckpointConfig().setCheckpointTimeout(5000); // set timeout for checkpoints to 5 seconds
+
+		final int finalCount = valuesCount * sourceParallelism;
+
+		final TypeInformation<Tuple2<Integer, Integer>> intIntTupleType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
+
+		final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser =
+				new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig());
+
+		// create the consumer
+		FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deser, cc);
+
+		DataStream<Tuple2<Integer, Integer>> source = env
+				.addSource(consumer).setParallelism(sourceParallelism)
+				.map(new ThrottledMapper<Tuple2<Integer, Integer>>(20)).setParallelism(sourceParallelism);
+
+		// verify data
+		source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
+
+			private int[] values = new int[valuesCount];
+			private int count = 0;
+
+			@Override
+			public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
+				values[value.f1 - startFrom]++;
+				count++;
+				LOG.info("Received message {}, total {} messages", value, count);
+
+				// verify if we've seen everything
+				if (count == finalCount) {
+					for (int i = 0; i < values.length; i++) {
+						int v = values[i];
+						if (v != sourceParallelism) {
+							printTopic(topicName, valuesCount, deser);
+							throw new RuntimeException("Expected v to be " + sourceParallelism +
+									", but was " + v + " on element " + i + " array=" + Arrays.toString(values));
+						}
+					}
+					// test has passed
+					throw new SuccessException();
+				}
+			}
+
+		}).setParallelism(1);
+
+		tryExecute(env, "Read data from Kafka");
+
+		LOG.info("Successfully read sequence for verification");
+	}
+
+	protected void writeSequence(StreamExecutionEnvironment env, String topicName, final int numElements, int parallelism) throws Exception {
+
+		LOG.info("\n===================================\n== Writing sequence of " + numElements + " into " + topicName + " with p=" + parallelism + "\n===================================");
+		TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
+
+		DataStream<Tuple2<Integer, Integer>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
+
+			private boolean running = true;
+
+			@Override
+			public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+				int cnt = 0;
+				int partition = getRuntimeContext().getIndexOfThisSubtask();
+
+				while (running && cnt < numElements) {
+					ctx.collect(new Tuple2<>(partition, cnt));
+					cnt++;
+				}
+			}
+
+			@Override
+			public void cancel() {
+				running = false;
+			}
+		}).setParallelism(parallelism);
+
+		stream.addSink(kafkaServer.getProducer(topicName,
+				new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(resultType, env.getConfig())),
+				FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings),
+				new Tuple2Partitioner(parallelism))).setParallelism(parallelism);
+
+		env.execute("Write sequence");
+
+		LOG.info("Finished writing sequence");
+	}
+
+	// ------------------------------------------------------------------------
+	//  Debugging utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Read topic to list, only using Kafka code.
+	 */
+	private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
+		ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
+		// we request only one stream per consumer instance. Kafka will make sure that each consumer group
+		// will see each message only once.
+		Map<String, Integer> topicCountMap = Collections.singletonMap(topicName, 1);
+		Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
+		if (streams.size() != 1) {
+			throw new RuntimeException("Expected only one message stream but got " + streams.size());
+		}
+		List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
+		if (kafkaStreams == null) {
+			throw new RuntimeException("Requested stream not available. Available streams: " + streams.toString());
+		}
+		if (kafkaStreams.size() != 1) {
+			throw new RuntimeException("Requested 1 stream from Kafka, but got " + kafkaStreams.size() + " streams");
+		}
+		LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
+		ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();
+
+		List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<>();
+		int read = 0;
+		while (iteratorToRead.hasNext()) {
+			read++;
+			result.add(iteratorToRead.next());
+			if (read == stopAfter) {
+				LOG.info("Read " + read + " elements");
+				return result;
+			}
+		}
+		return result;
+	}
+
+	private static void printTopic(String topicName, ConsumerConfig config,
+								DeserializationSchema<?> deserializationSchema,
+								int stopAfter) throws IOException {
+
+		List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
+		LOG.info("Printing contents of topic {} in consumer group {}", topicName, config.groupId());
+
+		for (MessageAndMetadata<byte[], byte[]> message : contents) {
+			Object out = deserializationSchema.deserialize(message.message());
+			LOG.info("Message: partition: {} offset: {} msg: {}", message.partition(), message.offset(), out.toString());
+		}
+	}
+
+	private static void printTopic(String topicName, int elements, DeserializationSchema<?> deserializer)
+			throws IOException
+	{
+		// write the sequence to log for debugging purposes
+		Properties stdProps = standardCC.props().props();
+		Properties newProps = new Properties(stdProps);
+		newProps.setProperty("group.id", "topic-printer" + UUID.randomUUID().toString());
+		newProps.setProperty("auto.offset.reset", "smallest");
+		newProps.setProperty("zookeeper.connect", standardCC.zkConnect());
+
+		ConsumerConfig printerConfig = new ConsumerConfig(newProps);
+		printTopic(topicName, printerConfig, deserializer, elements);
+	}
+
+
+	public static class BrokerKillingMapper<T> extends RichMapFunction<T, T>
+			implements Checkpointed<Integer>, CheckpointNotifier {
+
+		private static final long serialVersionUID = 6334389850158707313L;
+
+		public static volatile boolean killedLeaderBefore;
+		public static volatile boolean hasBeenCheckpointedBeforeFailure;
+
+		private final int shutdownBrokerId;
+		private final int failCount;
+		private int numElementsTotal;
+
+		private boolean failer;
+		private boolean hasBeenCheckpointed;
+
+
+		public BrokerKillingMapper(int shutdownBrokerId, int failCount) {
+			this.shutdownBrokerId = shutdownBrokerId;
+			this.failCount = failCount;
+		}
+
+		@Override
+		public void open(Configuration parameters) {
+			failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
+		}
+
+		@Override
+		public T map(T value) throws Exception {
+			numElementsTotal++;
+
+			if (!killedLeaderBefore) {
+				Thread.sleep(10);
+
+				if (failer && numElementsTotal >= failCount) {
+					// shut down a Kafka broker
+					KafkaServer toShutDown = null;
+					for (KafkaServer server : kafkaServer.getBrokers()) {
+
+						if (kafkaServer.getBrokerId(server) == shutdownBrokerId) {
+							toShutDown = server;
+							break;
+						}
+					}
+
+					if (toShutDown == null) {
+						StringBuilder listOfBrokers = new StringBuilder();
+						for (KafkaServer server : kafkaServer.getBrokers()) {
+							listOfBrokers.append(kafkaServer.getBrokerId(server));
+							listOfBrokers.append(" ; ");
+						}
+
+						throw new Exception("Cannot find broker to shut down: " + shutdownBrokerId
+								+ " ; available brokers: " + listOfBrokers.toString());
+					}
+					else {
+						hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
+						killedLeaderBefore = true;
+
toShutDown.shutdown(); + } + } + } + return value; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + hasBeenCheckpointed = true; + } + + @Override + public Integer snapshotState(long checkpointId, long checkpointTimestamp) { + return numElementsTotal; + } + + @Override + public void restoreState(Integer state) { + this.numElementsTotal = state; + } + } +}
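As the suite comment at the top of KafkaConsumerTestBase notes, none of the run*Test methods carry an @Test annotation; a version-specific subclass opts into them. A minimal sketch of such a subclass follows, assuming a hypothetical class name and timeout value (the actual connector modules define their own concrete IT cases):

	// Illustrative only: a version-specific suite selecting tests from the base class.
	// The class name and timeout are assumptions, not part of this commit.
	package org.apache.flink.streaming.connectors.kafka;

	import org.junit.Test;

	public class ExampleKafkaITCase extends KafkaConsumerTestBase {

		@Test(timeout = 60000)
		public void testCheckpointing() throws Exception {
			runCheckpointingTest(); // inherited, runs against this module's KafkaTestEnvironmentImpl
		}

		@Test(timeout = 60000)
		public void testOneToOneExactlyOnce() throws Exception {
			runOneToOneExactlyOnceTest();
		}
	}

Because KafkaTestBase loads org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl reflectively, the same inherited test body runs against whichever Kafka version the enclosing module provides.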
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java new file mode 100644 index 0000000..228f3ac --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeInfoParser; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; +import org.apache.flink.test.util.SuccessException; + + +import java.io.Serializable; + +import static org.apache.flink.test.util.TestUtils.tryExecute; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@SuppressWarnings("serial") +public abstract class KafkaProducerTestBase extends KafkaTestBase { + + + /** + * + * <pre> + * +------> (sink) --+--> [KAFKA-1] --> (source) -> (map) --+ + * / | \ + * / | \ + * (source) ----------> (sink) --+--> [KAFKA-2] --> (source) -> (map) -----+-> (sink) + * \ | / + * \ | / + * +------> (sink) --+--> [KAFKA-3] --> (source) -> (map) --+ + * </pre> + * + * The mapper validates that the values come consistently from the correct Kafka partition. + * + * The final sink validates that there are no duplicates and that all partitions are present. 
+ */ + public void runCustomPartitioningTest() { + try { + LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()"); + + final String topic = "customPartitioningTestTopic"; + final int parallelism = 3; + + createTestTopic(topic, parallelism, 1); + + TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>"); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.setNumberOfExecutionRetries(0); + env.getConfig().disableSysoutLogging(); + + TypeInformationSerializationSchema<Tuple2<Long, String>> serSchema = + new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig()); + + TypeInformationSerializationSchema<Tuple2<Long, String>> deserSchema = + new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig()); + + // ------ producing topology --------- + + // source has DOP 1 to make sure it generates no duplicates + DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() { + + private boolean running = true; + + @Override + public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception { + long cnt = 0; + while (running) { + ctx.collect(new Tuple2<Long, String>(cnt, "kafka-" + cnt)); + cnt++; + } + } + + @Override + public void cancel() { + running = false; + } + }) + .setParallelism(1); + + // sink partitions into + stream.addSink(kafkaServer.getProducer(topic, + new KeyedSerializationSchemaWrapper<>(serSchema), + FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings), + new CustomPartitioner(parallelism))) + .setParallelism(parallelism); + + // ------ consuming topology --------- + + FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topic, deserSchema, standardProps); + + env.addSource(source).setParallelism(parallelism) + + // mapper that validates partitioning and maps to partition + .map(new RichMapFunction<Tuple2<Long, String>, Integer>() { + + private int ourPartition = -1; + @Override + public Integer map(Tuple2<Long, String> value) { + int partition = value.f0.intValue() % parallelism; + if (ourPartition != -1) { + assertEquals("inconsistent partitioning", ourPartition, partition); + } else { + ourPartition = partition; + } + return partition; + } + }).setParallelism(parallelism) + + .addSink(new SinkFunction<Integer>() { + + private int[] valuesPerPartition = new int[parallelism]; + + @Override + public void invoke(Integer value) throws Exception { + valuesPerPartition[value]++; + + boolean missing = false; + for (int i : valuesPerPartition) { + if (i < 100) { + missing = true; + break; + } + } + if (!missing) { + throw new SuccessException(); + } + } + }).setParallelism(1); + + tryExecute(env, "custom partitioning test"); + + deleteTestTopic(topic); + + LOG.info("Finished KafkaProducerITCase.testCustomPartitioning()"); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // ------------------------------------------------------------------------ + + public static class CustomPartitioner extends KafkaPartitioner implements Serializable { + + private final int expectedPartitions; + + public CustomPartitioner(int expectedPartitions) { + this.expectedPartitions = expectedPartitions; + } + + + @Override + public int partition(Object next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { + Tuple2<Long, String> tuple = (Tuple2<Long, String>) next; + + assertEquals(expectedPartitions, numPartitions); 
+ + return (int) (tuple.f0 % numPartitions); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java new file mode 100644 index 0000000..73cd2f9 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import kafka.consumer.ConsumerConfig; + + +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.test.util.SuccessException; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.TestLogger; + + +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.concurrent.duration.FiniteDuration; + +import java.io.IOException; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + + +/** + * The base for the Kafka tests. 
It brings up: + * <ul> + * <li>A ZooKeeper mini cluster</li> + * <li>Three Kafka Brokers (mini clusters)</li> + * <li>A Flink mini cluster</li> + * </ul> + * + * <p>Code in this test is based on the following GitHub repository: + * <a href="https://github.com/sakserv/hadoop-mini-clusters"> + * https://github.com/sakserv/hadoop-mini-clusters</a> (ASL licensed), + * as per commit <i>bc6b2b2d5f6424d5f377aa6c0871e82a956462ef</i></p> + */ +@SuppressWarnings("serial") +public abstract class KafkaTestBase extends TestLogger { + + protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBase.class); + + protected static final int NUMBER_OF_KAFKA_SERVERS = 3; + + protected static String brokerConnectionStrings; + + protected static ConsumerConfig standardCC; + protected static Properties standardProps; + + protected static ForkableFlinkMiniCluster flink; + + protected static int flinkPort; + + protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS); + + protected static KafkaTestEnvironment kafkaServer; + + // ------------------------------------------------------------------------ + // Setup and teardown of the mini clusters + // ------------------------------------------------------------------------ + + @BeforeClass + public static void prepare() throws IOException, ClassNotFoundException { + LOG.info("-------------------------------------------------------------------------"); + LOG.info(" Starting KafkaITCase "); + LOG.info("-------------------------------------------------------------------------"); + + // dynamically load the version-specific implementation for the test + Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl"); + kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz); + + LOG.info("Starting KafkaITCase.prepare() for Kafka " + kafkaServer.getVersion()); + + kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS); + + standardProps = kafkaServer.getStandardProperties(); + standardCC = kafkaServer.getStandardConsumerConfig(); + brokerConnectionStrings = kafkaServer.getBrokerConnectionString(); + + // also start a reusable Flink mini cluster + Configuration flinkConfig = new Configuration(); + flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); + flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); + flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); + flinkConfig.setString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, "0 s"); + + flink = new ForkableFlinkMiniCluster(flinkConfig, false); + flink.start(); + + flinkPort = flink.getLeaderRPCPort(); + } + + @AfterClass + public static void shutDownServices() { + + LOG.info("-------------------------------------------------------------------------"); + LOG.info(" Shut down KafkaITCase "); + LOG.info("-------------------------------------------------------------------------"); + + flinkPort = -1; + if (flink != null) { + flink.shutdown(); + } + + kafkaServer.shutdown(); + + LOG.info("-------------------------------------------------------------------------"); + LOG.info(" KafkaITCase finished"); + LOG.info("-------------------------------------------------------------------------"); + } + + // ------------------------------------------------------------------------ + // Execution utilities + // ------------------------------------------------------------------------ + + protected static void tryExecutePropagateExceptions(StreamExecutionEnvironment see, String name) throws Exception { + try {
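+ // run the job; tests signal success by having a sink throw a SuccessException, which is unwrapped and swallowed below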
+ see.execute(name); + } + catch (ProgramInvocationException | JobExecutionException root) { + Throwable cause = root.getCause(); + + // search for nested SuccessExceptions + int depth = 0; + while (!(cause instanceof SuccessException)) { + if (cause == null || depth++ == 20) { + throw root; + } + else { + cause = cause.getCause(); + } + } + } + } + + protected static void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) { + kafkaServer.createTestTopic(topic, numberOfPartitions, replicationFactor); + } + + protected static void deleteTestTopic(String topic) { + kafkaServer.deleteTestTopic(topic); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java new file mode 100644 index 0000000..40be8a1 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import kafka.consumer.ConsumerConfig; +import kafka.server.KafkaServer; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; + +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +/** + * Abstract class providing a Kafka test environment. Version-specific subclasses + * (named KafkaTestEnvironmentImpl) are loaded reflectively by KafkaTestBase. + */ +public abstract class KafkaTestEnvironment { + + protected static final String KAFKA_HOST = "localhost"; + + public abstract void prepare(int numKafkaServers); + + public abstract void shutdown(); + + public abstract void deleteTestTopic(String topic); + + public abstract void createTestTopic(String topic, int numberOfPartitions, int replicationFactor); + + public abstract ConsumerConfig getStandardConsumerConfig(); + + public abstract Properties getStandardProperties(); + + public abstract String getBrokerConnectionString(); + + public abstract String getVersion(); + + public abstract List<KafkaServer> getBrokers(); + + // -- consumer / producer instances: + public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, DeserializationSchema<T> deserializationSchema, Properties props) { + return getConsumer(topics, new KeyedDeserializationSchemaWrapper<T>(deserializationSchema), props); + } + + public <T> FlinkKafkaConsumerBase<T> getConsumer(String topic, KeyedDeserializationSchema<T> readSchema, Properties props) { + return getConsumer(Collections.singletonList(topic), readSchema, props); + } + + public <T> FlinkKafkaConsumerBase<T> getConsumer(String topic, DeserializationSchema<T> deserializationSchema, Properties props) { + return getConsumer(Collections.singletonList(topic), deserializationSchema, props); + } + + public abstract <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props); + + public abstract <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner); + + // -- leader failure simulation + + public abstract void restartBroker(int leaderId) throws Exception; + + public abstract int getLeaderToShutDown(String topic) throws Exception; + + public abstract int getBrokerId(KafkaServer server); +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java new file mode 100644 index 0000000..5dab05a --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka; + +import org.junit.Assert; +import org.junit.Test; +import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner; + +public class TestFixedPartitioner { + + /** + * <pre> + * Flink Sinks: Kafka Partitions + * 1 ----------------> 1 + * 2 --------------/ + * 3 -------------/ + * 4 ------------/ + * </pre> + */ + @Test + public void testMoreFlinkThanBrokers() { + FixedPartitioner<String> part = new FixedPartitioner<>(); + + int[] partitions = new int[]{0}; + + part.open(0, 4, partitions); + Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length)); + + part.open(1, 4, partitions); + Assert.assertEquals(0, part.partition("abc2", null, null, partitions.length)); + + part.open(2, 4, partitions); + Assert.assertEquals(0, part.partition("abc3", null, null, partitions.length)); + Assert.assertEquals(0, part.partition("abc3", null, null, partitions.length)); // the assignment must be stable across calls + + part.open(3, 4, partitions); + Assert.assertEquals(0, part.partition("abc4", null, null, partitions.length)); + } + + /** + * <pre> + * Flink Sinks: Kafka Partitions + * 1 ----------------> 1 + * 2 ----------------> 2 + * (partitions 3, 4, 5 receive no data) + * </pre> + */ + @Test + public void testFewerPartitions() { + FixedPartitioner<String> part = new FixedPartitioner<>(); + + int[] partitions = new int[]{0, 1, 2, 3, 4}; + part.open(0, 2, partitions); + Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length)); + Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length)); + + part.open(1, 2, partitions); + Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length)); + Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length)); + } + + /** + * <pre> + * Flink Sinks: Kafka Partitions + * 1 ------------>---> 1 + * 2 -----------/----> 2 + * 3 ----------/ + * </pre> + */ + @Test + public void testMixedCase() { + FixedPartitioner<String> part = new FixedPartitioner<>(); + int[] partitions = new int[]{0, 1}; + + part.open(0, 3, partitions); + Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length)); + + part.open(1, 3, partitions); + Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length)); + + part.open(2, 3, partitions); + Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java new file mode 100644 index 0000000..cc237b6 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.testutils; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeInfoParser; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase; +import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment; +import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; +import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; + +import java.util.Random; + +/** + * Test utilities that generate data streams into Kafka topics. + */ +@SuppressWarnings("serial") +public class DataGenerators { + + public static void generateLongStringTupleSequence(StreamExecutionEnvironment env, + KafkaTestEnvironment testServer, String topic, + int numPartitions, + final int from, final int to) throws Exception { + + TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>"); + + env.setParallelism(numPartitions); + env.getConfig().disableSysoutLogging(); + env.setNumberOfExecutionRetries(0); + + DataStream<Tuple2<Integer, Integer>> stream = env.addSource( + new RichParallelSourceFunction<Tuple2<Integer, Integer>>() { + + private volatile boolean running = true; + + @Override + public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { + int cnt = from; + int partition = getRuntimeContext().getIndexOfThisSubtask(); + + while (running && cnt <= to) { + ctx.collect(new Tuple2<Integer, Integer>(partition, cnt)); + cnt++; + } + } + + @Override + public void cancel() { + running = false; + } + }); + + stream.addSink(testServer.getProducer(topic, + new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(resultType, env.getConfig())), + FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()), + new Tuple2Partitioner(numPartitions) + )); + + env.execute("Data generator (Int, Int) stream to topic " + topic); + } + + // ------------------------------------------------------------------------ + + public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment env, + KafkaTestEnvironment testServer, String topic, + final int numPartitions, + final int numElements, + final boolean randomizeOrder) throws Exception { + env.setParallelism(numPartitions); + env.getConfig().disableSysoutLogging(); + env.setNumberOfExecutionRetries(0); + + DataStream<Integer> stream = env.addSource( + new RichParallelSourceFunction<Integer>() { + + private volatile boolean running = true; + + @Override + public void run(SourceContext<Integer> ctx) { + // create a sequence + int[] elements = new int[numElements]; + for (int i = 0, val = getRuntimeContext().getIndexOfThisSubtask(); + i < numElements; + i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) { + + elements[i] = val; + } + + // scramble the sequence + if (randomizeOrder) { + Random rnd = new Random(); + for (int i = 0; i < elements.length; i++) { + int otherPos = rnd.nextInt(elements.length); + + int tmp = elements[i]; + elements[i] = elements[otherPos]; + elements[otherPos] = tmp; + } + } + + // emit the sequence + int pos = 0; + while (running && pos < elements.length) { + ctx.collect(elements[pos++]); + } + } + + @Override + public void cancel() { + running = false; + } + }); + + stream + .rebalance() + .addSink(testServer.getProducer(topic, + new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())), + FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()), + new KafkaPartitioner<Integer>() { + @Override + public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { + return next % numPartitions; + } + })); + + env.execute("Scrambled int sequence generator"); + } + + // ------------------------------------------------------------------------ + + /** + * A thread that feeds random strings into a topic through a stand-alone producer, until shut down. + */ + public static class InfiniteStringsGenerator extends Thread { + + private final KafkaTestEnvironment server; + + private final String topic; + + private volatile Throwable error; + + private volatile boolean running = true; + + public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic) { + this.server = server; + this.topic = topic; + } + + @Override + public void run() { + // we manually feed data into the Kafka sink + FlinkKafkaProducerBase<String> producer = null; + try { + producer = server.getProducer(topic, + new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), + FlinkKafkaProducerBase.getPropertiesFromBrokerList(server.getBrokerConnectionString()), new FixedPartitioner<String>()); + producer.setRuntimeContext(new MockRuntimeContext(1, 0)); + producer.open(new Configuration()); + + final StringBuilder bld = new StringBuilder(); + final Random rnd = new Random(); + + while (running) { + bld.setLength(0); + + int len = rnd.nextInt(100) + 1; + for (int i = 0; i < len; i++) { + bld.append((char) (rnd.nextInt(20) + 'a')); + } + + String next = bld.toString(); + producer.invoke(next); + } + } + catch (Throwable t) { + this.error = t; + } + finally { + if (producer != null) { + try { + producer.close(); + } + catch (Throwable t) { + // ignore + } + } + } + } + + public void shutdown() { + this.running = false; + 
this.interrupt(); + } + + public Throwable getError() { + return this.error; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java new file mode 100644 index 0000000..987e6c5 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.testutils; + +import org.apache.flink.streaming.api.functions.sink.SinkFunction; + +/** + * Sink function that discards all incoming data. + * @param <T> The type of the elements received by the sink. + */ +public class DiscardingSink<T> implements SinkFunction<T> { + + private static final long serialVersionUID = 2777597566520109843L; + + @Override + public void invoke(T value) {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java new file mode 100644 index 0000000..5a8ffaa --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.testutils; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Identity mapper that fails once, on subtask 0, after it has mapped a configured + * number of records. Because the coordination flags are static, only one instance + * of this mapper should be in use per JVM at any time. + */ +public class FailingIdentityMapper<T> extends RichMapFunction<T, T> implements + Checkpointed<Integer>, CheckpointNotifier, Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(FailingIdentityMapper.class); + + private static final long serialVersionUID = 6334389850158707313L; + + public static volatile boolean failedBefore; + public static volatile boolean hasBeenCheckpointedBeforeFailure; + + private final int failCount; + private int numElementsTotal; + private int numElementsThisTime; + + private boolean failer; + private boolean hasBeenCheckpointed; + + private Thread printer; + private volatile boolean printerRunning = true; + + public FailingIdentityMapper(int failCount) { + this.failCount = failCount; + } + + @Override + public void open(Configuration parameters) { + failer = getRuntimeContext().getIndexOfThisSubtask() == 0; + printer = new Thread(this, "FailingIdentityMapper Status Printer"); + printer.start(); + } + + @Override + public T map(T value) throws Exception { + numElementsTotal++; + numElementsThisTime++; + + if (!failedBefore) { + // throttle the record rate until the artificial failure has happened + Thread.sleep(10); + + if (failer && numElementsTotal >= failCount) { + hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed; + failedBefore = true; + throw new Exception("Artificial Test Failure"); + } + } + return value; + } + + @Override + public void close() throws Exception { + printerRunning = false; + if (printer != null) { + printer.interrupt(); + printer = null; + } + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + this.hasBeenCheckpointed = true; + } + + @Override + public Integer snapshotState(long checkpointId, long checkpointTimestamp) { + return numElementsTotal; + } + + @Override + public void restoreState(Integer state) { + numElementsTotal = state; + } + + @Override + public void run() { + while (printerRunning) { + try { + Thread.sleep(5000); + } + catch (InterruptedException e) { + // ignore + } + LOG.info("============================> Failing mapper {}: count={}, totalCount={}", + getRuntimeContext().getIndexOfThisSubtask(), + numElementsThisTime, numElementsTotal); + } + } +}
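For orientation, here is a minimal sketch of how the utilities above typically compose into a test. This is illustrative only, written as if inside a KafkaTestBase subclass: the topic name and counts are hypothetical, and the assumption that ValidatingExactlyOnceSink takes the expected element count in its constructor is inferred from its role, not shown in this excerpt.

	final String topic = "recoverySketchTopic";  // hypothetical topic name
	final int parallelism = 3;
	final int numElements = 1000;

	createTestTopic(topic, parallelism, 1);

	// write a scrambled 0..999 sequence into the topic (runs a job of its own)
	DataGenerators.generateRandomizedIntegerSequence(
			StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
			kafkaServer, topic, parallelism, numElements, true);

	// read the sequence back, failing once mid-stream to exercise checkpoint recovery
	StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
	env.enableCheckpointing(500);
	env.setNumberOfExecutionRetries(3);
	env.getConfig().disableSysoutLogging();

	env.addSource(kafkaServer.getConsumer(topic,
				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig()),
				standardProps))
		.map(new FailingIdentityMapper<Integer>(numElements / 2))
		// assumed constructor: the sink tracks arrivals and throws SuccessException
		// once every one of the expected elements has been seen exactly once
		.addSink(new ValidatingExactlyOnceSink(numElements)).setParallelism(1);

	tryExecutePropagateExceptions(env, "exactly-once recovery sketch");
	deleteTestTopic(topic);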
