http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java new file mode 100644 index 0000000..c17aae6 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; + +import java.util.List; + +/** + * The ConsumerCallBridge simply calls methods on the {@link KafkaConsumer}. 
+ * + * This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10, + * for example changing {@code assign(List)} to {@code assign(Collection)}. + * + * Because of that, we need two versions whose compiled code goes against different method signatures. + * Even though the source of subclasses may look identical, the byte code will be different, because they + * are compiled against different dependencies. + */ +public class KafkaConsumerCallBridge { + + public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception { + consumer.assign(topicPartitions); + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java new file mode 100644 index 0000000..9cfa840 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; +import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The thread that runs the {@link KafkaConsumer}, connecting to the brokers and polling records. + * The thread pushes the data into a {@link Handover} to be picked up by the fetcher that will + * deserialize and emit the records. + * + * <p><b>IMPORTANT:</b> This thread must not be interrupted when attempting to shut it down. + * The Kafka consumer code was found to not always handle interrupts well, and to even + * deadlock in certain situations. + * + * <p>Implementation Note: This code is written to be reusable in later versions of the KafkaConsumer. + * Because Kafka is not maintaining binary compatibility, we use a "call bridge" as an indirection + * to the KafkaConsumer calls that change signature. 
+ */ +public class KafkaConsumerThread extends Thread { + + /** Logger for this consumer */ + private final Logger log; + + /** The handover of data and exceptions between the consumer thread and the task thread */ + private final Handover handover; + + /** The next offsets that the main thread should commit */ + private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit; + + /** The configuration for the Kafka consumer */ + private final Properties kafkaProperties; + + /** The partitions that this consumer reads from */ + private final KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions; + + /** We get this from the outside to publish metrics. **/ + private final MetricGroup kafkaMetricGroup; + + /** The indirections on KafkaConsumer methods, for cases where KafkaConsumer compatibility is broken */ + private final KafkaConsumerCallBridge consumerCallBridge; + + /** The maximum number of milliseconds to wait for a fetch batch */ + private final long pollTimeout; + + /** Flag whether to add Kafka's metrics to the Flink metrics */ + private final boolean useMetrics; + + /** Reference to the Kafka consumer, once it is created */ + private volatile KafkaConsumer<byte[], byte[]> consumer; + + /** Flag to mark the main work loop as alive */ + private volatile boolean running; + + /** Flag tracking whether the latest commit request has completed */ + private volatile boolean commitInProgress; + + + public KafkaConsumerThread( + Logger log, + Handover handover, + Properties kafkaProperties, + KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions, + MetricGroup kafkaMetricGroup, + KafkaConsumerCallBridge consumerCallBridge, + String threadName, + long pollTimeout, + boolean useMetrics) { + + super(threadName); + setDaemon(true); + + this.log = checkNotNull(log); + this.handover = checkNotNull(handover); + this.kafkaProperties = checkNotNull(kafkaProperties); + this.subscribedPartitions = 
checkNotNull(subscribedPartitions); + this.kafkaMetricGroup = checkNotNull(kafkaMetricGroup); + this.consumerCallBridge = checkNotNull(consumerCallBridge); + this.pollTimeout = pollTimeout; + this.useMetrics = useMetrics; + + this.nextOffsetsToCommit = new AtomicReference<>(); + this.running = true; + } + + // ------------------------------------------------------------------------ + + @Override + public void run() { + // early exit check + if (!running) { + return; + } + + // this is the means to talk to FlinkKafkaConsumer's main thread + final Handover handover = this.handover; + + // This method initializes the KafkaConsumer and guarantees it is torn down properly. + // This is important, because the consumer has multi-threading issues, + // including concurrent 'close()' calls. + final KafkaConsumer<byte[], byte[]> consumer; + try { + consumer = new KafkaConsumer<>(kafkaProperties); + } + catch (Throwable t) { + handover.reportError(t); + return; + } + + // from here on, the consumer is guaranteed to be closed properly + try { + // The callback invoked by Kafka once an offset commit is complete + final OffsetCommitCallback offsetCommitCallback = new CommitCallback(); + + // tell the consumer which partitions to work with + consumerCallBridge.assignPartitions(consumer, convertKafkaPartitions(subscribedPartitions)); + + // register Kafka's very own metrics in Flink's metric reporters + if (useMetrics) { + // register Kafka metrics to Flink + Map<MetricName, ? extends Metric> metrics = consumer.metrics(); + if (metrics == null) { + // MapR's Kafka implementation returns null here. + log.info("Consumer implementation does not support metrics"); + } else { + // we have Kafka metrics, register them + for (Map.Entry<MetricName, ? 
extends Metric> metric: metrics.entrySet()) { + kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue())); + } + } + } + + // early exit check + if (!running) { + return; + } + + // seek the consumer to the initial offsets + for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions) { + if (partition.isOffsetDefined()) { + log.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; " + + "seeking the consumer to position {}", + partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1); + + consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1); + } + else { + // for partitions that do not have offsets restored from a checkpoint/savepoint, + // we need to define our internal offset state for them using the initial offsets retrieved from Kafka + // by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint + + long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle()); + + log.info("Partition {} has no initial offset; the consumer has position {}, " + + "so the initial offset will be set to {}", + partition.getKafkaPartitionHandle(), fetchedOffset, fetchedOffset - 1); + + // the fetched offset represents the next record to process, so we need to subtract it by 1 + partition.setOffset(fetchedOffset - 1); + } + } + + // from now on, external operations may call the consumer + this.consumer = consumer; + + // the latest bulk of records. 
may carry across the loop if the thread is woken up + // from blocking on the handover + ConsumerRecords<byte[], byte[]> records = null; + + // main fetch loop + while (running) { + + // check if there is something to commit + if (!commitInProgress) { + // get and reset the work-to-be committed, so we don't repeatedly commit the same + final Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null); + + if (toCommit != null) { + log.debug("Sending async offset commit request to Kafka broker"); + + // also record that a commit is already in progress + // the order here matters! first set the flag, then send the commit command. + commitInProgress = true; + consumer.commitAsync(toCommit, offsetCommitCallback); + } + } + + // get the next batch of records, unless we did not manage to hand the old batch over + if (records == null) { + try { + records = consumer.poll(pollTimeout); + } + catch (WakeupException we) { + continue; + } + } + + try { + handover.produce(records); + records = null; + } + catch (Handover.WakeupException e) { + // fall through the loop + } + } + // end main fetch loop + } + catch (Throwable t) { + // let the main thread know and exit + // it may be that this exception comes because the main thread closed the handover, in + // which case the below reporting is irrelevant, but does not hurt either + handover.reportError(t); + } + finally { + // make sure the handover is closed if it is not already closed or has an error + handover.close(); + + // make sure the KafkaConsumer is closed + try { + consumer.close(); + } + catch (Throwable t) { + log.warn("Error while closing Kafka consumer", t); + } + } + } + + /** + * Shuts this thread down, waking up the thread gracefully if blocked (without Thread.interrupt() calls). 
+ */ + public void shutdown() { + running = false; + + // We cannot call close() on the KafkaConsumer, because it will actually throw + // an exception if a concurrent call is in progress + + // this wakes up the consumer if it is blocked handing over records + handover.wakeupProducer(); + + // this wakes up the consumer if it is blocked in a kafka poll + if (consumer != null) { + consumer.wakeup(); + } + } + + /** + * Tells this thread to commit a set of offsets. This method does not block, the committing + * operation will happen asynchronously. + * + * <p>Only one commit operation may be pending at any time. If the committing takes longer than + * the frequency with which this method is called, then some commits may be skipped due to being + * superseded by newer ones. + * + * @param offsetsToCommit The offsets to commit + */ + public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) { + // record the work to be committed by the main consumer thread and make sure the consumer notices that + if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) { + log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " + + "Skipping commit of previous offsets because newer complete checkpoint offsets are available. 
" + + "This does not compromise Flink's checkpoint integrity."); + } + + // if the consumer is blocked in a poll() or handover operation, wake it up to commit soon + handover.wakeupProducer(); + if (consumer != null) { + consumer.wakeup(); + } + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + private static List<TopicPartition> convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) { + ArrayList<TopicPartition> result = new ArrayList<>(partitions.length); + for (KafkaTopicPartitionState<TopicPartition> p : partitions) { + result.add(p.getKafkaPartitionHandle()); + } + return result; + } + + // ------------------------------------------------------------------------ + + private class CommitCallback implements OffsetCommitCallback { + + @Override + public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) { + commitInProgress = false; + + if (ex != null) { + log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties new file mode 100644 index 0000000..6bdfb48 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties @@ -0,0 +1,29 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger + + http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java new file mode 100644 index 0000000..7a82365 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java @@ -0,0 +1,482 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.core.testutils.MultiShotLatch; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kafka.internal.Handover; +import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher; +import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import 
org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.common.TopicPartition; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.powermock.api.mockito.PowerMockito.doAnswer; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; +import static org.powermock.api.mockito.PowerMockito.whenNew; + +/** + * Unit tests for the {@link Kafka09Fetcher}. 
+ */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(KafkaConsumerThread.class) +public class Kafka09FetcherTest { + + @Test + public void testCommitDoesNotBlock() throws Exception { + + // test data + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42); + final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>(); + testCommitData.put(testPartition, 11L); + + // to synchronize when the consumer is in its blocking method + final OneShotLatch sync = new OneShotLatch(); + + // ----- the mock consumer with blocking poll calls ---- + final MultiShotLatch blockerLatch = new MultiShotLatch(); + + KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class); + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() { + + @Override + public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException { + sync.trigger(); + blockerLatch.await(); + return ConsumerRecords.empty(); + } + }); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) { + blockerLatch.trigger(); + return null; + } + }).when(mockConsumer).wakeup(); + + // make sure the fetcher creates the mock consumer + whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); + + // ----- create the test fetcher ----- + + @SuppressWarnings("unchecked") + SourceContext<String> sourceContext = mock(SourceContext.class); + List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); + KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); + + final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>( + sourceContext, + topics, + null, /* periodic watermark extractor */ + null, /* punctuated watermark extractor */ + new TestProcessingTimeService(), + 10, /* watermark interval */ + this.getClass().getClassLoader(), + true, /* checkpointing */ + "task_name", + new 
UnregisteredMetricsGroup(), + schema, + new Properties(), + 0L, + false); + + // ----- run the fetcher ----- + + final AtomicReference<Throwable> error = new AtomicReference<>(); + final Thread fetcherRunner = new Thread("fetcher runner") { + + @Override + public void run() { + try { + fetcher.runFetchLoop(); + } catch (Throwable t) { + error.set(t); + } + } + }; + fetcherRunner.start(); + + // wait until the fetcher has reached the method of interest + sync.await(); + + // ----- trigger the offset commit ----- + + final AtomicReference<Throwable> commitError = new AtomicReference<>(); + final Thread committer = new Thread("committer runner") { + @Override + public void run() { + try { + fetcher.commitInternalOffsetsToKafka(testCommitData); + } catch (Throwable t) { + commitError.set(t); + } + } + }; + committer.start(); + + // ----- ensure that the committer finishes in time ----- + committer.join(30000); + assertFalse("The committer did not finish in time", committer.isAlive()); + + // ----- test done, wait till the fetcher is done for a clean shutdown ----- + fetcher.cancel(); + fetcherRunner.join(); + + // check that there were no errors in the fetcher + final Throwable fetcherError = error.get(); + if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) { + throw new Exception("Exception in the fetcher", fetcherError); + } + final Throwable committerError = commitError.get(); + if (committerError != null) { + throw new Exception("Exception in the committer", committerError); + } + } + + @Test + public void ensureOffsetsGetCommitted() throws Exception { + + // test data + final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42); + final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99); + + final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>(); + testCommitData1.put(testPartition1, 11L); + testCommitData1.put(testPartition2, 18L); + + final Map<KafkaTopicPartition, Long> 
testCommitData2 = new HashMap<>(); + testCommitData2.put(testPartition1, 19L); + testCommitData2.put(testPartition2, 28L); + + final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>(); + + + // ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ---- + + final MultiShotLatch blockerLatch = new MultiShotLatch(); + + KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class); + + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() { + @Override + public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException { + blockerLatch.await(); + return ConsumerRecords.empty(); + } + }); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) { + blockerLatch.trigger(); + return null; + } + }).when(mockConsumer).wakeup(); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) { + @SuppressWarnings("unchecked") + Map<TopicPartition, OffsetAndMetadata> offsets = + (Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0]; + + OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1]; + + commitStore.add(offsets); + callback.onComplete(offsets, null); + + return null; + } + }).when(mockConsumer).commitAsync( + Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class)); + + // make sure the fetcher creates the mock consumer + whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); + + // ----- create the test fetcher ----- + + @SuppressWarnings("unchecked") + SourceContext<String> sourceContext = mock(SourceContext.class); + List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); + KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); + + final Kafka09Fetcher<String> fetcher = new 
Kafka09Fetcher<>( + sourceContext, + topics, + null, /* periodic watermark extractor */ + null, /* punctuated watermark extractor */ + new TestProcessingTimeService(), + 10, /* watermark interval */ + this.getClass().getClassLoader(), + true, /* checkpointing */ + "task_name", + new UnregisteredMetricsGroup(), + schema, + new Properties(), + 0L, + false); + + + // ----- run the fetcher ----- + + final AtomicReference<Throwable> error = new AtomicReference<>(); + final Thread fetcherRunner = new Thread("fetcher runner") { + + @Override + public void run() { + try { + fetcher.runFetchLoop(); + } catch (Throwable t) { + error.set(t); + } + } + }; + fetcherRunner.start(); + + // ----- trigger the first offset commit ----- + + fetcher.commitInternalOffsetsToKafka(testCommitData1); + Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take(); + + for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) { + TopicPartition partition = entry.getKey(); + if (partition.topic().equals("test")) { + assertEquals(42, partition.partition()); + assertEquals(12L, entry.getValue().offset()); + } + else if (partition.topic().equals("another")) { + assertEquals(99, partition.partition()); + assertEquals(17L, entry.getValue().offset()); + } + } + + // ----- trigger the second offset commit ----- + + fetcher.commitInternalOffsetsToKafka(testCommitData2); + Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take(); + + for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) { + TopicPartition partition = entry.getKey(); + if (partition.topic().equals("test")) { + assertEquals(42, partition.partition()); + assertEquals(20L, entry.getValue().offset()); + } + else if (partition.topic().equals("another")) { + assertEquals(99, partition.partition()); + assertEquals(27L, entry.getValue().offset()); + } + } + + // ----- test done, wait till the fetcher is done for a clean shutdown ----- + fetcher.cancel(); + fetcherRunner.join(); + + // check 
that there were no errors in the fetcher + final Throwable caughtError = error.get(); + if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) { + throw new Exception("Exception in the fetcher", caughtError); + } + } + + @Test + public void testCancellationWhenEmitBlocks() throws Exception { + + // ----- some test data ----- + + final String topic = "test-topic"; + final int partition = 3; + final byte[] payload = new byte[] {1, 2, 3, 4}; + + final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList( + new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload), + new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload), + new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload)); + + final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>(); + data.put(new TopicPartition(topic, partition), records); + + final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data); + + // ----- the test consumer ----- + + final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class); + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() { + @Override + public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) { + return consumerRecords; + } + }); + + whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); + + // ----- build a fetcher ----- + + BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>(); + List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition)); + KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); + + final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>( + sourceContext, + topics, + null, /* periodic watermark extractor */ + null, /* punctuated watermark extractor */ + new TestProcessingTimeService(), + 10, /* watermark interval */ + 
this.getClass().getClassLoader(), + true, /* checkpointing */ + "task_name", + new UnregisteredMetricsGroup(), + schema, + new Properties(), + 0L, + false); + + + // ----- run the fetcher ----- + + final AtomicReference<Throwable> error = new AtomicReference<>(); + final Thread fetcherRunner = new Thread("fetcher runner") { + + @Override + public void run() { + try { + fetcher.runFetchLoop(); + } catch (Throwable t) { + error.set(t); + } + } + }; + fetcherRunner.start(); + + // wait until the thread started to emit records to the source context + sourceContext.waitTillHasBlocker(); + + // now we try to cancel the fetcher, including the interruption usually done on the task thread + // once it has finished, there must be no more thread blocked on the source context + fetcher.cancel(); + fetcherRunner.interrupt(); + fetcherRunner.join(); + + assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking()); + } + + // ------------------------------------------------------------------------ + // test utilities + // ------------------------------------------------------------------------ + + private static final class BlockingSourceContext<T> implements SourceContext<T> { + + private final ReentrantLock lock = new ReentrantLock(); + private final OneShotLatch inBlocking = new OneShotLatch(); + + @Override + public void collect(T element) { + block(); + } + + @Override + public void collectWithTimestamp(T element, long timestamp) { + block(); + } + + @Override + public void emitWatermark(Watermark mark) { + block(); + } + + @Override + public Object getCheckpointLock() { + return new Object(); + } + + @Override + public void close() {} + + public void waitTillHasBlocker() throws InterruptedException { + inBlocking.await(); + } + + public boolean isStillBlocking() { + return lock.isLocked(); + } + + @SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"}) + private void block() { + lock.lock(); + try { + 
inBlocking.trigger(); + + // put this thread to sleep indefinitely + final Object o = new Object(); + while (true) { + synchronized (o) { + o.wait(); + } + } + } + catch (InterruptedException e) { + // exit cleanly, simply reset the interruption flag + Thread.currentThread().interrupt(); + } + finally { + lock.unlock(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java new file mode 100644 index 0000000..d18e2a9 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.junit.Test; + +public class Kafka09ITCase extends KafkaConsumerTestBase { + + // ------------------------------------------------------------------------ + // Suite of Tests + // ------------------------------------------------------------------------ + + @Test(timeout = 60000) + public void testFailOnNoBroker() throws Exception { + runFailOnNoBrokerTest(); + } + + @Test(timeout = 60000) + public void testConcurrentProducerConsumerTopology() throws Exception { + runSimpleConcurrentProducerConsumerTopology(); + } + + + @Test(timeout = 60000) + public void testKeyValueSupport() throws Exception { + runKeyValueTest(); + } + + // --- canceling / failures --- + + @Test(timeout = 60000) + public void testCancelingEmptyTopic() throws Exception { + runCancelingOnEmptyInputTest(); + } + + @Test(timeout = 60000) + public void testCancelingFullTopic() throws Exception { + runCancelingOnFullInputTest(); + } + + @Test(timeout = 60000) + public void testFailOnDeploy() throws Exception { + runFailOnDeployTest(); + } + + + // --- source to partition mappings and exactly once --- + + @Test(timeout = 60000) + public void testOneToOneSources() throws Exception { + runOneToOneExactlyOnceTest(); + } + + @Test(timeout = 60000) + public void testOneSourceMultiplePartitions() throws Exception { + runOneSourceMultiplePartitionsExactlyOnceTest(); + } + + @Test(timeout = 60000) + public void testMultipleSourcesOnePartition() throws Exception { + runMultipleSourcesOnePartitionExactlyOnceTest(); + } + + // --- broker failure --- + + @Test(timeout = 60000) + public void testBrokerFailure() throws Exception { + runBrokerFailureTest(); + } + + // --- special executions --- + + @Test(timeout = 60000) + public void testBigRecordJob() throws Exception { + runBigRecordTestTopology(); + } + + @Test(timeout = 60000) + public void testMultipleTopics() throws Exception { + runProduceConsumeMultipleTopics(); + } + + @Test(timeout = 
60000) + public void testAllDeletes() throws Exception { + runAllDeletesTest(); + } + + @Test(timeout = 60000) + public void testEndOfStream() throws Exception { + runEndOfStreamTest(); + } + + @Test(timeout = 60000) + public void testMetrics() throws Throwable { + runMetricsTest(); + } + + // --- offset committing --- + + @Test(timeout = 60000) + public void testCommitOffsetsToKafka() throws Exception { + runCommitOffsetsToKafka(); + } + + @Test(timeout = 60000) + public void testStartFromKafkaCommitOffsets() throws Exception { + runStartFromKafkaCommitOffsets(); + } + + @Test(timeout = 60000) + public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception { + runAutoOffsetRetrievalAndCommitToKafka(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java new file mode 100644 index 0000000..45f70ac --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.table.Row;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

