http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java deleted file mode 100644 index aa7ea49..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ /dev/null @@ -1,2006 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka; - -import kafka.consumer.Consumer; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.ConsumerIterator; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.MessageAndMetadata; -import kafka.server.KafkaServer; -import org.apache.commons.io.output.ByteArrayOutputStream; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.RichFlatMapFunction; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.typeutils.TypeInfoParser; -import org.apache.flink.client.program.ProgramInvocationException; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.client.JobCancellationException; -import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; -import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; -import 
org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; -import org.apache.flink.streaming.api.functions.sink.DiscardingSink; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.streaming.api.functions.source.RichSourceFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators; -import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper; -import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils; -import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper; -import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper; -import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner; -import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; -import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; -import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema; -import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; -import org.apache.flink.test.util.SuccessException; -import org.apache.flink.testutils.junit.RetryOnException; -import org.apache.flink.testutils.junit.RetryRule; -import org.apache.flink.util.Collector; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.errors.TimeoutException; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; - -import javax.management.MBeanServer; -import javax.management.ObjectName; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.util.ArrayList; -import java.util.BitSet; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Random; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicReference; - -import static org.apache.flink.test.util.TestUtils.tryExecute; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - - -@SuppressWarnings("serial") -public abstract class KafkaConsumerTestBase extends KafkaTestBase { - - @Rule - public RetryRule retryRule = new RetryRule(); - - - // ------------------------------------------------------------------------ - // Common Test Preparation - // ------------------------------------------------------------------------ - - /** - * Makes sure that no job is on the 
JobManager any more from any previous tests that use - * the same mini cluster. Otherwise, missing slots may happen. - */ - @Before - public void ensureNoJobIsLingering() throws Exception { - JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout)); - } - - - // ------------------------------------------------------------------------ - // Suite of Tests - // - // The tests here are all not activated (by an @Test tag), but need - // to be invoked from the extending classes. That way, the classes can - // select which tests to run. - // ------------------------------------------------------------------------ - - /** - * Test that ensures the KafkaConsumer is properly failing if the topic doesnt exist - * and a wrong broker was specified - * - * @throws Exception - */ - public void runFailOnNoBrokerTest() throws Exception { - try { - Properties properties = new Properties(); - - StreamExecutionEnvironment see = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - see.getConfig().disableSysoutLogging(); - see.setRestartStrategy(RestartStrategies.noRestart()); - see.setParallelism(1); - - // use wrong ports for the consumers - properties.setProperty("bootstrap.servers", "localhost:80"); - properties.setProperty("zookeeper.connect", "localhost:80"); - properties.setProperty("group.id", "test"); - properties.setProperty("request.timeout.ms", "3000"); // let the test fail fast - properties.setProperty("socket.timeout.ms", "3000"); - properties.setProperty("session.timeout.ms", "2000"); - properties.setProperty("fetch.max.wait.ms", "2000"); - properties.setProperty("heartbeat.interval.ms", "1000"); - properties.putAll(secureProps); - FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer("doesntexist", new SimpleStringSchema(), properties); - DataStream<String> stream = see.addSource(source); - stream.print(); - see.execute("No broker test"); - } catch(ProgramInvocationException pie) { - 
if(kafkaServer.getVersion().equals("0.9") || kafkaServer.getVersion().equals("0.10")) { - assertTrue(pie.getCause() instanceof JobExecutionException); - - JobExecutionException jee = (JobExecutionException) pie.getCause(); - - assertTrue(jee.getCause() instanceof TimeoutException); - - TimeoutException te = (TimeoutException) jee.getCause(); - - assertEquals("Timeout expired while fetching topic metadata", te.getMessage()); - } else { - assertTrue(pie.getCause() instanceof JobExecutionException); - - JobExecutionException jee = (JobExecutionException) pie.getCause(); - - assertTrue(jee.getCause() instanceof RuntimeException); - - RuntimeException re = (RuntimeException) jee.getCause(); - - assertTrue(re.getMessage().contains("Unable to retrieve any partitions for the requested topics [doesntexist]")); - } - } - } - - /** - * Ensures that the committed offsets to Kafka are the offsets of "the next record to process" - */ - public void runCommitOffsetsToKafka() throws Exception { - // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50) - final int parallelism = 3; - final int recordsInEachPartition = 50; - - final String topicName = writeSequence("testCommitOffsetsToKafkaTopic", recordsInEachPartition, parallelism, 1); - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.getConfig().disableSysoutLogging(); - env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env.setParallelism(parallelism); - env.enableCheckpointing(200); - - DataStream<String> stream = env.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps)); - stream.addSink(new DiscardingSink<String>()); - - final AtomicReference<Throwable> errorRef = new AtomicReference<>(); - final Thread runner = new Thread("runner") { - @Override - public void run() { - try { - env.execute(); - } - catch (Throwable t) { - if (!(t.getCause() instanceof 
JobCancellationException)) { - errorRef.set(t); - } - } - } - }; - runner.start(); - - final Long l50 = 50L; // the final committed offset in Kafka should be 50 - final long deadline = 30000 + System.currentTimeMillis(); - - KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps); - - do { - Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0); - Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1); - Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2); - - if (l50.equals(o1) && l50.equals(o2) && l50.equals(o3)) { - break; - } - - Thread.sleep(100); - } - while (System.currentTimeMillis() < deadline); - - // cancel the job - JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout)); - - final Throwable t = errorRef.get(); - if (t != null) { - throw new RuntimeException("Job failed with an exception", t); - } - - // final check to see if offsets are correctly in Kafka - Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0); - Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1); - Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2); - Assert.assertEquals(Long.valueOf(50L), o1); - Assert.assertEquals(Long.valueOf(50L), o2); - Assert.assertEquals(Long.valueOf(50L), o3); - - kafkaOffsetHandler.close(); - deleteTestTopic(topicName); - } - - /** - * This test first writes a total of 300 records to a test topic, reads the first 150 so that some offsets are - * committed to Kafka, and then startup the consumer again to read the remaining records starting from the committed offsets. - * The test ensures that whatever offsets were committed to Kafka, the consumer correctly picks them up - * and starts at the correct position. 
- */ - public void runStartFromKafkaCommitOffsets() throws Exception { - final int parallelism = 3; - final int recordsInEachPartition = 300; - - final String topicName = writeSequence("testStartFromKafkaCommitOffsetsTopic", recordsInEachPartition, parallelism, 1); - - KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps); - - Long o1; - Long o2; - Long o3; - int attempt = 0; - // make sure that o1, o2, o3 are not all null before proceeding - do { - attempt++; - LOG.info("Attempt " + attempt + " to read records and commit some offsets to Kafka"); - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.getConfig().disableSysoutLogging(); - env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env.setParallelism(parallelism); - env.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets - - env - .addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps)) - .map(new ThrottledMapper<String>(50)) - .map(new MapFunction<String, Object>() { - int count = 0; - @Override - public Object map(String value) throws Exception { - count++; - if (count == 150) { - throw new SuccessException(); - } - return null; - } - }) - .addSink(new DiscardingSink<>()); - - tryExecute(env, "Read some records to commit offsets to Kafka"); - - o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0); - o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1); - o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2); - } while (o1 == null && o2 == null && o3 == null && attempt < 3); - - if (o1 == null && o2 == null && o3 == null) { - throw new RuntimeException("No offsets have been committed after 3 attempts"); - } - - LOG.info("Got final committed offsets from Kafka o1={}, o2={}, o3={}", o1, o2, o3); - - final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 
flinkPort); - env2.getConfig().disableSysoutLogging(); - env2.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env2.setParallelism(parallelism); - - // whatever offsets were committed for each partition, the consumer should pick - // them up and start from the correct position so that the remaining records are all read - HashMap<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset = new HashMap<>(); - partitionsToValuesCountAndStartOffset.put(0, new Tuple2<>( - (o1 != null) ? (int) (recordsInEachPartition - o1) : recordsInEachPartition, - (o1 != null) ? o1.intValue() : 0 - )); - partitionsToValuesCountAndStartOffset.put(1, new Tuple2<>( - (o2 != null) ? (int) (recordsInEachPartition - o2) : recordsInEachPartition, - (o2 != null) ? o2.intValue() : 0 - )); - partitionsToValuesCountAndStartOffset.put(2, new Tuple2<>( - (o3 != null) ? (int) (recordsInEachPartition - o3) : recordsInEachPartition, - (o3 != null) ? o3.intValue() : 0 - )); - - readSequence(env2, standardProps, topicName, partitionsToValuesCountAndStartOffset); - - kafkaOffsetHandler.close(); - deleteTestTopic(topicName); - } - - /** - * This test ensures that when the consumers retrieve some start offset from kafka (earliest, latest), that this offset - * is committed to Kafka, even if some partitions are not read. - * - * Test: - * - Create 3 partitions - * - write 50 messages into each. - * - Start three consumers with auto.offset.reset='latest' and wait until they committed into Kafka. 
- * - Check if the offsets in Kafka are set to 50 for the three partitions - * - * See FLINK-3440 as well - */ - public void runAutoOffsetRetrievalAndCommitToKafka() throws Exception { - // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50) - final int parallelism = 3; - final int recordsInEachPartition = 50; - - final String topicName = writeSequence("testAutoOffsetRetrievalAndCommitToKafkaTopic", recordsInEachPartition, parallelism, 1); - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.getConfig().disableSysoutLogging(); - env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env.setParallelism(parallelism); - env.enableCheckpointing(200); - - Properties readProps = new Properties(); - readProps.putAll(standardProps); - readProps.setProperty("auto.offset.reset", "latest"); // set to reset to latest, so that partitions are initially not read - - DataStream<String> stream = env.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), readProps)); - stream.addSink(new DiscardingSink<String>()); - - final AtomicReference<Throwable> errorRef = new AtomicReference<>(); - final Thread runner = new Thread("runner") { - @Override - public void run() { - try { - env.execute(); - } - catch (Throwable t) { - if (!(t.getCause() instanceof JobCancellationException)) { - errorRef.set(t); - } - } - } - }; - runner.start(); - - KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps); - - final Long l50 = 50L; // the final committed offset in Kafka should be 50 - final long deadline = 30000 + System.currentTimeMillis(); - do { - Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0); - Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1); - Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2); - - if (l50.equals(o1) && l50.equals(o2) && l50.equals(o3)) 
{ - break; - } - - Thread.sleep(100); - } - while (System.currentTimeMillis() < deadline); - - // cancel the job - JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout)); - - final Throwable t = errorRef.get(); - if (t != null) { - throw new RuntimeException("Job failed with an exception", t); - } - - // final check to see if offsets are correctly in Kafka - Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0); - Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1); - Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2); - Assert.assertEquals(Long.valueOf(50L), o1); - Assert.assertEquals(Long.valueOf(50L), o2); - Assert.assertEquals(Long.valueOf(50L), o3); - - kafkaOffsetHandler.close(); - deleteTestTopic(topicName); - } - - /** - * Ensure Kafka is working on both producer and consumer side. - * This executes a job that contains two Flink pipelines. - * - * <pre> - * (generator source) --> (kafka sink)-[KAFKA-TOPIC]-(kafka source) --> (validating sink) - * </pre> - * - * We need to externally retry this test. We cannot let Flink's retry mechanism do it, because the Kafka producer - * does not guarantee exactly-once output. Hence a recovery would introduce duplicates that - * cause the test to fail. - * - * This test also ensures that FLINK-3156 doesn't happen again: - * - * The following situation caused a NPE in the FlinkKafkaConsumer - * - * topic-1 <-- elements are only produced into topic1. - * topic-2 - * - * Therefore, this test is consuming as well from an empty topic. 
- * - */ - @RetryOnException(times=2, exception=kafka.common.NotLeaderForPartitionException.class) - public void runSimpleConcurrentProducerConsumerTopology() throws Exception { - final String topic = "concurrentProducerConsumerTopic_" + UUID.randomUUID().toString(); - final String additionalEmptyTopic = "additionalEmptyTopic_" + UUID.randomUUID().toString(); - - final int parallelism = 3; - final int elementsPerPartition = 100; - final int totalElements = parallelism * elementsPerPartition; - - createTestTopic(topic, parallelism, 2); - createTestTopic(additionalEmptyTopic, parallelism, 1); // create an empty topic which will remain empty all the time - - final StreamExecutionEnvironment env = - StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.setParallelism(parallelism); - env.enableCheckpointing(500); - env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately - env.getConfig().disableSysoutLogging(); - - TypeInformation<Tuple2<Long, String>> longStringType = TypeInfoParser.parse("Tuple2<Long, String>"); - - TypeInformationSerializationSchema<Tuple2<Long, String>> sourceSchema = - new TypeInformationSerializationSchema<>(longStringType, env.getConfig()); - - TypeInformationSerializationSchema<Tuple2<Long, String>> sinkSchema = - new TypeInformationSerializationSchema<>(longStringType, env.getConfig()); - - // ----------- add producer dataflow ---------- - - DataStream<Tuple2<Long, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Long,String>>() { - - private boolean running = true; - - @Override - public void run(SourceContext<Tuple2<Long, String>> ctx) throws InterruptedException { - int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition; - int limit = cnt + elementsPerPartition; - - - while (running && cnt < limit) { - ctx.collect(new Tuple2<>(1000L + cnt, "kafka-" + cnt)); - cnt++; - // we delay data generation a bit so that we are sure that some checkpoints are - // 
triggered (for FLINK-3156) - Thread.sleep(50); - } - } - - @Override - public void cancel() { - running = false; - } - }); - Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings); - producerProperties.setProperty("retries", "3"); - producerProperties.putAll(secureProps); - kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null); - - // ----------- add consumer dataflow ---------- - - List<String> topics = new ArrayList<>(); - topics.add(topic); - topics.add(additionalEmptyTopic); - - Properties props = new Properties(); - props.putAll(standardProps); - props.putAll(secureProps); - FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topics, sourceSchema, props); - - DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(source).setParallelism(parallelism); - - consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() { - - private int elCnt = 0; - private BitSet validator = new BitSet(totalElements); - - @Override - public void invoke(Tuple2<Long, String> value) throws Exception { - String[] sp = value.f1.split("-"); - int v = Integer.parseInt(sp[1]); - - assertEquals(value.f0 - 1000, (long) v); - - assertFalse("Received tuple twice", validator.get(v)); - validator.set(v); - elCnt++; - - if (elCnt == totalElements) { - // check if everything in the bitset is set to true - int nc; - if ((nc = validator.nextClearBit(0)) != totalElements) { - fail("The bitset was not set to 1 on all elements. 
Next clear:" - + nc + " Set: " + validator); - } - throw new SuccessException(); - } - } - - @Override - public void close() throws Exception { - super.close(); - } - }).setParallelism(1); - - try { - tryExecutePropagateExceptions(env, "runSimpleConcurrentProducerConsumerTopology"); - } - catch (ProgramInvocationException | JobExecutionException e) { - // look for NotLeaderForPartitionException - Throwable cause = e.getCause(); - - // search for nested SuccessExceptions - int depth = 0; - while (cause != null && depth++ < 20) { - if (cause instanceof kafka.common.NotLeaderForPartitionException) { - throw (Exception) cause; - } - cause = cause.getCause(); - } - throw e; - } - - deleteTestTopic(topic); - } - - /** - * Tests the proper consumption when having a 1:1 correspondence between kafka partitions and - * Flink sources. - */ - public void runOneToOneExactlyOnceTest() throws Exception { - - final String topic = "oneToOneTopic"; - final int parallelism = 5; - final int numElementsPerPartition = 1000; - final int totalElements = parallelism * numElementsPerPartition; - final int failAfterElements = numElementsPerPartition / 3; - - createTestTopic(topic, parallelism, 1); - - DataGenerators.generateRandomizedIntegerSequence( - StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort), - kafkaServer, - topic, parallelism, numElementsPerPartition, true); - - // run the topology that fails and recovers - - DeserializationSchema<Integer> schema = - new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()); - - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.enableCheckpointing(500); - env.setParallelism(parallelism); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); - env.getConfig().disableSysoutLogging(); - - Properties props = new Properties(); - props.putAll(standardProps); - props.putAll(secureProps); - - 
FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props); - - env - .addSource(kafkaSource) - .map(new PartitionValidatingMapper(parallelism, 1)) - .map(new FailingIdentityMapper<Integer>(failAfterElements)) - .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1); - - FailingIdentityMapper.failedBefore = false; - tryExecute(env, "One-to-one exactly once test"); - - deleteTestTopic(topic); - } - - /** - * Tests the proper consumption when having fewer Flink sources than Kafka partitions, so - * one Flink source will read multiple Kafka partitions. - */ - public void runOneSourceMultiplePartitionsExactlyOnceTest() throws Exception { - final String topic = "oneToManyTopic"; - final int numPartitions = 5; - final int numElementsPerPartition = 1000; - final int totalElements = numPartitions * numElementsPerPartition; - final int failAfterElements = numElementsPerPartition / 3; - - final int parallelism = 2; - - createTestTopic(topic, numPartitions, 1); - - DataGenerators.generateRandomizedIntegerSequence( - StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort), - kafkaServer, - topic, numPartitions, numElementsPerPartition, false); - - // run the topology that fails and recovers - - DeserializationSchema<Integer> schema = - new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()); - - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.enableCheckpointing(500); - env.setParallelism(parallelism); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); - env.getConfig().disableSysoutLogging(); - - Properties props = new Properties(); - props.putAll(standardProps); - props.putAll(secureProps); - FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props); - - env - .addSource(kafkaSource) - .map(new PartitionValidatingMapper(numPartitions, 3)) - .map(new 
FailingIdentityMapper<Integer>(failAfterElements)) - .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1); - - FailingIdentityMapper.failedBefore = false; - tryExecute(env, "One-source-multi-partitions exactly once test"); - - deleteTestTopic(topic); - } - - /** - * Tests the proper consumption when having more Flink sources than Kafka partitions, which means - * that some Flink sources will read no partitions. - */ - public void runMultipleSourcesOnePartitionExactlyOnceTest() throws Exception { - final String topic = "manyToOneTopic"; - final int numPartitions = 5; - final int numElementsPerPartition = 1000; - final int totalElements = numPartitions * numElementsPerPartition; - final int failAfterElements = numElementsPerPartition / 3; - - final int parallelism = 8; - - createTestTopic(topic, numPartitions, 1); - - DataGenerators.generateRandomizedIntegerSequence( - StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort), - kafkaServer, - topic, numPartitions, numElementsPerPartition, true); - - // run the topology that fails and recovers - - DeserializationSchema<Integer> schema = - new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()); - - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.enableCheckpointing(500); - env.setParallelism(parallelism); - // set the number of restarts to one. The failing mapper will fail once, then it's only success exceptions. 
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); - env.getConfig().disableSysoutLogging(); - env.setBufferTimeout(0); - - Properties props = new Properties(); - props.putAll(standardProps); - props.putAll(secureProps); - FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props); - - env - .addSource(kafkaSource) - .map(new PartitionValidatingMapper(numPartitions, 1)) - .map(new FailingIdentityMapper<Integer>(failAfterElements)) - .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1); - - FailingIdentityMapper.failedBefore = false; - tryExecute(env, "multi-source-one-partitions exactly once test"); - - - deleteTestTopic(topic); - } - - - /** - * Tests that the source can be properly canceled when reading full partitions. - */ - public void runCancelingOnFullInputTest() throws Exception { - final String topic = "cancelingOnFullTopic"; - - final int parallelism = 3; - createTestTopic(topic, parallelism, 1); - - // launch a producer thread - DataGenerators.InfiniteStringsGenerator generator = - new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic); - generator.start(); - - // launch a consumer asynchronously - - final AtomicReference<Throwable> jobError = new AtomicReference<>(); - - final Runnable jobRunner = new Runnable() { - @Override - public void run() { - try { - final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.setParallelism(parallelism); - env.enableCheckpointing(100); - env.getConfig().disableSysoutLogging(); - - Properties props = new Properties(); - props.putAll(standardProps); - props.putAll(secureProps); - FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props); - - env.addSource(source).addSink(new DiscardingSink<String>()); - - env.execute("Runner for CancelingOnFullInputTest"); - } - catch (Throwable t) { - jobError.set(t); - } - } - }; - - Thread 
runnerThread = new Thread(jobRunner, "program runner thread"); - runnerThread.start(); - - // wait a bit before canceling - Thread.sleep(2000); - - Throwable failueCause = jobError.get(); - if(failueCause != null) { - failueCause.printStackTrace(); - Assert.fail("Test failed prematurely with: " + failueCause.getMessage()); - } - - // cancel - JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout), "Runner for CancelingOnFullInputTest"); - - // wait for the program to be done and validate that we failed with the right exception - runnerThread.join(); - - failueCause = jobError.get(); - assertNotNull("program did not fail properly due to canceling", failueCause); - assertTrue(failueCause.getMessage().contains("Job was cancelled")); - - if (generator.isAlive()) { - generator.shutdown(); - generator.join(); - } - else { - Throwable t = generator.getError(); - if (t != null) { - t.printStackTrace(); - fail("Generator failed: " + t.getMessage()); - } else { - fail("Generator failed with no exception"); - } - } - - deleteTestTopic(topic); - } - - /** - * Tests that the source can be properly canceled when reading empty partitions. 
- */ - public void runCancelingOnEmptyInputTest() throws Exception { - final String topic = "cancelingOnEmptyInputTopic"; - - final int parallelism = 3; - createTestTopic(topic, parallelism, 1); - // nothing is produced into this topic by this test, so the consumers have no records to read - - final AtomicReference<Throwable> error = new AtomicReference<>(); - - final Runnable jobRunner = new Runnable() { - @Override - public void run() { - try { - final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.setParallelism(parallelism); - env.enableCheckpointing(100); - env.getConfig().disableSysoutLogging(); - - Properties props = new Properties(); - props.putAll(standardProps); - props.putAll(secureProps); - FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props); - - env.addSource(source).addSink(new DiscardingSink<String>()); - - env.execute("CancelingOnEmptyInputTest"); - } - catch (Throwable t) { - // any failure of execute(), including the expected cancellation, is handed to the test thread - LOG.error("Job Runner failed with exception", t); - error.set(t); - } - } - }; - - Thread runnerThread = new Thread(jobRunner, "program runner thread"); - runnerThread.start(); - - // wait a bit before canceling - Thread.sleep(2000); - - // an error recorded this early means the job died before we cancelled it - Throwable failueCause = error.get(); - if (failueCause != null) { - failueCause.printStackTrace(); - Assert.fail("Test failed prematurely with: " + failueCause.getMessage()); - } - // cancel - JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout)); - - // wait for the program to be done and validate that we failed with the right exception - runnerThread.join(); - - failueCause = error.get(); - assertNotNull("program did not fail properly due to canceling", failueCause); - assertTrue(failueCause.getMessage().contains("Job was cancelled")); - - deleteTestTopic(topic); - } - - /** - * Tests that a job whose parallelism exceeds the slots of the mini cluster fails on deployment with a NoResourceAvailableException.
- */ - public void runFailOnDeployTest() throws Exception { - final String topic = "failOnDeployTopic"; - - createTestTopic(topic, 2, 1); - - DeserializationSchema<Integer> schema = - new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()); - - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.setParallelism(12); // needs to be more than the mini cluster has slots - env.getConfig().disableSysoutLogging(); - - Properties props = new Properties(); - props.putAll(standardProps); - props.putAll(secureProps); - FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props); - - env - .addSource(kafkaSource) - .addSink(new DiscardingSink<Integer>()); - - try { - env.execute("test fail on deploy"); - fail("this test should fail with an exception"); - } - catch (ProgramInvocationException e) { - - // validate that we failed due to a NoResourceAvailableException - Throwable cause = e.getCause(); - int depth = 0; - boolean foundResourceException = false; - - // walk the cause chain, inspecting at most 20 levels so the loop is always bounded - while (cause != null && depth++ < 20) { - if (cause instanceof NoResourceAvailableException) { - foundResourceException = true; - break; - } - cause = cause.getCause(); - } - - assertTrue("Wrong exception", foundResourceException); - } - - deleteTestTopic(topic); - } - - /** - * Test producing and consuming into multiple topics - * @throws java.lang.Exception - */ - public void runProduceConsumeMultipleTopics() throws java.lang.Exception { - final int NUM_TOPICS = 5; - final int NUM_ELEMENTS = 20; - - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.getConfig().disableSysoutLogging(); - - // create topics with content - final List<String> topics = new ArrayList<>(); - for (int i = 0; i < NUM_TOPICS; i++) { - final String topic = "topic-" + i; - topics.add(topic); - // create topic (topic i gets i + 1 partitions) - createTestTopic(topic, i + 1 /*partitions*/, 
1); - } - // run first job, producing into all topics - DataStream<Tuple3<Integer, Integer, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple3<Integer, Integer, String>>() { - - @Override - public void run(SourceContext<Tuple3<Integer, Integer, String>> ctx) throws Exception { - int partition = getRuntimeContext().getIndexOfThisSubtask(); - - // every subtask emits NUM_ELEMENTS records per topic; the third field carries the target topic - for (int topicId = 0; topicId < NUM_TOPICS; topicId++) { - for (int i = 0; i < NUM_ELEMENTS; i++) { - ctx.collect(new Tuple3<>(partition, i, "topic-" + topicId)); - } - } - } - - @Override - public void cancel() { - } - }); - - Tuple2WithTopicSchema schema = new Tuple2WithTopicSchema(env.getConfig()); - - Properties props = new Properties(); - props.putAll(standardProps); - props.putAll(secureProps); - // "dummy" is only the default topic, the schema routes each record via getTargetTopic() - kafkaServer.produceIntoKafka(stream, "dummy", schema, props, null); - - env.execute("Write to topics"); - - // run second job consuming from multiple topics - env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.getConfig().disableSysoutLogging(); - - stream = env.addSource(kafkaServer.getConsumer(topics, schema, props)); - - stream.flatMap(new FlatMapFunction<Tuple3<Integer, Integer, String>, Integer>() { - Map<String, Integer> countPerTopic = new HashMap<>(NUM_TOPICS); - @Override - public void flatMap(Tuple3<Integer, Integer, String> value, Collector<Integer> out) throws Exception { - Integer count = countPerTopic.get(value.f2); - if (count == null) { - count = 1; - } else { - count++; - } - countPerTopic.put(value.f2, count); - - // a topic that has not delivered anything yet has no entry in the map - if (countPerTopic.size() < NUM_TOPICS) { - return; - } - - // check map: - // NOTE(review): the exact-count check assumes the writer job ran with parallelism 1 - confirm - for (Map.Entry<String, Integer> el: countPerTopic.entrySet()) { - if (el.getValue() < NUM_ELEMENTS) { - // return instead of break: leaving only the loop would fall through to the success signal below - return; // not enough yet - } - if (el.getValue() > NUM_ELEMENTS) { - throw new RuntimeException("There is a failure in the test. 
I've read " + - el.getValue() + " from topic " + el.getKey()); - } - } - // we've seen messages from all topics - throw new SuccessException(); - } - }).setParallelism(1); - - tryExecute(env, "Count elements from the topics"); - - - // delete all topics again - for (int i = 0; i < NUM_TOPICS; i++) { - final String topic = "topic-" + i; - deleteTestTopic(topic); - } - } - - /** - * Serialization scheme forwarding byte[] records. - */ - private static class ByteArraySerializationSchema implements KeyedSerializationSchema<byte[]> { - - @Override - public byte[] serializeKey(byte[] element) { - return null; - } - - @Override - public byte[] serializeValue(byte[] element) { - return element; - } - - @Override - public String getTargetTopic(byte[] element) { - return null; - } - } - - private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>, - KeyedSerializationSchema<Tuple3<Integer, Integer, String>> { - - private final TypeSerializer<Tuple2<Integer, Integer>> ts; - - public Tuple2WithTopicSchema(ExecutionConfig ec) { - ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec); - } - - @Override - public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { - DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message)); - Tuple2<Integer, Integer> t2 = ts.deserialize(in); - return new Tuple3<>(t2.f0, t2.f1, topic); - } - - @Override - public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) { - return false; - } - - @Override - public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() { - return TypeInfoParser.parse("Tuple3<Integer, Integer, String>"); - } - - @Override - public byte[] serializeKey(Tuple3<Integer, Integer, String> element) { - return null; - } - - @Override - public byte[] serializeValue(Tuple3<Integer, 
Integer, String> element) { - ByteArrayOutputStream by = new ByteArrayOutputStream(); - DataOutputView out = new DataOutputViewStreamWrapper(by); - try { - ts.serialize(new Tuple2<>(element.f0, element.f1), out); - } catch (IOException e) { - throw new RuntimeException("Error" ,e); - } - return by.toByteArray(); - } - - @Override - public String getTargetTopic(Tuple3<Integer, Integer, String> element) { - return element.f2; - } - } - - /** - * Test Flink's Kafka integration also with very big records (30MB) - * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message - * - */ - public void runBigRecordTestTopology() throws Exception { - - final String topic = "bigRecordTestTopic"; - final int parallelism = 1; // otherwise, the kafka mini clusters may run out of heap space - - createTestTopic(topic, parallelism, 1); - - final TypeInformation<Tuple2<Long, byte[]>> longBytesInfo = TypeInfoParser.parse("Tuple2<Long, byte[]>"); - - final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema = - new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig()); - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.setRestartStrategy(RestartStrategies.noRestart()); - env.getConfig().disableSysoutLogging(); - env.enableCheckpointing(100); - env.setParallelism(parallelism); - - // add consuming topology: - Properties consumerProps = new Properties(); - consumerProps.putAll(standardProps); - consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 14)); - consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 14)); // for the new fetcher - consumerProps.setProperty("queued.max.message.chunks", "1"); - consumerProps.putAll(secureProps); - - FlinkKafkaConsumerBase<Tuple2<Long, byte[]>> source = kafkaServer.getConsumer(topic, serSchema, consumerProps); - DataStreamSource<Tuple2<Long, byte[]>> consuming = 
env.addSource(source); - - consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() { - - private int elCnt = 0; - - @Override - public void invoke(Tuple2<Long, byte[]> value) throws Exception { - elCnt++; - if (value.f0 == -1) { - // we should have seen 11 elements now. - if (elCnt == 11) { - throw new SuccessException(); - } else { - throw new RuntimeException("There have been "+elCnt+" elements"); - } - } - if (elCnt > 10) { - throw new RuntimeException("More than 10 elements seen: "+elCnt); - } - } - }); - - // add producing topology - Properties producerProps = new Properties(); - producerProps.setProperty("max.request.size", Integer.toString(1024 * 1024 * 15)); - producerProps.setProperty("retries", "3"); - producerProps.putAll(secureProps); - producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionStrings); - - DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() { - - private boolean running; - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - running = true; - } - - @Override - public void run(SourceContext<Tuple2<Long, byte[]>> ctx) throws Exception { - Random rnd = new Random(); - long cnt = 0; - int sevenMb = 1024 * 1024 * 7; - - while (running) { - byte[] wl = new byte[sevenMb + rnd.nextInt(sevenMb)]; - ctx.collect(new Tuple2<>(cnt++, wl)); - - Thread.sleep(100); - - if (cnt == 10) { - // signal end - ctx.collect(new Tuple2<>(-1L, new byte[]{1})); - break; - } - } - } - - @Override - public void cancel() { - running = false; - } - }); - - kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(serSchema), producerProps, null); - - tryExecute(env, "big topology test"); - deleteTestTopic(topic); - } - - - public void runBrokerFailureTest() throws Exception { - final String topic = "brokerFailureTestTopic"; - - final int parallelism = 2; - final int numElementsPerPartition = 1000; - final int 
totalElements = parallelism * numElementsPerPartition; - final int failAfterElements = numElementsPerPartition / 3; - - - createTestTopic(topic, parallelism, 2); - - DataGenerators.generateRandomizedIntegerSequence( - StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort), - kafkaServer, - topic, parallelism, numElementsPerPartition, true); - - // find leader to shut down - int leaderId = kafkaServer.getLeaderToShutDown(topic); - - LOG.info("Leader to shutdown {}", leaderId); - - - // run the topology (the consumers must handle the failures) - - DeserializationSchema<Integer> schema = - new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()); - - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.setParallelism(parallelism); - env.enableCheckpointing(500); - env.setRestartStrategy(RestartStrategies.noRestart()); - env.getConfig().disableSysoutLogging(); - - Properties props = new Properties(); - props.putAll(standardProps); - props.putAll(secureProps); - FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props); - - env - .addSource(kafkaSource) - .map(new PartitionValidatingMapper(parallelism, 1)) - .map(new BrokerKillingMapper<Integer>(leaderId, failAfterElements)) - .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1); - - BrokerKillingMapper.killedLeaderBefore = false; - tryExecute(env, "Broker failure once test"); - - // start a new broker: - kafkaServer.restartBroker(leaderId); - } - - public void runKeyValueTest() throws Exception { - final String topic = "keyvaluetest"; - createTestTopic(topic, 1, 1); - final int ELEMENT_COUNT = 5000; - - // ----------- Write some data into Kafka ------------------- - - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.setParallelism(1); - 
env.setRestartStrategy(RestartStrategies.noRestart()); - env.getConfig().disableSysoutLogging(); - - DataStream<Tuple2<Long, PojoValue>> kvStream = env.addSource(new SourceFunction<Tuple2<Long, PojoValue>>() { - @Override - public void run(SourceContext<Tuple2<Long, PojoValue>> ctx) throws Exception { - // fixed seed, so every run writes the same values - Random rnd = new Random(1337); - for (long i = 0; i < ELEMENT_COUNT; i++) { - PojoValue pojo = new PojoValue(); - pojo.when = new Date(rnd.nextLong()); - pojo.lon = rnd.nextLong(); - // lat carries the sequence number that the reading job checks for ordering - pojo.lat = i; - // make every second key null to ensure proper "null" serialization - Long key = (i % 2 == 0) ? null : i; - ctx.collect(new Tuple2<>(key, pojo)); - } - } - @Override - public void cancel() { - } - }); - - KeyedSerializationSchema<Tuple2<Long, PojoValue>> schema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig()); - Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings); - producerProperties.setProperty("retries", "3"); - // the producer needs the security settings as well, not only the consumer below - producerProperties.putAll(secureProps); - kafkaServer.produceIntoKafka(kvStream, topic, schema, producerProperties, null); - env.execute("Write KV to Kafka"); - - // ----------- Read the data again ------------------- - - env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.setParallelism(1); - env.setRestartStrategy(RestartStrategies.noRestart()); - env.getConfig().disableSysoutLogging(); - - - KeyedDeserializationSchema<Tuple2<Long, PojoValue>> readSchema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig()); - - Properties props = new Properties(); - props.putAll(standardProps); - props.putAll(secureProps); - DataStream<Tuple2<Long, PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, readSchema, props)); - fromKafka.flatMap(new RichFlatMapFunction<Tuple2<Long,PojoValue>, Object>() { - long counter = 0; - @Override - public void flatMap(Tuple2<Long, PojoValue> value, Collector<Object> out) throws 
Exception { - // the elements should be in order. - Assert.assertTrue("Wrong value " + value.f1.lat, value.f1.lat == counter ); - if (value.f1.lat % 2 == 0) { - assertNull("key was not null", value.f0); - } else { - Assert.assertTrue("Wrong value " + value.f0, value.f0 == counter); - } - counter++; - if (counter == ELEMENT_COUNT) { - // we got the right number of elements - throw new SuccessException(); - } - } - }); - - tryExecute(env, "Read KV from Kafka"); - - deleteTestTopic(topic); - } - - public static class PojoValue { - public Date when; - public long lon; - public long lat; - public PojoValue() {} - } - - - /** - * Test delete behavior and metrics for producer - * @throws Exception - */ - public void runAllDeletesTest() throws Exception { - final String topic = "alldeletestest"; - createTestTopic(topic, 1, 1); - final int ELEMENT_COUNT = 300; - - // ----------- Write some data into Kafka ------------------- - - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.setParallelism(1); - env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env.getConfig().disableSysoutLogging(); - - DataStream<Tuple2<byte[], PojoValue>> kvStream = env.addSource(new SourceFunction<Tuple2<byte[], PojoValue>>() { - @Override - public void run(SourceContext<Tuple2<byte[], PojoValue>> ctx) throws Exception { - Random rnd = new Random(1337); - for (long i = 0; i < ELEMENT_COUNT; i++) { - final byte[] key = new byte[200]; - rnd.nextBytes(key); - ctx.collect(new Tuple2<>(key, (PojoValue) null)); - } - } - @Override - public void cancel() { - } - }); - - TypeInformationKeyValueSerializationSchema<byte[], PojoValue> schema = new TypeInformationKeyValueSerializationSchema<>(byte[].class, PojoValue.class, env.getConfig()); - - Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings); - producerProperties.setProperty("retries", "3"); - 
producerProperties.putAll(secureProps); - kafkaServer.produceIntoKafka(kvStream, topic, schema, producerProperties, null); - - env.execute("Write deletes to Kafka"); - - // ----------- Read the data again ------------------- - - env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.setParallelism(1); - env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env.getConfig().disableSysoutLogging(); - - Properties props = new Properties(); - props.putAll(standardProps); - props.putAll(secureProps); - DataStream<Tuple2<byte[], PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, schema, props)); - - fromKafka.flatMap(new RichFlatMapFunction<Tuple2<byte[], PojoValue>, Object>() { - long counter = 0; - @Override - public void flatMap(Tuple2<byte[], PojoValue> value, Collector<Object> out) throws Exception { - // ensure that deleted messages are passed as nulls - assertNull(value.f1); - counter++; - if (counter == ELEMENT_COUNT) { - // we got the right number of elements - throw new SuccessException(); - } - } - }); - - tryExecute(env, "Read deletes from Kafka"); - - deleteTestTopic(topic); - } - - /** - * Test that ensures that DeserializationSchema.isEndOfStream() is properly evaluated. 
- * - * @throws Exception - */ - public void runEndOfStreamTest() throws Exception { - - final int ELEMENT_COUNT = 300; - final String topic = writeSequence("testEndOfStream", ELEMENT_COUNT, 1, 1); - - // read using custom schema - final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env1.setParallelism(1); - env1.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env1.getConfig().disableSysoutLogging(); - - Properties props = new Properties(); - props.putAll(standardProps); - props.putAll(secureProps); - - DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, new FixedNumberDeserializationSchema(ELEMENT_COUNT), props)); - fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer,Integer>, Void>() { - @Override - public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception { - // noop ;) - } - }); - - JobExecutionResult result = tryExecute(env1, "Consume " + ELEMENT_COUNT + " elements from Kafka"); - - deleteTestTopic(topic); - } - - /** - * Test metrics reporting for consumer - * - * @throws Exception - */ - public void runMetricsTest() throws Throwable { - - // create a stream with 5 topics - final String topic = "metricsStream"; - createTestTopic(topic, 5, 1); - - final Tuple1<Throwable> error = new Tuple1<>(null); - Runnable job = new Runnable() { - @Override - public void run() { - try { - // start job writing & reading data. 
- final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env1.setParallelism(1); - env1.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env1.getConfig().disableSysoutLogging(); - env1.disableOperatorChaining(); // let the source read everything into the network buffers - - Properties props = new Properties(); - props.putAll(standardProps); - props.putAll(secureProps); - - TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig()); - DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps)); - fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() { - @Override - public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {// no op - } - }); - - DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() { - boolean running = true; - - @Override - public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { - int i = 0; - while (running) { - ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask())); - Thread.sleep(1); - } - } - - @Override - public void cancel() { - running = false; - } - }); - - kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null); - - env1.execute("Metrics test job"); - } catch(Throwable t) { - LOG.warn("Got exception during execution", t); - if(!(t.getCause() instanceof JobCancellationException)) { // we'll cancel the job - error.f0 = t; - } - } - } - }; - Thread jobThread = new Thread(job); - jobThread.start(); - - try { - // connect to JMX - MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - // wait until we've found all 5 offset metrics - 
Set<ObjectName> offsetMetrics = mBeanServer.queryNames(new ObjectName("*current-offsets*:*"), null); - while (offsetMetrics.size() < 5) { // test will time out if metrics are not properly working - if (error.f0 != null) { - // fail test early - throw error.f0; - } - offsetMetrics = mBeanServer.queryNames(new ObjectName("*current-offsets*:*"), null); - Thread.sleep(50); - } - Assert.assertEquals(5, offsetMetrics.size()); - // we can't rely on the consumer to have touched all the partitions already - // that's why we'll wait until all five partitions have a positive offset. - // The test will fail if we never meet the condition - while (true) { - int numPosOffsets = 0; - // check that offsets are correctly reported - for (ObjectName object : offsetMetrics) { - Object offset = mBeanServer.getAttribute(object, "Value"); - if((long) offset >= 0) { - numPosOffsets++; - } - } - if (numPosOffsets == 5) { - break; - } - // wait for the consumer to consume on all partitions - Thread.sleep(50); - } - - // check if producer metrics are also available. - Set<ObjectName> producerMetrics = mBeanServer.queryNames(new ObjectName("*KafkaProducer*:*"), null); - Assert.assertTrue("No producer metrics found", producerMetrics.size() > 30); - - - LOG.info("Found all JMX metrics. 
Cancelling job."); - } finally { - // cancel - JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout)); - } - - while (jobThread.isAlive()) { - Thread.sleep(50); - } - if (error.f0 != null) { - throw error.f0; - } - - deleteTestTopic(topic); - } - - - public static class FixedNumberDeserializationSchema implements DeserializationSchema<Tuple2<Integer, Integer>> { - - final int finalCount; - int count = 0; - - TypeInformation<Tuple2<Integer, Integer>> ti = TypeInfoParser.parse("Tuple2<Integer, Integer>"); - TypeSerializer<Tuple2<Integer, Integer>> ser = ti.createSerializer(new ExecutionConfig()); - - public FixedNumberDeserializationSchema(int finalCount) { - this.finalCount = finalCount; - } - - @Override - public Tuple2<Integer, Integer> deserialize(byte[] message) throws IOException { - DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message)); - return ser.deserialize(in); - } - - @Override - public boolean isEndOfStream(Tuple2<Integer, Integer> nextElement) { - return ++count >= finalCount; - } - - @Override - public TypeInformation<Tuple2<Integer, Integer>> getProducedType() { - return ti; - } - } - - - // ------------------------------------------------------------------------ - // Reading writing test data sets - // ------------------------------------------------------------------------ - - /** - * Runs a job using the provided environment to read a sequence of records from a single Kafka topic. - * The method allows to individually specify the expected starting offset and total read value count of each partition. - * The job will be considered successful only if all partition read results match the start offset and value count criteria. 
- */ - protected void readSequence(StreamExecutionEnvironment env, Properties cc, - final String topicName, - final Map<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset) throws Exception { - final int sourceParallelism = partitionsToValuesCountAndStartOffset.keySet().size(); - - int finalCountTmp = 0; - for (Map.Entry<Integer, Tuple2<Integer, Integer>> valuesCountAndStartOffset : partitionsToValuesCountAndStartOffset.entrySet()) { - finalCountTmp += valuesCountAndStartOffset.getValue().f0; - } - final int finalCount = finalCountTmp; - - final TypeInformation<Tuple2<Integer, Integer>> intIntTupleType = TypeInfoParser.parse("Tuple2<Integer, Integer>"); - - final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser = - new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig()); - - // create the consumer - cc.putAll(secureProps); - FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deser, cc); - - DataStream<Tuple2<Integer, Integer>> source = env - .addSource(consumer).setParallelism(sourceParallelism) - .map(new ThrottledMapper<Tuple2<Integer, Integer>>(20)).setParallelism(sourceParallelism); - - // verify data - source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() { - - private HashMap<Integer, BitSet> partitionsToValueCheck; - private int count = 0; - - @Override - public void open(Configuration parameters) throws Exception { - partitionsToValueCheck = new HashMap<>(); - for (Integer partition : partitionsToValuesCountAndStartOffset.keySet()) { - partitionsToValueCheck.put(partition, new BitSet()); - } - } - - @Override - public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception { - int partition = value.f0; - int val = value.f1; - - BitSet bitSet = partitionsToValueCheck.get(partition); - if (bitSet == null) { - throw new RuntimeException("Got a record from an unknown partition"); - } else { - bitSet.set(val - 
partitionsToValuesCountAndStartOffset.get(partition).f1); - } - - count++; - - LOG.info("Received message {}, total {} messages", value, count); - - // verify if we've seen everything - if (count == finalCount) { - for (Map.Entry<Integer, BitSet> partitionsToValueCheck : this.partitionsToValueCheck.entrySet()) { - BitSet check = partitionsToValueCheck.getValue(); - int expectedValueCount = partitionsToValuesCountAndStartOffset.get(partitionsToValueCheck.getKey()).f0; - - if (check.cardinality() != expectedValueCount) { - throw new RuntimeException("Expected cardinality to be " + expectedValueCount + - ", but was " + check.cardinality()); - } else if (check.nextClearBit(0) != expectedValueCount) { - throw new RuntimeException("Expected next clear bit to be " + expectedValueCount + - ", but was " + check.cardinality()); - } - } - - // test has passed - throw new SuccessException(); - } - } - - }).setParallelism(1); - - tryExecute(env, "Read data from Kafka"); - - LOG.info("Successfully read sequence for verification"); - } - - /** - * Variant of {@link KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, Properties, String, Map)} to - * expect reading from the same start offset and the same value count for all partitions of a single Kafka topic. 
- */ - protected void readSequence(StreamExecutionEnvironment env, Properties cc, - final int sourceParallelism, - final String topicName, - final int valuesCount, final int startFrom) throws Exception { - // partitions 0 .. sourceParallelism-1 all get the same expected value count and start offset - HashMap<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset = new HashMap<>(); - for (int i = 0; i < sourceParallelism; i++) { - partitionsToValuesCountAndStartOffset.put(i, new Tuple2<>(valuesCount, startFrom)); - } - readSequence(env, cc, topicName, partitionsToValuesCountAndStartOffset); - } - - /** - * Writes a sequence of {@code numElements} records per parallel writer subtask into a newly created topic and - * validates the result by reading it back. Each attempt uses its own topic, named {@code baseTopicName-attempt}; - * a failed attempt deletes its topic and the next attempt starts over. - * - * @return the name of the topic holding the validated sequence - * @throws Exception if no valid sequence could be written within the maximum number of attempts - */ - protected String writeSequence( - String baseTopicName, - final int numElements, - final int parallelism, - final int replicationFactor) throws Exception - { - LOG.info("\n===================================\n" + - "== Writing sequence of " + numElements + " into " + baseTopicName + " with p=" + parallelism + "\n" + - "==================================="); - - final TypeInformation<Tuple2<Integer, Integer>> resultType = - TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {}); - - final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema = - new KeyedSerializationSchemaWrapper<>( - new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig())); - - final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema = - new KeyedDeserializationSchemaWrapper<>( - new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig())); - - final int maxNumAttempts = 10; - - for (int attempt = 1; attempt <= maxNumAttempts; attempt++) { - - final String topicName = baseTopicName + '-' + attempt; - - // log the real attempt number, not a fixed "#1" - LOG.info("Writing attempt #{}", attempt); - - // -------- Write the Sequence -------- - - createTestTopic(topicName, parallelism, replicationFactor); - - StreamExecutionEnvironment writeEnv = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - writeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - writeEnv.getConfig().disableSysoutLogging(); - - DataStream<Tuple2<Integer, Integer>> 
stream = writeEnv.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() { - - private boolean running = true; - - @Override - public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { - int cnt = 0; - int partition = getRuntimeContext().getIndexOfThisSubtask(); - - while (running && cnt < numElements) { - ctx.collect(new Tuple2<>(partition, cnt)); - cnt++; - } - } - - @Override - public void cancel() { - running = false; - } - }).setParallelism(parallelism); - - // the producer must not produce duplicates - Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings); - producerProperties.setProperty("retries", "0"); - producerProperties.putAll(secureProps); - - kafkaServer.produceIntoKafka(stream, topicName, serSchema, producerProperties, new Tuple2Partitioner(parallelism)) - .setParallelism(parallelism); - - try { - writeEnv.execute("Write sequence"); - } - catch (Exception e) { - LOG.error("Write attempt failed, trying again", e); - deleteTestTopic(topicName); - JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout)); - continue; - } - - LOG.info("Finished writing sequence"); - - // -------- Validate the Sequence -------- - - // we need to validate the sequence, because kafka's producers are not exactly once - LOG.info("Validating sequence"); - - JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout)); - - final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - readEnv.getConfig().disableSysoutLogging(); - readEnv.setParallelism(parallelism); - - Properties readProps = (Properties) standardProps.clone(); - readProps.setProperty("group.id", "flink-tests-validator"); - readProps.putAll(secureProps); - FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = 
kafkaServer.getConsumer(topicName, deserSchema, readProps); - - readEnv - .addSource(consumer) - .map(new RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() { - - private final int totalCount = parallelism * numElements; - private int count = 0; - - @Override - public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception { - if (++count == totalCount) { - throw new SuccessException(); - } else { - return value; - } - } - }).setParallelism(1) - .addSink(new DiscardingSink<Tuple2<Integer, Integer>>()).setParallelism(1); - - final AtomicReference<Throwable> errorRef = new AtomicReference<>(); - - Thread runner = new Thread() { - @Override - public void run() { - try { - tryExecute(readEnv, "sequence validation"); - } catch (Throwable t) { - errorRef.set(t); - } - } - }; - runner.start(); - - final long deadline = System.currentTimeMillis() + 10000; - long delay; - while (runner.isAlive() && (delay = deadline - System.currentTimeMillis()) > 0) { - runner.join(delay); - } - - boolean success; - - if (runner.isAlive()) { - // did not finish in time, maybe the producer dropped one or more records and - // the validation did not reach the exit point - success = false; - JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout)); - } - else { - Throwable error = errorRef.get(); - if (error != null) { - success = false; - LOG.info("Attempt " + attempt + " failed with exception", error); - } - else { - success = true; - } - } - - JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout)); - - if (success) { - // everything is good! 
- return topicName; - } - else { - deleteTestTopic(topicName); - // fall through the loop - } - } - - throw new Exception("Could not write a valid sequence to Kafka after " + maxNumAttempts + " attempts"); - } - - // ------------------------------------------------------------------------ - // Debugging utilities - // ------------------------------------------------------------------------ - - /** - * Read topic to list, only using Kafka code. - */ - private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) { - ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config); - // we request only one stream per consumer instance. Kafka will make sure that each consumer group - // will see each message only once. - Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1); - Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap); - if (streams.size() != 1) { - throw new RuntimeException("Expected only one message stream but got "+streams.size()); - } - List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName); - if (kafkaStreams == null) { - throw new RuntimeException("Requested stream not available. 
Available streams: "+streams.toString()); - } - if (kafkaStreams.size() != 1) { - throw new RuntimeException("Requested 1 stream from Kafka, bot got "+kafkaStreams.size()+" streams"); - } - LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId()); - ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator(); - - List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<>(); - int read = 0; - while(iteratorToRead.hasNext()) { - read++; - result.add(iteratorToRead.next()); - if (read == stopAfter) { - LOG.info("Read "+read+" elements"); - return result; - } - } - return result; - } - - private static void printTopic(String topicName, ConsumerConfig config, - DeserializationSchema<?> deserializationSchema, - int stopAfter) throws IOException { - - List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter); - LOG.info("Printing contents of topic {} in consumer grouo {}", topicName, config.groupId()); - - for (MessageAndMetadata<byte[], byte[]> message: contents) { - Object out = deserializationSchema.deserialize(message.message()); - LOG.info("Message: partition: {} offset: {} msg: {}", message.partition(), message.offset(), out.toString()); - } - } - - private static void printTopic(String topicName, int elements,DeserializationSchema<?> deserializer) - throws IOException - { - // write the sequence to log for debugging purposes - Properties newProps = new Properties(standardProps); - newProps.setProperty("group.id", "topic-printer"+ UUID.randomUUID().toString()); - newProps.setProperty("auto.offset.reset", "smallest"); - newProps.setProperty("zookeeper.connect", standardProps.getProperty("zookeeper.connect")); - newProps.putAll(secureProps); - - ConsumerConfig printerConfig = new ConsumerConfig(newProps); - printTopic(topicName, printerConfig, deserializer, elements); - } - - - public static class BrokerKillingMapper<T> extends RichMapFunction<T,T> - implements 
Checkpointed<Integer>, CheckpointListener { - - private static final long serialVersionUID = 6334389850158707313L; - - public static volatile boolean killedLeaderBefore; - public static volatile boolean hasBeenCheckpointedBeforeFailure; - - private final int shutdownBrokerId; - private final int failCount; - private int numElementsTotal; - - private boolean failer; - private boolean hasBeenCheckpointed; - - - public BrokerKillingMapper(int shutdownBrokerId, int failCount) { - this.shutdownBrokerId = shutdownBrokerId; - this.failCount = failCount; - } - - @Override - public void open(Configuration parameters) { - failer = getRuntimeContext().getIndexOfThisSubtask() == 0; - } - - @Override - public T map(T value) throws Exception { - numElementsTotal++; - - if (!killedLeaderBefore) { - Thread.sleep(10); - - if (failer && numElementsTotal >= failCount) { - // shut down a Kafka broker - KafkaServer toShutDown = null; - for (KafkaServer server : kafkaServer.getBrokers()) { - - if (kafkaServer.getBrokerId(server) == shutdownBrokerId) { - toShutDown = server; - break; - } - } - - if (toShutDown == null) { - StringBuilder listOfBrokers = new StringBuilder(); - for (KafkaServer server : kafkaServer.getBrokers()) { - listOfBrokers.append(kafkaServer.getBrokerId(server)); - listOfBrokers.append(" ; "); - } - - throw new Exception("Cannot find broker to shut down: " + shutdownBrokerId - + " ; available brokers: " + listOfBrokers.toString()); - } - else { - hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed; - killedLeaderBefore = true; - toShutDown.shutdown(); - } - } - } - return value; - } - - @Override - public void notifyCheckpointComplete(long checkpointId) { - hasBeenCheckpointed = true; - } - - @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) { - return numElementsTotal; - } - - @Override - public void restoreState(Integer state) { - this.numElementsTotal = state; - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java deleted file mode 100644 index c925c8f..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.apache.flink.test.util.SuccessException;


import java.io.Serializable;
import java.util.Properties;

import static org.apache.flink.test.util.TestUtils.tryExecute;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

/**
 * Base class for Kafka producer tests; provides the custom-partitioning test topology.
 */
@SuppressWarnings("serial")
public abstract class KafkaProducerTestBase extends KafkaTestBase {


    /**
     * Runs a job that writes into a 3-partition topic through a custom partitioner and reads
     * the topic back in the same job.
     *
     * <pre>
     *             +------> (sink) --+--> [KAFKA-1] --> (source) -> (map) --+
     *            /                  |                                       \
     *           /                   |                                        \
     * (source) ----------> (sink) --+--> [KAFKA-2] --> (source) -> (map) -----+-> (sink)
     *           \                   |                                        /
     *            \                  |                                       /
     *             +------> (sink) --+--> [KAFKA-3] --> (source) -> (map) --+
     * </pre>
     *
     * The mapper validates that the values come consistently from the correct Kafka partition.
     *
     * The final sink counts the values per partition index and ends the job with a
     * {@link SuccessException} once every partition index has been seen at least 100 times.
     */
    public void runCustomPartitioningTest() {
        try {
            LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");

            final String topic = "customPartitioningTestTopic";
            final int parallelism = 3;

            createTestTopic(topic, parallelism, 1);

            TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");

            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
            env.setRestartStrategy(RestartStrategies.noRestart());
            env.getConfig().disableSysoutLogging();

            TypeInformationSerializationSchema<Tuple2<Long, String>> serSchema =
                    new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());

            TypeInformationSerializationSchema<Tuple2<Long, String>> deserSchema =
                    new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());

            // ------ producing topology ---------

            // source has DOP 1 to make sure it generates no duplicates
            DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {

                // written by cancel(), read by run(): must be volatile to be visible across threads
                private volatile boolean running = true;

                @Override
                public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
                    long cnt = 0;
                    while (running) {
                        ctx.collect(new Tuple2<>(cnt, "kafka-" + cnt));
                        cnt++;
                    }
                }

                @Override
                public void cancel() {
                    running = false;
                }
            })
            .setParallelism(1);

            Properties props = new Properties();
            props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings));
            props.putAll(secureProps);

            // sink partitions into
            kafkaServer.produceIntoKafka(stream, topic,
                    new KeyedSerializationSchemaWrapper<>(serSchema),
                    props,
                    new CustomPartitioner(parallelism)).setParallelism(parallelism);

            // ------ consuming topology ---------

            Properties consumerProps = new Properties();
            consumerProps.putAll(standardProps);
            consumerProps.putAll(secureProps);
            FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topic, deserSchema, consumerProps);

            env.addSource(source).setParallelism(parallelism)

                    // mapper that validates partitioning and maps to partition
                    .map(new RichMapFunction<Tuple2<Long, String>, Integer>() {

                        private int ourPartition = -1;

                        @Override
                        public Integer map(Tuple2<Long, String> value) {
                            // same arithmetic as CustomPartitioner: take the remainder on the long
                            // value first, so counters beyond Integer.MAX_VALUE still agree
                            int partition = (int) (value.f0 % parallelism);
                            if (ourPartition != -1) {
                                assertEquals("inconsistent partitioning", ourPartition, partition);
                            } else {
                                ourPartition = partition;
                            }
                            return partition;
                        }
                    }).setParallelism(parallelism)

                    .addSink(new SinkFunction<Integer>() {

                        private int[] valuesPerPartition = new int[parallelism];

                        @Override
                        public void invoke(Integer value) throws Exception {
                            valuesPerPartition[value]++;

                            // succeed as soon as every partition index has at least 100 values
                            boolean missing = false;
                            for (int i : valuesPerPartition) {
                                if (i < 100) {
                                    missing = true;
                                    break;
                                }
                            }
                            if (!missing) {
                                throw new SuccessException();
                            }
                        }
                    }).setParallelism(1);

            tryExecute(env, "custom partitioning test");

            deleteTestTopic(topic);

            LOG.info("Finished KafkaProducerITCase.testCustomPartitioning()");
        }
        catch (Exception e) {
            e.printStackTrace();
            fail(e.getMessage());
        }
    }

    // ------------------------------------------------------------------------

    /**
     * Partitioner that assigns a record to partition {@code f0 % numPartitions} and asserts
     * that the number of partitions reported at runtime matches the expected one.
     */
    public static class CustomPartitioner extends KafkaPartitioner<Tuple2<Long, String>> implements Serializable {

        private final int expectedPartitions;

        /**
         * @param expectedPartitions number of partitions the target topic is expected to have
         */
        public CustomPartitioner(int expectedPartitions) {
            this.expectedPartitions = expectedPartitions;
        }


        @Override
        public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
            assertEquals(expectedPartitions, numPartitions);

            return (int) (next.f0 % numPartitions);
        }
    }
}
