http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
new file mode 100644
index 0000000..6d259fa
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * IT cases for the Kafka 0.11 connector.
+ *
+ * <p>Runs the consumer test suite inherited from {@link KafkaConsumerTestBase} with the producer of
+ * the test environment switched to {@link FlinkKafkaProducer011.Semantic#AT_LEAST_ONCE}, plus a
+ * Kafka 0.11 specific timestamp round-trip test.
+ */
+public class Kafka011ITCase extends KafkaConsumerTestBase {
+
+	/**
+	 * Runs {@code KafkaProducerTestBase.prepare()} and then switches the producer of the test
+	 * environment to {@link FlinkKafkaProducer011.Semantic#AT_LEAST_ONCE}.
+	 */
+	@BeforeClass
+	public static void prepare() throws ClassNotFoundException {
+		KafkaProducerTestBase.prepare();
+		((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE);
+	}
+
+	// ------------------------------------------------------------------------
+	// Suite of Tests
+	// ------------------------------------------------------------------------
+
+	@Test(timeout = 60000)
+	public void testFailOnNoBroker() throws Exception {
+		runFailOnNoBrokerTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testConcurrentProducerConsumerTopology() throws Exception {
+		runSimpleConcurrentProducerConsumerTopology();
+	}
+
+	@Test(timeout = 60000)
+	public void testKeyValueSupport() throws Exception {
+		runKeyValueTest();
+	}
+
+	// --- canceling / failures ---
+
+	@Test(timeout = 60000)
+	public void testCancelingEmptyTopic() throws Exception {
+		runCancelingOnEmptyInputTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testCancelingFullTopic() throws Exception {
+		runCancelingOnFullInputTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testFailOnDeploy() throws Exception {
+		runFailOnDeployTest();
+	}
+
+	// --- source to partition mappings and exactly once ---
+
+	@Test(timeout = 60000)
+	public void testOneToOneSources() throws Exception {
+		runOneToOneExactlyOnceTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testOneSourceMultiplePartitions() throws Exception {
+		runOneSourceMultiplePartitionsExactlyOnceTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testMultipleSourcesOnePartition() throws Exception {
+		runMultipleSourcesOnePartitionExactlyOnceTest();
+	}
+
+	// --- broker failure ---
+
+	@Test(timeout = 60000)
+	public void testBrokerFailure() throws Exception {
+		runBrokerFailureTest();
+	}
+
+	// --- special executions ---
+
+	@Test(timeout = 60000)
+	public void testBigRecordJob() throws Exception {
+		runBigRecordTestTopology();
+	}
+
+	@Test(timeout = 60000)
+	public void testMultipleTopics() throws Exception {
+		runProduceConsumeMultipleTopics();
+	}
+
+	@Test(timeout = 60000)
+	public void testAllDeletes() throws Exception {
+		runAllDeletesTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testMetricsAndEndOfStream() throws Exception {
+		runEndOfStreamTest();
+	}
+
+	// --- startup mode ---
+
+	@Test(timeout = 60000)
+	public void testStartFromEarliestOffsets() throws Exception {
+		runStartFromEarliestOffsets();
+	}
+
+	@Test(timeout = 60000)
+	public void testStartFromLatestOffsets() throws Exception {
+		runStartFromLatestOffsets();
+	}
+
+	@Test(timeout = 60000)
+	public void testStartFromGroupOffsets() throws Exception {
+		runStartFromGroupOffsets();
+	}
+
+	@Test(timeout = 60000)
+	public void testStartFromSpecificOffsets() throws Exception {
+		runStartFromSpecificOffsets();
+	}
+
+	// --- offset committing ---
+
+	@Test(timeout = 60000)
+	public void testCommitOffsetsToKafka() throws Exception {
+		runCommitOffsetsToKafka();
+	}
+
+	@Test(timeout = 60000)
+	public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+		runAutoOffsetRetrievalAndCommitToKafka();
+	}
+
+	/**
+	 * Kafka 0.11 specific test, ensuring Timestamps are properly written to and read from Kafka.
+	 *
+	 * <p>The first job writes the values {@code 0..1110} with event timestamp {@code 2 * value} into a
+	 * topic with 3 partitions. The second job reads them back and {@link TimestampValidatingOperator}
+	 * checks that each record carries the timestamp it was written with.
+	 */
+	@Test(timeout = 60000)
+	public void testTimestamps() throws Exception {
+
+		final String topic = "tstopic";
+		createTestTopic(topic, 3, 1);
+
+		try {
+			// ---------- Produce an event time stream into Kafka -------------------
+
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(1);
+			env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+			env.getConfig().disableSysoutLogging();
+			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+			DataStream<Long> streamWithTimestamps = env.addSource(new SourceFunction<Long>() {
+				private static final long serialVersionUID = -2255115836471289626L;
+
+				// cancel() may be called from a different thread than run(), so the flag must be volatile
+				volatile boolean running = true;
+
+				@Override
+				public void run(SourceContext<Long> ctx) throws Exception {
+					long i = 0;
+					// emits the values 0..1110 (1111 records), each with timestamp 2 * value
+					while (running) {
+						ctx.collectWithTimestamp(i, i * 2);
+						if (i++ == 1110L) {
+							running = false;
+						}
+					}
+				}
+
+				@Override
+				public void cancel() {
+					running = false;
+				}
+			});
+
+			final TypeInformationSerializationSchema<Long> longSer = new TypeInformationSerializationSchema<>(TypeInfoParser.<Long>parse("Long"), env.getConfig());
+			FlinkKafkaProducer011<Long> prod = new FlinkKafkaProducer011<>(topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, Optional.of(new FlinkKafkaPartitioner<Long>() {
+				private static final long serialVersionUID = -6730989584364230617L;
+
+				@Override
+				public int partition(Long next, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+					return (int) (next % 3);
+				}
+			}));
+			prod.setWriteTimestampToKafka(true);
+
+			streamWithTimestamps.addSink(prod).setParallelism(3);
+
+			env.execute("Produce some");
+
+			// ---------- Consume stream from Kafka -------------------
+
+			env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(1);
+			env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+			env.getConfig().disableSysoutLogging();
+			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+			FlinkKafkaConsumer011<Long> kafkaSource = new FlinkKafkaConsumer011<>(topic, new LimitedLongDeserializer(), standardProps);
+			kafkaSource.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Long>() {
+				private static final long serialVersionUID = -4834111173247835189L;
+
+				@Nullable
+				@Override
+				public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
+					if (lastElement % 11 == 0) {
+						return new Watermark(lastElement);
+					}
+					return null;
+				}
+
+				@Override
+				public long extractTimestamp(Long element, long previousElementTimestamp) {
+					// keep the element's existing timestamp, i.e. the one read back from Kafka
+					return previousElementTimestamp;
+				}
+			});
+
+			DataStream<Long> stream = env.addSource(kafkaSource);
+			GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
+			stream.transform("timestamp validating operator", objectTypeInfo, new TimestampValidatingOperator()).setParallelism(1);
+
+			env.execute("Consume again");
+		} finally {
+			// remove the topic even if one of the jobs failed
+			deleteTestTopic(topic);
+		}
+	}
+
+	/**
+	 * Sink operator that validates the records and watermarks of the consuming job.
+	 *
+	 * <p>Each element must carry timestamp {@code 2 * value}; watermarks must be non-decreasing and
+	 * either a multiple of 11 or {@link Long#MAX_VALUE}. On close, exactly 1110 elements and more
+	 * than two watermarks must have been seen.
+	 */
+	private static class TimestampValidatingOperator extends StreamSink<Long> {
+
+		private static final long serialVersionUID = 1353168781235526806L;
+
+		public TimestampValidatingOperator() {
+			// the wrapped function fails if invoked; processElement() is overridden below instead
+			super(new SinkFunction<Long>() {
+				private static final long serialVersionUID = -6676565693361786524L;
+
+				@Override
+				public void invoke(Long value) throws Exception {
+					throw new RuntimeException("Unexpected");
+				}
+			});
+		}
+
+		long elCount = 0;
+		long wmCount = 0;
+		long lastWM = Long.MIN_VALUE;
+
+		@Override
+		public void processElement(StreamRecord<Long> element) throws Exception {
+			elCount++;
+			if (element.getValue() * 2 != element.getTimestamp()) {
+				throw new RuntimeException("Invalid timestamp: " + element);
+			}
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {
+			wmCount++;
+
+			if (lastWM <= mark.getTimestamp()) {
+				lastWM = mark.getTimestamp();
+			} else {
+				throw new RuntimeException("Received watermark " + mark.getTimestamp() + " lower than the last one " + lastWM);
+			}
+
+			if (mark.getTimestamp() % 11 != 0 && mark.getTimestamp() != Long.MAX_VALUE) {
+				throw new RuntimeException("Invalid watermark: " + mark.getTimestamp());
+			}
+		}
+
+		@Override
+		public void close() throws Exception {
+			super.close();
+			if (elCount != 1110L) {
+				throw new RuntimeException("Wrong final element count " + elCount);
+			}
+
+			if (wmCount <= 2) {
+				throw new RuntimeException("Almost no watermarks have been sent " + wmCount);
+			}
+		}
+	}
+
+	/**
+	 * Deserializes {@code Long} values and signals end-of-stream once more than 1110 records have
+	 * been deserialized.
+	 *
+	 * <p>{@link TimestampValidatingOperator} expects exactly 1110 elements, which assumes that the
+	 * record triggering end-of-stream is not emitted by the consumer.
+	 */
+	private static class LimitedLongDeserializer implements KeyedDeserializationSchema<Long> {
+
+		private static final long serialVersionUID = 6966177118923713521L;
+		private final TypeInformation<Long> ti;
+		private final TypeSerializer<Long> ser;
+		long cnt = 0;
+
+		public LimitedLongDeserializer() {
+			this.ti = TypeInfoParser.parse("Long");
+			this.ser = ti.createSerializer(new ExecutionConfig());
+		}
+
+		@Override
+		public TypeInformation<Long> getProducedType() {
+			return ti;
+		}
+
+		@Override
+		public Long deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+			cnt++;
+			DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
+			return ser.deserialize(in);
+		}
+
+		@Override
+		public boolean isEndOfStream(Long nextElement) {
+			return cnt > 1110L;
+		}
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
new file mode 100644
index 0000000..c2e256c
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Tests for the {@link Kafka011JsonTableSource}.
+ *
+ * <p>Plugs the Kafka 0.11 JSON table source, the {@link JsonRowDeserializationSchema} and the
+ * {@link FlinkKafkaConsumer011} into the shared tests of {@link KafkaTableSourceTestBase}.
+ */
+public class Kafka011JsonTableSourceTest extends KafkaTableSourceTestBase {
+
+	@Override
+	protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) {
+		return new Kafka011JsonTableSource(topic, properties, typeInfo);
+	}
+
+	@Override
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
+		// class literals cannot carry type arguments, hence the raw cast
+		return (Class) JsonRowDeserializationSchema.class;
+	}
+
+	@Override
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
+		// class literals cannot carry type arguments, hence the raw cast
+		return (Class) FlinkKafkaConsumer011.class;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java
new file mode 100644
index 0000000..ad63662
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.BeforeClass;
+
+/**
+ * IT cases for the {@link FlinkKafkaProducer011} with {@link FlinkKafkaProducer011.Semantic#AT_LEAST_ONCE}.
+ *
+ * <p>The exactly-once tests inherited from {@link KafkaProducerTestBase} are disabled for this semantic.
+ */
+@SuppressWarnings("serial")
+public class Kafka011ProducerAtLeastOnceITCase extends KafkaProducerTestBase {
+
+	/**
+	 * Runs the {@link KafkaProducerTestBase} setup and then switches the producer of the test
+	 * environment to {@link FlinkKafkaProducer011.Semantic#AT_LEAST_ONCE}.
+	 */
+	@BeforeClass
+	public static void prepare() throws ClassNotFoundException {
+		KafkaProducerTestBase.prepare();
+		((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE);
+	}
+
+	@Override
+	public void testExactlyOnceRegularSink() throws Exception {
+		// disabled: exactly-once delivery is not expected from the at-least-once semantic
+	}
+
+	@Override
+	public void testExactlyOnceCustomOperator() throws Exception {
+		// disabled: exactly-once delivery is not expected from the at-least-once semantic
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
new file mode 100644
index 0000000..1167238
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.BeforeClass;
+
+/**
+ * IT cases for the {@link FlinkKafkaProducer011} with {@link FlinkKafkaProducer011.Semantic#EXACTLY_ONCE}.
+ */
+@SuppressWarnings("serial")
+public class Kafka011ProducerExactlyOnceITCase extends KafkaProducerTestBase {
+
+	/**
+	 * Runs the {@link KafkaProducerTestBase} setup and then switches the producer of the test
+	 * environment to {@link FlinkKafkaProducer011.Semantic#EXACTLY_ONCE}.
+	 */
+	@BeforeClass
+	public static void prepare() throws ClassNotFoundException {
+		KafkaProducerTestBase.prepare();
+		((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+	}
+
+	@Override
+	public void testOneToOneAtLeastOnceRegularSink() throws Exception {
+		// TODO: fix this test
+		// Currently, very often (in ~50% of the runs) the KafkaProducer live-locks itself on the
+		// commitTransaction() call. Somehow Kafka 0.11 does not play along with NetworkFailuresProxy.
+		// This either means that Kafka has a bug with some unusual network failures, or that
+		// NetworkFailuresProxy is a broken design and this test should be reimplemented in a
+		// completely different way.
+	}
+
+	@Override
+	public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
+		// TODO: fix this test
+		// Disabled for the same reason as testOneToOneAtLeastOnceRegularSink().
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
new file mode 100644
index 0000000..e81148b
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.networking.NetworkFailuresProxy; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.util.NetUtils; + +import kafka.admin.AdminUtils; +import kafka.common.KafkaException; +import kafka.metrics.KafkaMetricsReporter; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.ZkUtils; +import org.I0Itec.zkclient.ZkClient; +import org.apache.commons.collections.list.UnmodifiableList; +import org.apache.commons.io.FileUtils; +import org.apache.curator.test.TestingServer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.SecurityProtocol; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.net.BindException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.UUID; 
+ +import scala.collection.mutable.ArraySeq; + +import static org.apache.flink.util.NetUtils.hostAndPortToUrlString; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * An implementation of the KafkaServerProvider for Kafka 0.11 . + */ +public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { + + protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class); + private File tmpZkDir; + private File tmpKafkaParent; + private List<File> tmpKafkaDirs; + private List<KafkaServer> brokers; + private TestingServer zookeeper; + private String zookeeperConnectionString; + private String brokerConnectionString = ""; + private Properties standardProps; + private FlinkKafkaProducer011.Semantic producerSemantic = FlinkKafkaProducer011.Semantic.EXACTLY_ONCE; + // 6 seconds is default. Seems to be too small for travis. 30 seconds + private int zkTimeout = 30000; + private Config config; + + public String getBrokerConnectionString() { + return brokerConnectionString; + } + + public void setProducerSemantic(FlinkKafkaProducer011.Semantic producerSemantic) { + this.producerSemantic = producerSemantic; + } + + @Override + public Properties getStandardProperties() { + return standardProps; + } + + @Override + public Properties getSecureProperties() { + Properties prop = new Properties(); + if (config.isSecureMode()) { + prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT"); + prop.put("security.protocol", "SASL_PLAINTEXT"); + prop.put("sasl.kerberos.service.name", "kafka"); + + //add special timeout for Travis + prop.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout)); + prop.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout)); + prop.setProperty("metadata.fetch.timeout.ms", "120000"); + } + return prop; + } + + @Override + public String getVersion() { + return "0.11"; + } + + @Override + public List<KafkaServer> getBrokers() { + return brokers; + } + + 
@Override + public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) { + return new FlinkKafkaConsumer011<>(topics, readSchema, props); + } + + @Override + public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) { + List<ConsumerRecord<K, V>> result = new ArrayList<>(); + + try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties)) { + consumer.assign(Arrays.asList(new TopicPartition(topic, partition))); + + while (true) { + boolean processedAtLeastOneRecord = false; + + // wait for new records with timeout and break the loop if we didn't get any + Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator(); + while (iterator.hasNext()) { + ConsumerRecord<K, V> record = iterator.next(); + result.add(record); + processedAtLeastOneRecord = true; + } + + if (!processedAtLeastOneRecord) { + break; + } + } + consumer.commitSync(); + } + + return UnmodifiableList.decorate(result); + } + + @Override + public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) { + return new StreamSink<>(new FlinkKafkaProducer011<>( + topic, + serSchema, + props, + Optional.ofNullable(partitioner), + producerSemantic, + FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE)); + } + + @Override + public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) { + return stream.addSink(new FlinkKafkaProducer011<>( + topic, + serSchema, + props, + Optional.ofNullable(partitioner), + producerSemantic, + FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE)); + } + + @Override + public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, 
Properties props) { + FlinkKafkaProducer011<T> prod = new FlinkKafkaProducer011<>( + topic, serSchema, props, Optional.of(new FlinkFixedPartitioner<>()), producerSemantic, FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); + + prod.setWriteTimestampToKafka(true); + + return stream.addSink(prod); + } + + @Override + public KafkaOffsetHandler createOffsetHandler() { + return new KafkaOffsetHandlerImpl(); + } + + @Override + public void restartBroker(int leaderId) throws Exception { + brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId))); + } + + @Override + public int getLeaderToShutDown(String topic) throws Exception { + ZkUtils zkUtils = getZkUtils(); + try { + MetadataResponse.PartitionMetadata firstPart = null; + do { + if (firstPart != null) { + LOG.info("Unable to find leader. error code {}", firstPart.error().code()); + // not the first try. Sleep a bit + Thread.sleep(150); + } + + List<MetadataResponse.PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionMetadata(); + firstPart = partitionMetadata.get(0); + } + while (firstPart.error().code() != 0); + + return firstPart.leader().id(); + } finally { + zkUtils.close(); + } + } + + @Override + public int getBrokerId(KafkaServer server) { + return server.config().brokerId(); + } + + @Override + public boolean isSecureRunSupported() { + return true; + } + + @Override + public void prepare(Config config) { + //increase the timeout since in Travis ZK connection takes long time for secure connection. 
+ if (config.isSecureMode()) { + //run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout + config.setKafkaServersNumber(1); + zkTimeout = zkTimeout * 15; + } + this.config = config; + + File tempDir = new File(System.getProperty("java.io.tmpdir")); + tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString())); + assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs()); + + tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString())); + assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs()); + + tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber()); + for (int i = 0; i < config.getKafkaServersNumber(); i++) { + File tmpDir = new File(tmpKafkaParent, "server-" + i); + assertTrue("cannot create kafka temp dir", tmpDir.mkdir()); + tmpKafkaDirs.add(tmpDir); + } + + zookeeper = null; + brokers = null; + + try { + zookeeper = new TestingServer(-1, tmpZkDir); + zookeeperConnectionString = zookeeper.getConnectString(); + LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString); + + LOG.info("Starting KafkaServer"); + brokers = new ArrayList<>(config.getKafkaServersNumber()); + + ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? 
SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT); + for (int i = 0; i < config.getKafkaServersNumber(); i++) { + KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i)); + brokers.add(kafkaServer); + brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName)); + brokerConnectionString += ","; + } + + LOG.info("ZK and KafkaServer started."); + } + catch (Throwable t) { + t.printStackTrace(); + fail("Test setup failed: " + t.getMessage()); + } + + standardProps = new Properties(); + standardProps.setProperty("zookeeper.connect", zookeeperConnectionString); + standardProps.setProperty("bootstrap.servers", brokerConnectionString); + standardProps.setProperty("group.id", "flink-tests"); + standardProps.setProperty("enable.auto.commit", "false"); + standardProps.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout)); + standardProps.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout)); + standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.11 value) + standardProps.setProperty("max.partition.fetch.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!) 
+	}
+
+	@Override
+	public void shutdown() {
+		for (KafkaServer broker : brokers) {
+			if (broker != null) {
+				broker.shutdown();
+			}
+		}
+		brokers.clear();
+
+		if (zookeeper != null) {
+			try {
+				zookeeper.stop();
+			}
+			catch (Exception e) {
+				LOG.warn("ZK.stop() failed", e);
+			}
+			zookeeper = null;
+		}
+
+		// clean up the temp spaces
+
+		if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
+			try {
+				FileUtils.deleteDirectory(tmpKafkaParent);
+			}
+			catch (Exception e) {
+				// ignore
+			}
+		}
+		if (tmpZkDir != null && tmpZkDir.exists()) {
+			try {
+				FileUtils.deleteDirectory(tmpZkDir);
+			}
+			catch (Exception e) {
+				// ignore
+			}
+		}
+	}
+
+	public ZkUtils getZkUtils() {
+		ZkClient creator = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
+				Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
+		return ZkUtils.apply(creator, false);
+	}
+
+	@Override
+	public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) {
+		// create topic with one client
+		LOG.info("Creating topic {}", topic);
+
+		ZkUtils zkUtils = getZkUtils();
+		try {
+			AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig, kafka.admin.RackAwareMode.Enforced$.MODULE$);
+		} finally {
+			zkUtils.close();
+		}
+
+		// validate that the topic has been created
+		final long deadline = System.nanoTime() + 30_000_000_000L;
+		do {
+			try {
+				if (config.isSecureMode()) {
+					//increase wait time since in Travis ZK timeout occurs frequently
+					int wait = zkTimeout / 100;
+					LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
+					Thread.sleep(wait);
+				} else {
+					Thread.sleep(100);
+				}
+			} catch (InterruptedException e) {
+				// restore the interrupted state and stop polling: with the flag set every further
+				// sleep would throw immediately and turn this loop into a busy spin until the deadline
+				Thread.currentThread().interrupt();
+				fail("Interrupted while waiting for test topic " + topic + " to be created");
+			}
+			// we could use AdminUtils.topicExists(zkUtils, topic) here, but its results are
+			// not always correct.
+
+			// create a new ZK utils connection; close it even if the existence check throws
+			ZkUtils checkZKConn = getZkUtils();
+			try {
+				if (AdminUtils.topicExists(checkZKConn, topic)) {
+					return;
+				}
+			} finally {
+				checkZKConn.close();
+			}
+		}
+		while (System.nanoTime() < deadline);
+		fail("Test topic could not be created");
+	}
+
+	@Override
+	public void deleteTestTopic(String topic) {
+		ZkUtils zkUtils = getZkUtils();
+		try {
+			LOG.info("Deleting topic {}", topic);
+
+			// zkUtils already wraps its own ZooKeeper client, so no second connection is opened here
+			AdminUtils.deleteTopic(zkUtils, topic);
+		} finally {
+			zkUtils.close();
+		}
+	}
+
+	/**
+	 * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed).
+	 */
+	protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
+		Properties kafkaProperties = new Properties();
+
+		// properties have to be Strings (Properties.getProperty() returns null for non-String values)
+		kafkaProperties.put("advertised.host.name", KAFKA_HOST);
+		kafkaProperties.put("broker.id", Integer.toString(brokerId));
+		kafkaProperties.put("log.dir", tmpFolder.toString());
+		kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
+		kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024));
+		kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
+		kafkaProperties.put("transaction.max.timeout.ms", Integer.toString(1000 * 60 * 60 * 2)); // 2hours
+
+		// for CI stability, increase zookeeper session timeout
+		kafkaProperties.put("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
+		kafkaProperties.put("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
+		if (config.getKafkaServerProperties() != null) {
+			kafkaProperties.putAll(config.getKafkaServerProperties());
+		}
+
+		final int numTries = 5;
+
+		for (int i = 1; i <= numTries; i++) {
+			int kafkaPort = NetUtils.getAvailablePort();
+			kafkaProperties.put("port", Integer.toString(kafkaPort));
+
+			if (config.isHideKafkaBehindProxy()) {
+				NetworkFailuresProxy proxy = createProxy(KAFKA_HOST, kafkaPort);
+				kafkaProperties.put("advertised.port", String.valueOf(proxy.getLocalPort()));
+			}
+
+			//to support secure kafka cluster
+			if (config.isSecureMode()) {
+				LOG.info("Adding Kafka secure configurations");
+				kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+				kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+				kafkaProperties.putAll(getSecureProperties());
+			}
+
+			KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+
+			try {
+				scala.Option<String> stringNone = scala.Option.apply(null);
+				KafkaServer server = new KafkaServer(kafkaConfig, Time.SYSTEM, stringNone, new ArraySeq<KafkaMetricsReporter>(0));
+				server.startup();
+				return server;
+			}
+			catch (KafkaException e) {
+				if (e.getCause() instanceof BindException) {
+					// port conflict, retry...
+					LOG.info("Port conflict when starting Kafka Broker. Retrying...");
+				}
+				else {
+					throw e;
+				}
+			}
+		}
+
+		throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
+	}
+
+	private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
+
+		private final KafkaConsumer<byte[], byte[]> offsetClient;
+
+		public KafkaOffsetHandlerImpl() {
+			Properties props = new Properties();
+			props.putAll(standardProps);
+			props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+			props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
+			offsetClient = new KafkaConsumer<>(props);
+		}
+
+		@Override
+		public Long getCommittedOffset(String topicName, int partition) {
+			OffsetAndMetadata committed = offsetClient.committed(new TopicPartition(topicName, partition));
+			return (committed != null) ? committed.offset() : null;
+		}
+
+		@Override
+		public void setCommittedOffset(String topicName, int partition, long offset) {
+			Map<TopicPartition, OffsetAndMetadata> partitionAndOffset = new HashMap<>();
+			partitionAndOffset.put(new TopicPartition(topicName, partition), new OffsetAndMetadata(offset));
+			offsetClient.commitSync(partitionAndOffset);
+		}
+
+		@Override
+		public void close() {
+			offsetClient.close();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
index 681fe02..c3c9c07 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
@@ -23,6 +23,15 @@ package org.apache.flink.streaming.connectors.kafka;
  */
 @SuppressWarnings("serial")
 public class Kafka08ProducerITCase extends KafkaProducerTestBase {
+	@Override
+	public void testExactlyOnceRegularSink() throws Exception {
+		// Kafka08 does not support exactly once semantic
+	}
+
+	@Override
+	public void testExactlyOnceCustomOperator() throws Exception {
+		// Kafka08 does not support exactly once semantic
+	}
 
 	@Override
 	public void testOneToOneAtLeastOnceRegularSink() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
index 847f818..b34132f 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
@@ -24,6 +24,16 @@ package org.apache.flink.streaming.connectors.kafka;
 @SuppressWarnings("serial")
 public class Kafka09ProducerITCase extends KafkaProducerTestBase {
 	@Override
+	public void testExactlyOnceRegularSink() throws Exception {
+		// Kafka09 does not support exactly once semantic
+	}
+
+	@Override
+	public void testExactlyOnceCustomOperator() throws Exception {
+		// Kafka09 does not support exactly once semantic
+	}
+
+	@Override
 	public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
 		// Disable this test since FlinkKafka09Producer doesn't support custom operator mode
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index fda6832..e9a0331 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -174,7 +174,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			stream.print();
 			see.execute("No broker test");
 		} catch (JobExecutionException jee) {
-			if (kafkaServer.getVersion().equals("0.9") || kafkaServer.getVersion().equals("0.10")) {
+			if (kafkaServer.getVersion().equals("0.9") || kafkaServer.getVersion().equals("0.10") || kafkaServer.getVersion().equals("0.11")) {
 				assertTrue(jee.getCause() instanceof TimeoutException);
 
 				TimeoutException te = (TimeoutException) jee.getCause();

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 35607dd..e1ba074 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -38,26 +38,25 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
+import org.apache.flink.streaming.connectors.kafka.testutils.IntegerSource;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.Preconditions;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.Test;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
@@ -295,38 +294,79 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 	}
 
 	/**
-	 * We manually handle the timeout instead of using JUnit's timeout to return failure instead of timeout error.
-	 * After timeout we assume that there are missing records and there is a bug, not that the test has run out of time.
+	 * Tests the exactly-once semantic for the simple writes into Kafka, with the producer used as a regular sink.
 	 */
-	private void assertAtLeastOnceForTopic(
-			Properties properties,
-			String topic,
-			int partition,
-			Set<Integer> expectedElements,
-			long timeoutMillis) throws Exception {
-
-		long startMillis = System.currentTimeMillis();
-		Set<Integer> actualElements = new HashSet<>();
-
-		// until we timeout...
-		while (System.currentTimeMillis() < startMillis + timeoutMillis) {
-			properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
-			properties.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
-
-			// query kafka for new records ...
-			Collection<ConsumerRecord<Integer, Integer>> records = kafkaServer.getAllRecordsFromTopic(properties, topic, partition, 100);
-
-			for (ConsumerRecord<Integer, Integer> record : records) {
-				actualElements.add(record.value());
-			}
+	@Test
+	public void testExactlyOnceRegularSink() throws Exception {
+		testExactlyOnce(true);
+	}
+
+	/**
+	 * Tests the exactly-once semantic for the writes into Kafka, with the producer used as a
+	 * custom operator (via {@code produceIntoKafka}).
+	 */
+	@Test
+	public void testExactlyOnceCustomOperator() throws Exception {
+		testExactlyOnce(false);
+	}
+
+	/**
+	 * Writes a sequence of integers into a single-partition topic with checkpointing enabled, lets
+	 * {@link FailingIdentityMapper} fail the job once, and checks that after the restart the topic
+	 * contains every element exactly once (no duplicates of records written since the last checkpoint).
+	 *
+	 * @param regularSink {@code true} to add the producer as a regular sink, {@code false} to use
+	 *                    {@code produceIntoKafka} (custom operator mode)
+	 */
+	protected void testExactlyOnce(boolean regularSink) throws Exception {
+		final String topic = regularSink ? "exactlyOnceTopicRegularSink" : "exactlyOnceTopicCustomOperator";
+		final int partition = 0;
+		final int numElements = 1000;
+		final int failAfterElements = 333;
+
+		createTestTopic(topic, 1, 1);
+
+		TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+		KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper<>(schema);
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.enableCheckpointing(500);
+		env.setParallelism(1);
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+		env.getConfig().disableSysoutLogging();
+
+		Properties properties = new Properties();
+		properties.putAll(standardProps);
+		properties.putAll(secureProps);
-			// succeed if we got all expectedElements
-			if (actualElements.containsAll(expectedElements)) {
-				return;
+		// process failAfterElements elements, then FailingIdentityMapper fails the job once and the
+		// restart strategy above restarts it; the topic must still end up with every element exactly once
+		List<Integer> expectedElements = getIntegersSequence(numElements);
+
+		DataStream<Integer> inputStream = env
+			.addSource(new IntegerSource(numElements))
+			.map(new FailingIdentityMapper<Integer>(failAfterElements));
+
+		FlinkKafkaPartitioner<Integer> partitioner = new FlinkKafkaPartitioner<Integer>() {
+			@Override
+			public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+				return partition;
 			}
+		};
+		if (regularSink) {
+			StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, partitioner);
+			inputStream.addSink(kafkaSink.getUserFunction());
+		}
+		else {
+			kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, partitioner);
 		}
 
-		fail(String.format("Expected to contain all of: <%s>, but was: <%s>", expectedElements, actualElements));
+		FailingIdentityMapper.failedBefore = false;
+		TestUtils.tryExecute(env, "Exactly once test");
+
+		// assert that before failure we successfully snapshot/flushed all expected elements
+		assertExactlyOnceForTopic(
+			properties,
+			topic,
+			partition,
+			expectedElements,
+			30000L);
+
+		deleteTestTopic(topic);
 	}
 
 	private List<Integer> getIntegersSequence(int size) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index f8792e5..fcdb59b 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -32,6 +32,7 @@ import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -39,11 +40,18 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.duration.FiniteDuration;
 
+import static org.junit.Assert.fail;
+
 /**
  * The base for the Kafka tests. It brings up:
  * <ul>
@@ -209,4 +217,80 @@ public abstract class KafkaTestBase extends TestLogger {
 		kafkaServer.deleteTestTopic(topic);
 	}
 
+	/**
+	 * Asserts that the given partition eventually contains at least all of {@code expectedElements};
+	 * duplicates and additional elements are tolerated.
+	 *
+	 * <p>We manually handle the timeout instead of using JUnit's timeout to return failure instead of timeout error.
+	 * After timeout we assume that there are missing records and there is a bug, not that the test has run out of time.
+	 */
+	protected void assertAtLeastOnceForTopic(
+			Properties properties,
+			String topic,
+			int partition,
+			Set<Integer> expectedElements,
+			long timeoutMillis) throws Exception {
+
+		long startMillis = System.currentTimeMillis();
+		Set<Integer> actualElements = new HashSet<>();
+
+		// work on a copy so that the caller's properties are not modified
+		Properties consumerProperties = new Properties();
+		consumerProperties.putAll(properties);
+		consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+		consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+
+		// until we timeout...
+		while (System.currentTimeMillis() < startMillis + timeoutMillis) {
+			// query kafka for new records ...
+			Collection<ConsumerRecord<Integer, Integer>> records = kafkaServer.getAllRecordsFromTopic(consumerProperties, topic, partition, 100);
+
+			for (ConsumerRecord<Integer, Integer> record : records) {
+				actualElements.add(record.value());
+			}
+
+			// succeed if we got all expectedElements
+			if (actualElements.containsAll(expectedElements)) {
+				return;
+			}
+		}
+
+		fail(String.format("Expected to contain all of: <%s>, but was: <%s>", expectedElements, actualElements));
+	}
+
+	/**
+	 * Asserts that the records of the given partition visible to a {@code read_committed} consumer are
+	 * exactly {@code expectedElements}: the same elements, in the same order, without duplicates.
+	 *
+	 * <p>We manually handle the timeout instead of using JUnit's timeout to return failure instead of timeout error.
+	 * After timeout we assume that there are missing records and there is a bug, not that the test has run out of time.
+	 * The check also fails early as soon as more records than expected have been read.
+	 */
+	protected void assertExactlyOnceForTopic(
+			Properties properties,
+			String topic,
+			int partition,
+			List<Integer> expectedElements,
+			long timeoutMillis) throws Exception {
+
+		long startMillis = System.currentTimeMillis();
+		List<Integer> actualElements = new ArrayList<>();
+
+		Properties consumerProperties = new Properties();
+		consumerProperties.putAll(properties);
+		consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+		consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+		consumerProperties.put("isolation.level", "read_committed");
+
+		// until we timeout...
+		while (System.currentTimeMillis() < startMillis + timeoutMillis) {
+			// query kafka for new records ...
+			Collection<ConsumerRecord<Integer, Integer>> records = kafkaServer.getAllRecordsFromTopic(consumerProperties, topic, partition, 1000);
+
+			for (ConsumerRecord<Integer, Integer> record : records) {
+				actualElements.add(record.value());
+			}
+
+			// succeed if we got exactly the expected elements, in the expected order
+			if (actualElements.equals(expectedElements)) {
+				return;
+			}
+			// fail early if we already have too many elements
+			if (actualElements.size() > expectedElements.size()) {
+				break;
+			}
+		}
+
+		// report contents as well as sizes: equal sizes with wrong content/order must still be diagnosable
+		fail(String.format("Expected %s elements: <%s>, but was %s elements: <%s>", expectedElements.size(), expectedElements, actualElements.size(), actualElements));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java
new file mode 100644
index 0000000..ef50766
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A Flink source that serves integers, but it completes only after further checkpoints have been
+ * confirmed after serving all of the elements.
+ */
+public class IntegerSource
+	extends RichParallelSourceFunction<Integer>
+	implements ListCheckpointed<Integer>, CheckpointListener {
+
+	/**
+	 * Blocker when the generator needs to wait for the checkpoint to happen.
+	 * Eager initialization means it must be serializable (pick any serializable type).
+	 */
+	private final Object blocker = new SerializableObject();
+
+	/**
+	 * The total number of events to generate.
+	 */
+	private final int numEventsTotal;
+
+	/**
+	 * The current position in the sequence of numbers.
+	 */
+	private int currentPosition = -1;
+
+	private long lastCheckpointTriggered;
+
+	private long lastCheckpointConfirmed;
+
+	private boolean restored;
+
+	private volatile boolean running = true;
+
+	public IntegerSource(int numEventsTotal) {
+		this.numEventsTotal = numEventsTotal;
+	}
+
+	@Override
+	public void run(SourceContext<Integer> ctx) throws Exception {
+
+		// each source subtask emits only the numbers where (num % parallelism == subtask_index)
+		final int stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
+		int current = this.currentPosition >= 0 ? this.currentPosition : getRuntimeContext().getIndexOfThisSubtask();
+
+		while (this.running && current < this.numEventsTotal) {
+			// emit the next element
+			synchronized (ctx.getCheckpointLock()) {
+				ctx.collect(current);
+				current += stepSize;
+				this.currentPosition = current;
+			}
+			// give some time to trigger checkpoint while we are not holding the lock (to prevent starvation)
+			if (!restored && current % 10 == 0) {
+				Thread.sleep(1);
+			}
+		}
+
+		// after we are done, we need to wait for two more checkpoints to complete
+		// before finishing the program - that is to be on the safe side that
+		// the sink also got the "commit" notification for all relevant checkpoints
+		// and committed the data
+		final long lastCheckpoint;
+		synchronized (ctx.getCheckpointLock()) {
+			lastCheckpoint = this.lastCheckpointTriggered;
+		}
+
+		synchronized (this.blocker) {
+			// also stop waiting once the source has been cancelled; cancel() wakes this loop up
+			while (this.running && this.lastCheckpointConfirmed <= lastCheckpoint + 1) {
+				this.blocker.wait();
+			}
+		}
+	}
+
+	@Override
+	public void cancel() {
+		this.running = false;
+
+		// wake up run() in case it is blocked waiting for checkpoint confirmations
+		synchronized (this.blocker) {
+			this.blocker.notifyAll();
+		}
+	}
+
+	@Override
+	public List<Integer> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+		this.lastCheckpointTriggered = checkpointId;
+
+		return Collections.singletonList(this.currentPosition);
+	}
+
+	@Override
+	public void restoreState(List<Integer> state) throws Exception {
+		this.currentPosition = state.get(0);
+
+		// at least one checkpoint must have happened so far
+		this.lastCheckpointTriggered = 1L;
+		this.lastCheckpointConfirmed = 1L;
+		this.restored = true;
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		synchronized (blocker) {
+			this.lastCheckpointConfirmed = checkpointId;
+			blocker.notifyAll();
+		}
+	}
+}