import java.util.Properties;

/**
 * Plugs {@link Kafka09JsonTableSink} into the shared table sink test base.
 */
public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {

	/**
	 * Builds a {@link Kafka09JsonTableSink} whose producer factory is overridden so
	 * that it always hands back the supplied {@code kafkaProducer}.
	 */
	@Override
	protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
			final FlinkKafkaProducerBase<Row> kafkaProducer) {

		final KafkaTableSink sinkWithFixedProducer = new Kafka09JsonTableSink(topic, properties, partitioner) {
			@Override
			protected FlinkKafkaProducerBase<Row> createKafkaProducer(String sinkTopic, Properties sinkProperties,
					SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> sinkPartitioner) {
				// ignore every argument: the test wants its own producer instance
				return kafkaProducer;
			}
		};
		return sinkWithFixedProducer;
	}

	/** Returns a JSON row serialization schema built from {@code FIELD_NAMES}. */
	@Override
	@SuppressWarnings("unchecked")
	protected SerializationSchema<Row> getSerializationSchema() {
		final SerializationSchema<Row> jsonSchema = new JsonRowSerializationSchema(FIELD_NAMES);
		return jsonSchema;
	}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java new file mode 100644 index 0000000..4a75f50 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import java.util.Properties; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; + +public class Kafka09JsonTableSourceTest extends KafkaTableSourceTestBase { + + @Override + protected KafkaTableSource createTableSource(String topic, Properties properties, String[] fieldNames, TypeInformation<?>[] typeInfo) { + return new Kafka09JsonTableSource(topic, properties, fieldNames, typeInfo); + } + + @Override + @SuppressWarnings("unchecked") + protected Class<DeserializationSchema<Row>> getDeserializationSchema() { + return (Class) JsonRowDeserializationSchema.class; + } + + @Override + @SuppressWarnings("unchecked") + protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() { + return (Class) FlinkKafkaConsumer09.class; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java new file mode 100644 index 0000000..ae4f5b2 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + + +import org.junit.Test; + +@SuppressWarnings("serial") +public class Kafka09ProducerITCase extends KafkaProducerTestBase { + + @Test + public void testCustomPartitioning() { + runCustomPartitioningTest(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java new file mode 100644 index 0000000..e748537 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.test.util.SecureTestEnvironment;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * Kafka secure connection (Kerberos) integration test case.
 *
 * <p>The clusters are started in secure mode once for the whole class and torn
 * down afterwards together with the secure test environment.
 */
public class Kafka09SecuredRunITCase extends KafkaConsumerTestBase {

	protected static final Logger LOG = LoggerFactory.getLogger(Kafka09SecuredRunITCase.class);

	/**
	 * Prepares the secure test environment, copies its settings into the Flink
	 * configuration, and starts the clusters with secure mode switched on.
	 */
	@BeforeClass
	public static void prepare() throws IOException, ClassNotFoundException {
		final String separator = "-------------------------------------------------------------------------";
		LOG.info(separator);
		LOG.info(" Starting Kafka09SecuredRunITCase ");
		LOG.info(separator);

		SecureTestEnvironment.prepare(tempFolder);
		SecureTestEnvironment.populateFlinkSecureConfigurations(getFlinkConfiguration());

		final boolean secureMode = true;
		startClusters(secureMode);
	}

	/** Stops the clusters first and then cleans up the secure test environment. */
	@AfterClass
	public static void shutDownServices() {
		shutdownClusters();
		SecureTestEnvironment.cleanup();
	}

	/**
	 * Runs the produce/consume scenario over multiple topics against the secured setup.
	 *
	 * <p>The timeout is deliberately large because ZK connection timeouts occur
	 * frequently on Travis; per the original note it is meant to be twice the
	 * ZK connection timeout.
	 */
	@Test(timeout = 600000)
	public void testMultipleTopics() throws Exception {
		runProduceConsumeMultipleTopics();
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java new file mode 100644 index 0000000..18b2aec --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.TestLogger;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;

import org.junit.Test;
import org.junit.runner.RunWith;

import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.Collections;
import java.util.concurrent.Future;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.whenNew;

/**
 * Tests how {@link FlinkKafkaProducer09} reacts to send failures reported by a
 * mocked {@link KafkaProducer}.
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest(FlinkKafkaProducerBase.class)
public class KafkaProducerTest extends TestLogger {

	/**
	 * Sends records through a mocked Kafka producer whose callback always reports
	 * "Test error" and checks two modes:
	 * <ol>
	 *   <li>by default, processing elements must eventually throw an exception
	 *       whose cause carries the error text;</li>
	 *   <li>with {@code setLogFailuresOnly(true)}, processing and closing must go
	 *       through without an exception.</li>
	 * </ol>
	 *
	 * <p>Unexpected exceptions are left to propagate so that the test runner reports
	 * them with their full stack trace.
	 *
	 * @throws Exception if mocking, opening the harness, or processing fails unexpectedly
	 */
	@Test
	@SuppressWarnings("unchecked")
	public void testPropagateExceptions() throws Exception {
		// mock kafka producer
		KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class);

		// partition setup
		when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
			// returning an unmodifiable list to mimic KafkaProducer#partitionsFor() behaviour
			Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null)));

		// failure when trying to send an element
		when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
			.thenAnswer(new Answer<Future<RecordMetadata>>() {
				@Override
				public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
					// complete the send callback right away with an error
					Callback callback = (Callback) invocation.getArguments()[1];
					callback.onCompletion(null, new Exception("Test error"));
					return null;
				}
			});

		// make sure the FlinkKafkaProducer instantiates our mock producer
		whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);

		// (1) producer that propagates errors

		FlinkKafkaProducer09<String> producerPropagating = new FlinkKafkaProducer09<>(
				"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);

		OneInputStreamOperatorTestHarness<String, Object> testHarness =
				new OneInputStreamOperatorTestHarness<>(new StreamSink(producerPropagating));

		testHarness.open();

		try {
			testHarness.processElement(new StreamRecord<>("value"));
			testHarness.processElement(new StreamRecord<>("value"));
			// fail() throws an AssertionError, which the catch below does not intercept
			fail("This should fail with an exception");
		}
		catch (Exception e) {
			assertNotNull(e.getCause());
			assertNotNull(e.getCause().getMessage());
			assertTrue(e.getCause().getMessage().contains("Test error"));
		}

		// (2) producer that only logs errors

		FlinkKafkaProducer09<String> producerLogging = new FlinkKafkaProducer09<>(
				"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
		producerLogging.setLogFailuresOnly(true);

		testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));

		testHarness.open();

		testHarness.processElement(new StreamRecord<>("value"));
		testHarness.processElement(new StreamRecord<>("value"));

		testHarness.close();
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java new file mode 100644 index 0000000..1802e0c --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.streaming.connectors.kafka; + +import kafka.admin.AdminUtils; +import kafka.common.KafkaException; +import kafka.api.PartitionMetadata; +import kafka.network.SocketServer; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.SystemTime$; +import kafka.utils.ZkUtils; +import org.I0Itec.zkclient.ZkClient; +import org.apache.commons.io.FileUtils; +import org.apache.curator.test.TestingServer; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.util.NetUtils; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.SecurityProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.Seq; + +import java.io.File; +import java.net.BindException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.UUID; + +import static org.apache.flink.util.NetUtils.hostAndPortToUrlString; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * An implementation of the KafkaServerProvider for Kafka 0.9 + */ +public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { + + protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class); + private File tmpZkDir; + private File tmpKafkaParent; + private List<File> tmpKafkaDirs; + private 
List<KafkaServer> brokers; + private TestingServer zookeeper; + private String zookeeperConnectionString; + private String brokerConnectionString = ""; + private Properties standardProps; + private Properties additionalServerProperties; + private boolean secureMode = false; + // 6 seconds is default. Seems to be too small for travis. 30 seconds + private String zkTimeout = "30000"; + + public String getBrokerConnectionString() { + return brokerConnectionString; + } + + @Override + public Properties getStandardProperties() { + return standardProps; + } + + @Override + public String getVersion() { + return "0.9"; + } + + @Override + public List<KafkaServer> getBrokers() { + return brokers; + } + + @Override + public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) { + return new FlinkKafkaConsumer09<>(topics, readSchema, props); + } + + @Override + public <T> StreamSink<T> getProducerSink( + String topic, + KeyedSerializationSchema<T> serSchema, + Properties props, + KafkaPartitioner<T> partitioner) { + FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner); + prod.setFlushOnCheckpoint(true); + return new StreamSink<>(prod); + } + + @Override + public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) { + FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner); + prod.setFlushOnCheckpoint(true); + return stream.addSink(prod); + } + + @Override + public KafkaOffsetHandler createOffsetHandler(Properties props) { + return new KafkaOffsetHandlerImpl(props); + } + + @Override + public void restartBroker(int leaderId) throws Exception { + brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId))); + } + + @Override + public int getLeaderToShutDown(String topic) throws Exception { + ZkUtils 
zkUtils = getZkUtils(); + try { + PartitionMetadata firstPart = null; + do { + if (firstPart != null) { + LOG.info("Unable to find leader. error code {}", firstPart.errorCode()); + // not the first try. Sleep a bit + Thread.sleep(150); + } + + Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionsMetadata(); + firstPart = partitionMetadata.head(); + } + while (firstPart.errorCode() != 0); + + return firstPart.leader().get().id(); + } finally { + zkUtils.close(); + } + } + + @Override + public int getBrokerId(KafkaServer server) { + return server.config().brokerId(); + } + + @Override + public boolean isSecureRunSupported() { + return true; + } + + @Override + public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) { + + //increase the timeout since in Travis ZK connection takes long time for secure connection. + if(secureMode) { + //run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout + numKafkaServers = 1; + zkTimeout = String.valueOf(Integer.parseInt(zkTimeout) * 15); + } + + this.additionalServerProperties = additionalServerProperties; + this.secureMode = secureMode; + File tempDir = new File(System.getProperty("java.io.tmpdir")); + + tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString())); + assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs()); + + tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + (UUID.randomUUID().toString())); + assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs()); + + tmpKafkaDirs = new ArrayList<>(numKafkaServers); + for (int i = 0; i < numKafkaServers; i++) { + File tmpDir = new File(tmpKafkaParent, "server-" + i); + assertTrue("cannot create kafka temp dir", tmpDir.mkdir()); + tmpKafkaDirs.add(tmpDir); + } + + zookeeper = null; + brokers = null; + + try { + LOG.info("Starting Zookeeper"); + zookeeper = new TestingServer(-1, tmpZkDir); 
+ zookeeperConnectionString = zookeeper.getConnectString(); + LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString); + + LOG.info("Starting KafkaServer"); + brokers = new ArrayList<>(numKafkaServers); + + for (int i = 0; i < numKafkaServers; i++) { + brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i))); + + SocketServer socketServer = brokers.get(i).socketServer(); + if(secureMode) { + brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ","; + } else { + brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ","; + } + } + + LOG.info("ZK and KafkaServer started."); + } + catch (Throwable t) { + t.printStackTrace(); + fail("Test setup failed: " + t.getMessage()); + } + + LOG.info("brokerConnectionString --> {}", brokerConnectionString); + + standardProps = new Properties(); + standardProps.setProperty("zookeeper.connect", zookeeperConnectionString); + standardProps.setProperty("bootstrap.servers", brokerConnectionString); + standardProps.setProperty("group.id", "flink-tests"); + standardProps.setProperty("enable.auto.commit", "false"); + standardProps.setProperty("zookeeper.session.timeout.ms", zkTimeout); + standardProps.setProperty("zookeeper.connection.timeout.ms", zkTimeout); + standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.9 value) + standardProps.setProperty("max.partition.fetch.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!) 
+
+	}
+
+	/**
+	 * Shuts down all Kafka brokers and the ZooKeeper server and removes the temporary directories.
+	 *
+	 * <p>Stopping ZooKeeper and deleting the directories is best effort: failures are logged
+	 * and do not prevent the remaining cleanup steps from running.
+	 */
+	@Override
+	public void shutdown() {
+		for (KafkaServer broker : brokers) {
+			if (broker != null) {
+				broker.shutdown();
+			}
+		}
+		brokers.clear();
+
+		if (zookeeper != null) {
+			try {
+				zookeeper.stop();
+				zookeeper.close();
+			}
+			catch (Exception e) {
+				LOG.warn("ZK.stop() failed", e);
+			}
+			zookeeper = null;
+		}
+
+		// clean up the temp spaces; a failed delete must not fail the shutdown, but it should be visible
+
+		if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
+			try {
+				FileUtils.deleteDirectory(tmpKafkaParent);
+			}
+			catch (Exception e) {
+				LOG.warn("Could not delete temp directory " + tmpKafkaParent, e);
+			}
+		}
+		if (tmpZkDir != null && tmpZkDir.exists()) {
+			try {
+				FileUtils.deleteDirectory(tmpZkDir);
+			}
+			catch (Exception e) {
+				LOG.warn("Could not delete temp directory " + tmpZkDir, e);
+			}
+		}
+	}
+
+	/**
+	 * Opens a new ZooKeeper connection and wraps it in a {@link ZkUtils}.
+	 *
+	 * <p>The session and connection timeouts are read from {@code standardProps}. Every call
+	 * creates a new client; the callers in this class close the returned object when done.
+	 *
+	 * @return a ZkUtils backed by a freshly created ZkClient
+	 */
+	public ZkUtils getZkUtils() {
+		LOG.info("In getZKUtils:: zookeeperConnectionString = {}", zookeeperConnectionString);
+		final int sessionTimeout = Integer.parseInt(standardProps.getProperty("zookeeper.session.timeout.ms"));
+		final int connectionTimeout = Integer.parseInt(standardProps.getProperty("zookeeper.connection.timeout.ms"));
+		ZkClient creator = new ZkClient(zookeeperConnectionString, sessionTimeout, connectionTimeout,
+				new ZooKeeperStringSerializer());
+		return ZkUtils.apply(creator, false);
+	}
+
+	/**
+	 * Creates a topic and waits until ZooKeeper reports it as existing.
+	 *
+	 * <p>After posting the create request, the method polls for the topic until {@code zkTimeout}
+	 * milliseconds have passed. The test fails if the topic does not show up in time or if the
+	 * calling thread is interrupted while waiting.
+	 *
+	 * @param topic name of the topic to create
+	 * @param numberOfPartitions number of partitions of the new topic
+	 * @param replicationFactor replication factor of the new topic
+	 * @param topicConfig additional topic configuration passed to {@code AdminUtils.createTopic}
+	 */
+	@Override
+	public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) {
+		// create topic with one client
+		LOG.info("Creating topic {}", topic);
+
+		ZkUtils zkUtils = getZkUtils();
+		try {
+			AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig);
+		} finally {
+			zkUtils.close();
+		}
+
+		LOG.info("Topic {} create request is successfully posted", topic);
+
+		// validate that the topic has been created
+		final int timeoutMillis = Integer.parseInt(zkTimeout);
+		final long deadline = System.currentTimeMillis() + timeoutMillis;
+		do {
+			try {
+				if (secureMode) {
+					//increase wait time since in Travis ZK timeout occurs frequently
+					int wait = timeoutMillis / 100;
+					LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
+					Thread.sleep(wait);
+				} else {
+					Thread.sleep(100);
+				}
+			} catch (InterruptedException e) {
+				// restore interrupted state and stop waiting; with the flag set, every
+				// further sleep() would return immediately and the loop would only spin
+				Thread.currentThread().interrupt();
+				fail("Interrupted while waiting for test topic " + topic + " to be created");
+			}
+
+			// Per the original author's note, AdminUtils.topicExists(zkUtils, topic) on the client
+			// used above does not always give correct results, so each check opens a new connection.
+
+			LOG.info("Validating if the topic {} has been created or not", topic);
+
+			// create a new ZK utils connection and make sure it is closed even if the check throws
+			ZkUtils checkZKConn = getZkUtils();
+			try {
+				if (AdminUtils.topicExists(checkZKConn, topic)) {
+					LOG.info("topic {} has been created successfully", topic);
+					return;
+				}
+			} finally {
+				checkZKConn.close();
+			}
+			LOG.info("topic {} has not been created yet. Will check again...", topic);
+		}
+		while (System.currentTimeMillis() < deadline);
+		fail("Test topic could not be created");
+	}
+
+	/**
+	 * Deletes the given topic through {@code AdminUtils.deleteTopic}.
+	 *
+	 * @param topic name of the topic to delete
+	 */
+	@Override
+	public void deleteTestTopic(String topic) {
+		ZkUtils zkUtils = getZkUtils();
+		try {
+			LOG.info("Deleting topic {}", topic);
+			AdminUtils.deleteTopic(zkUtils, topic);
+		} finally {
+			zkUtils.close();
+		}
+	}
+
+	/**
+	 * Starts a Kafka broker on a free port.
+	 *
+	 * <p>Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
+	 *
+	 * <p>A new port is picked for every attempt. If the startup fails with a
+	 * {@link KafkaException} caused by a {@link BindException}, the start is retried, up to
+	 * five attempts in total. Any other {@link KafkaException} is rethrown immediately.
+	 *
+	 * @param brokerId id of the broker, written to {@code broker.id}
+	 * @param tmpFolder directory used as the broker's {@code log.dir}
+	 * @return the started broker
+	 * @throws Exception if all attempts failed because of port conflicts, or if the startup failed otherwise
+	 */
+	protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
+		Properties kafkaProperties = new Properties();
+
+		// properties have to be Strings
+		kafkaProperties.put("advertised.host.name", KAFKA_HOST);
+		kafkaProperties.put("broker.id", Integer.toString(brokerId));
+		kafkaProperties.put("log.dir", tmpFolder.toString());
+		kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
+		kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024));
+		kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
+
+		// for CI stability, increase zookeeper session timeout
+		kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
+		kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
+		if (additionalServerProperties != null) {
+			kafkaProperties.putAll(additionalServerProperties);
+		}
+
+		final int numTries = 5;
+
+		for (int i = 1; i <= numTries; i++) {
+			int kafkaPort = NetUtils.getAvailablePort();
+			kafkaProperties.put("port", Integer.toString(kafkaPort));
+
+			//to support secure kafka cluster
+			if (secureMode) {
+				LOG.info("Adding Kafka secure configurations");
+				final String listener = "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort;
+				kafkaProperties.put("listeners", listener);
+				kafkaProperties.put("advertised.listeners", listener);
+				kafkaProperties.putAll(getSecureProperties());
+			}
+
+			KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+
+			try {
+				scala.Option<String> stringNone = scala.Option.apply(null);
+				KafkaServer server = new KafkaServer(kafkaConfig, SystemTime$.MODULE$, stringNone);
+				server.startup();
+				return server;
+			}
+			catch (KafkaException e) {
+				if (e.getCause() instanceof BindException) {
+					// port conflict, retry...
+					LOG.info("Port conflict when starting Kafka Broker. Retrying...");
+				}
+				else {
+					throw e;
+				}
+			}
+		}
+
+		throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
+	}
+
+	/**
+	 * Returns the properties needed to talk to a secured (SASL_PLAINTEXT) Kafka cluster.
+	 *
+	 * @return the security and timeout settings if {@code secureMode} is set, otherwise empty properties
+	 */
+	public Properties getSecureProperties() {
+		Properties prop = new Properties();
+		if (secureMode) {
+			prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
+			prop.put("security.protocol", "SASL_PLAINTEXT");
+			prop.put("sasl.kerberos.service.name", "kafka");
+
+			//add special timeout for Travis
+			prop.setProperty("zookeeper.session.timeout.ms", zkTimeout);
+			prop.setProperty("zookeeper.connection.timeout.ms", zkTimeout);
+			prop.setProperty("metadata.fetch.timeout.ms","120000");
+		}
+		return prop;
+	}
+
+	/**
+	 * Offset handler that looks up committed offsets with its own {@link KafkaConsumer}.
+	 */
+	private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
+
+		private final KafkaConsumer<byte[], byte[]> offsetClient;
+
+		public KafkaOffsetHandlerImpl(Properties props) {
+			offsetClient = new KafkaConsumer<>(props);
+		}
+
+		/**
+		 * Returns the committed offset of the given partition, or {@code null} if the
+		 * consumer reports no committed offset for it.
+		 */
+		@Override
+		public Long getCommittedOffset(String topicName, int partition) {
+			OffsetAndMetadata committed = offsetClient.committed(new TopicPartition(topicName, partition));
+			return (committed != null) ? committed.offset() : null;
+		}
+
+		@Override
+		public void close() {
+			offsetClient.close();
+		}
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
new file mode 100644
index 0000000..25040eb
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.streaming.connectors.kafka.internal.Handover.WakeupException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the {@link Handover} between Kafka Consumer Thread and the fetcher's main thread.
+ *
+ * <p>The tests cover four areas: passing records between a producer and a consumer thread,
+ * propagation of reported errors, behavior after {@code close()}, and the producer wake-up.
+ */
+public class HandoverTest {
+
+	// ------------------------------------------------------------------------
+	//  producer / consumer hand-over
+	// ------------------------------------------------------------------------
+
+	/** 500 batches, only the producer is randomly delayed. */
+	@Test
+	public void testWithVariableProducer() throws Exception {
+		runProducerConsumerTest(500, 2, 0);
+	}
+
+	/** 500 batches, only the consumer is randomly delayed. */
+	@Test
+	public void testWithVariableConsumer() throws Exception {
+		runProducerConsumerTest(500, 0, 2);
+	}
+
+	/** 500 batches, producer and consumer are both randomly delayed. */
+	@Test
+	public void testWithVariableBoth() throws Exception {
+		runProducerConsumerTest(500, 2, 2);
+	}
+
+	// ------------------------------------------------------------------------
+	//  error propagation
+	// ------------------------------------------------------------------------
+
+	/** An error reported to an empty handover is thrown by the next poll. */
+	@Test
+	public void testPublishErrorOnEmptyHandover() throws Exception {
+		final Handover handover = new Handover();
+
+		final Exception reported = new Exception();
+		handover.reportError(reported);
+
+		assertPollThrows(handover, reported);
+	}
+
+	/** An error reported to a handover that holds records is thrown by the next poll. */
+	@Test
+	public void testPublishErrorOnFullHandover() throws Exception {
+		final Handover handover = new Handover();
+		handover.produce(createTestRecords());
+
+		final IOException reported = new IOException();
+		handover.reportError(reported);
+
+		assertPollThrows(handover, reported);
+	}
+
+	/** After an error was reported to an empty handover, producing fails with a ClosedException. */
+	@Test
+	public void testExceptionMarksClosedOnEmpty() throws Exception {
+		final Handover handover = new Handover();
+
+		final IllegalStateException reported = new IllegalStateException();
+		handover.reportError(reported);
+
+		assertProduceFailsClosed(handover);
+	}
+
+	/** After an error was reported to a full handover, producing fails with a ClosedException. */
+	@Test
+	public void testExceptionMarksClosedOnFull() throws Exception {
+		final Handover handover = new Handover();
+		handover.produce(createTestRecords());
+
+		final LinkageError reported = new LinkageError();
+		handover.reportError(reported);
+
+		assertProduceFailsClosed(handover);
+	}
+
+	// ------------------------------------------------------------------------
+	//  closing behavior
+	// ------------------------------------------------------------------------
+
+	/** Polling from a closed, empty handover fails with a ClosedException. */
+	@Test
+	public void testCloseEmptyForConsumer() throws Exception {
+		final Handover handover = new Handover();
+		handover.close();
+
+		assertPollFailsClosed(handover);
+	}
+
+	/** Polling from a handover that was closed while holding records fails with a ClosedException. */
+	@Test
+	public void testCloseFullForConsumer() throws Exception {
+		final Handover handover = new Handover();
+		handover.produce(createTestRecords());
+		handover.close();
+
+		assertPollFailsClosed(handover);
+	}
+
+	/** Producing into a closed, empty handover fails with a ClosedException. */
+	@Test
+	public void testCloseEmptyForProducer() throws Exception {
+		final Handover handover = new Handover();
+		handover.close();
+
+		assertProduceFailsClosed(handover);
+	}
+
+	/** Producing into a handover that was closed while holding records fails with a ClosedException. */
+	@Test
+	public void testCloseFullForProducer() throws Exception {
+		final Handover handover = new Handover();
+		handover.produce(createTestRecords());
+		handover.close();
+
+		assertProduceFailsClosed(handover);
+	}
+
+	// ------------------------------------------------------------------------
+	//  wake-up behavior
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A wake-up only makes {@code produce()} throw while the handover holds records;
+	 * producing into an empty handover succeeds even after a wake-up call.
+	 */
+	@Test
+	public void testWakeupDoesNotWakeWhenEmpty() throws Exception {
+		Handover handover = new Handover();
+		handover.wakeupProducer();
+
+		// the handover is empty, so the wake-up must not affect this produce call
+		produceWithoutWakeup(handover);
+
+		// now the handover is full: waking up and producing again has to throw
+		handover.wakeupProducer();
+		try {
+			handover.produce(createTestRecords());
+			fail("should throw an exception");
+		}
+		catch (Handover.WakeupException expected) {
+			// this is the intended outcome
+		}
+
+		// drain the handover
+		assertNotNull(handover.pollNext());
+
+		// the handover is empty again, so producing must work
+		produceWithoutWakeup(handover);
+	}
+
+	/**
+	 * A single wake-up makes only one {@code produce()} call throw; a later call
+	 * blocks until the consumer takes the pending records.
+	 */
+	@Test
+	public void testWakeupWakesOnlyOnce() throws Exception {
+		// start with a full handover
+		final Handover handover = new Handover();
+		handover.produce(createTestRecords());
+
+		handover.wakeupProducer();
+
+		try {
+			handover.produce(createTestRecords());
+			fail();
+		} catch (WakeupException expected) {
+			// this is the intended outcome
+		}
+
+		final CheckedThread blockedProducer = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				handover.produce(createTestRecords());
+			}
+		};
+		blockedProducer.start();
+
+		// the second produce call has to block
+		blockedProducer.waitUntilThreadHoldsLock(10000);
+
+		// consuming the pending records lets the producer finish
+		assertNotNull(handover.pollNext());
+		blockedProducer.sync();
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	/** Asserts that {@code pollNext()} throws an exception equal to the given error. */
+	private static void assertPollThrows(Handover handover, Throwable expectedError) throws Exception {
+		try {
+			handover.pollNext();
+			fail("should throw an exception");
+		}
+		catch (Exception actual) {
+			assertEquals(expectedError, actual);
+		}
+	}
+
+	/** Asserts that {@code pollNext()} fails with a {@link Handover.ClosedException}. */
+	private static void assertPollFailsClosed(Handover handover) throws Exception {
+		try {
+			handover.pollNext();
+			fail("should throw an exception");
+		}
+		catch (Handover.ClosedException expected) {
+			// this is the intended outcome
+		}
+	}
+
+	/** Asserts that {@code produce()} fails with a {@link Handover.ClosedException}. */
+	private static void assertProduceFailsClosed(Handover handover) throws Exception {
+		try {
+			handover.produce(createTestRecords());
+			fail("should throw an exception");
+		}
+		catch (Handover.ClosedException expected) {
+			// this is the intended outcome
+		}
+	}
+
+	/** Produces one batch and fails the test if the call is answered with a wake-up. */
+	private static void produceWithoutWakeup(Handover handover) throws Exception {
+		try {
+			handover.produce(createTestRecords());
+		}
+		catch (Handover.WakeupException unexpected) {
+			fail();
+		}
+	}
+
+	/**
+	 * Passes {@code numRecords} mocked batches from a producer thread to a consumer thread
+	 * and checks that the consumer receives them in the order they were produced.
+	 *
+	 * @param numRecords number of batches to hand over
+	 * @param maxProducerDelay exclusive upper bound (msecs) of the random pause after each produce; 0 disables it
+	 * @param maxConsumerDelay exclusive upper bound (msecs) of the random pause after each poll; 0 disables it
+	 */
+	private void runProducerConsumerTest(int numRecords, int maxProducerDelay, int maxConsumerDelay) throws Exception {
+		// prepare the batches that are handed over
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		final ConsumerRecords<byte[], byte[]>[] batches = new ConsumerRecords[numRecords];
+		int idx = 0;
+		while (idx < numRecords) {
+			batches[idx++] = createTestRecords();
+		}
+
+		final Handover handover = new Handover();
+
+		final ProducerThread producerThread = new ProducerThread(handover, batches, maxProducerDelay);
+		final ConsumerThread consumerThread = new ConsumerThread(handover, batches, maxConsumerDelay);
+
+		consumerThread.start();
+		producerThread.start();
+
+		// the consumer is joined first, because it is the one raising assertion errors
+		consumerThread.sync();
+		producerThread.sync();
+	}
+
+	/** Creates a mocked batch of records. */
+	@SuppressWarnings("unchecked")
+	private static ConsumerRecords<byte[], byte[]> createTestRecords() {
+		return mock(ConsumerRecords.class);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Thread that remembers a failure of its {@link #go()} method and rethrows it in {@link #sync()}.
+	 */
+	private static abstract class CheckedThread extends Thread {
+
+		private volatile Throwable error;
+
+		public abstract void go() throws Exception;
+
+		@Override
+		public void run() {
+			try {
+				go();
+			}
+			catch (Throwable failure) {
+				error = failure;
+			}
+		}
+
+		/** Waits for the thread to finish and rethrows the failure of {@link #go()}, if there was one. */
+		public void sync() throws Exception {
+			join();
+			final Throwable failure = error;
+			if (failure != null) {
+				ExceptionUtils.rethrowException(failure, failure.getMessage());
+			}
+		}
+
+		/**
+		 * Polls the thread state every millisecond until the thread is blocked or waiting.
+		 *
+		 * @throws TimeoutException if the thread is not blocked or waiting once the timeout has passed
+		 */
+		public void waitUntilThreadHoldsLock(long timeoutMillis) throws InterruptedException, TimeoutException {
+			final long deadlineNanos = System.nanoTime() + timeoutMillis * 1_000_000;
+
+			while (!(isBlockedOrWaiting() || System.nanoTime() >= deadlineNanos)) {
+				Thread.sleep(1);
+			}
+
+			if (!isBlockedOrWaiting()) {
+				throw new TimeoutException();
+			}
+		}
+
+		private boolean isBlockedOrWaiting() {
+			switch (getState()) {
+				case BLOCKED:
+				case WAITING:
+				case TIMED_WAITING:
+					return true;
+				default:
+					return false;
+			}
+		}
+	}
+
+	/** Produces all batches into the handover, optionally pausing for a random time after each one. */
+	private static class ProducerThread extends CheckedThread {
+
+		private final Random rnd = new Random();
+		private final Handover handover;
+		private final ConsumerRecords<byte[], byte[]>[] data;
+		private final int maxDelay;
+
+		private ProducerThread(Handover handover, ConsumerRecords<byte[], byte[]>[] data, int maxDelay) {
+			this.handover = handover;
+			this.data = data;
+			this.maxDelay = maxDelay;
+		}
+
+		@Override
+		public void go() throws Exception {
+			for (int i = 0; i < data.length; i++) {
+				handover.produce(data[i]);
+
+				if (maxDelay > 0) {
+					Thread.sleep(rnd.nextInt(maxDelay));
+				}
+			}
+		}
+	}
+
+	/** Polls as many batches as were produced and checks each one against the expected batch. */
+	private static class ConsumerThread extends CheckedThread {
+
+		private final Random rnd = new Random();
+		private final Handover handover;
+		private final ConsumerRecords<byte[], byte[]>[] data;
+		private final int maxDelay;
+
+		private ConsumerThread(Handover handover, ConsumerRecords<byte[], byte[]>[] data, int maxDelay) {
+			this.handover = handover;
+			this.data = data;
+			this.maxDelay = maxDelay;
+		}
+
+		@Override
+		public void go() throws Exception {
+			for (int i = 0; i < data.length; i++) {
+				final ConsumerRecords<byte[], byte[]> expected = data[i];
+				final ConsumerRecords<byte[], byte[]> polled = handover.pollNext();
+
+				assertEquals(expected, polled);
+
+				if (maxDelay > 0) {
+					Thread.sleep(rnd.nextInt(maxDelay));
+				}
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..4ac1773
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
@@ -0,0 +1,32 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger +log4j.logger.org.apache.zookeeper=OFF, testlogger +log4j.logger.state.change.logger=OFF, testlogger +log4j.logger.kafka=OFF, testlogger + +log4j.logger.org.apache.directory=OFF, testlogger \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml b/flink-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml new file mode 100644 index 0000000..45b3b92 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml @@ -0,0 +1,30 @@ +<!-- + ~ Licensed to 
the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> + <logger name="org.apache.flink.streaming" level="WARN"/> +</configuration> \ No newline at end of file
