Repository: flink
Updated Branches:
  refs/heads/master 8dac43613 -> de2605ea7


[hotfix] [kafka] Indent Kafka010FetcherTest with tabs instead of spaces


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de2605ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de2605ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de2605ea

Branch: refs/heads/master
Commit: de2605ea7b17fc569890a53743783b7d26c8e56b
Parents: 8dac436
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Thu Feb 23 23:13:03 2017 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Thu Feb 23 23:13:03 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/Kafka010FetcherTest.java   | 688 +++++++++----------
 1 file changed, 343 insertions(+), 345 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/de2605ea/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 5718986..98aa28a 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -78,51 +78,51 @@ import static 
org.powermock.api.mockito.PowerMockito.whenNew;
 @PrepareForTest(KafkaConsumerThread.class)
 public class Kafka010FetcherTest {
 
-    @Test
-    public void testCommitDoesNotBlock() throws Exception {
-
-        // test data
-        final KafkaTopicPartition testPartition = new 
KafkaTopicPartition("test", 42);
-        final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
-        testCommitData.put(testPartition, 11L);
-
-        // to synchronize when the consumer is in its blocking method
-        final OneShotLatch sync = new OneShotLatch();
-
-        // ----- the mock consumer with blocking poll calls ----
-        final MultiShotLatch blockerLatch = new MultiShotLatch();
-
-        KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
-        when(mockConsumer.poll(anyLong())).thenAnswer(new 
Answer<ConsumerRecords<?, ?>>() {
-
-            @Override
-            public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) 
throws InterruptedException {
-                sync.trigger();
-                blockerLatch.await();
-                return ConsumerRecords.empty();
-            }
-        });
-
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocation) {
-                blockerLatch.trigger();
-                return null;
-            }
-        }).when(mockConsumer).wakeup();
-
-        // make sure the fetcher creates the mock consumer
-        
whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
-        // ----- create the test fetcher -----
-
-        @SuppressWarnings("unchecked")
-        SourceContext<String> sourceContext = mock(SourceContext.class);
-        List<KafkaTopicPartition> topics = Collections.singletonList(new 
KafkaTopicPartition("test", 42));
-        KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
-        final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
-                       sourceContext,
+       @Test
+       public void testCommitDoesNotBlock() throws Exception {
+
+               // test data
+               final KafkaTopicPartition testPartition = new 
KafkaTopicPartition("test", 42);
+               final Map<KafkaTopicPartition, Long> testCommitData = new 
HashMap<>();
+               testCommitData.put(testPartition, 11L);
+
+               // to synchronize when the consumer is in its blocking method
+               final OneShotLatch sync = new OneShotLatch();
+
+               // ----- the mock consumer with blocking poll calls ----
+               final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+               KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+               when(mockConsumer.poll(anyLong())).thenAnswer(new 
Answer<ConsumerRecords<?, ?>>() {
+
+                       @Override
+                       public ConsumerRecords<?, ?> answer(InvocationOnMock 
invocation) throws InterruptedException {
+                               sync.trigger();
+                               blockerLatch.await();
+                               return ConsumerRecords.empty();
+                       }
+               });
+
+               doAnswer(new Answer<Void>() {
+                       @Override
+                       public Void answer(InvocationOnMock invocation) {
+                               blockerLatch.trigger();
+                               return null;
+                       }
+               }).when(mockConsumer).wakeup();
+
+               // make sure the fetcher creates the mock consumer
+               
whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+               // ----- create the test fetcher -----
+
+               @SuppressWarnings("unchecked")
+               SourceContext<String> sourceContext = mock(SourceContext.class);
+               List<KafkaTopicPartition> topics = 
Collections.singletonList(new KafkaTopicPartition("test", 42));
+               KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+               final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+                               sourceContext,
                                topics,
                                null, /* no restored state */
                                null, /* periodic assigner */
@@ -139,128 +139,128 @@ public class Kafka010FetcherTest {
                                StartupMode.GROUP_OFFSETS,
                                false);
 
-        // ----- run the fetcher -----
-
-        final AtomicReference<Throwable> error = new AtomicReference<>();
-        final Thread fetcherRunner = new Thread("fetcher runner") {
-
-            @Override
-            public void run() {
-                try {
-                    fetcher.runFetchLoop();
-                } catch (Throwable t) {
-                    error.set(t);
-                }
-            }
-        };
-        fetcherRunner.start();
-
-        // wait until the fetcher has reached the method of interest
-        sync.await();
-
-        // ----- trigger the offset commit -----
-
-        final AtomicReference<Throwable> commitError = new AtomicReference<>();
-        final Thread committer = new Thread("committer runner") {
-            @Override
-            public void run() {
-                try {
-                    fetcher.commitInternalOffsetsToKafka(testCommitData);
-                } catch (Throwable t) {
-                    commitError.set(t);
-                }
-            }
-        };
-        committer.start();
-
-        // ----- ensure that the committer finishes in time  -----
-        committer.join(30000);
-        assertFalse("The committer did not finish in time", 
committer.isAlive());
-
-        // ----- test done, wait till the fetcher is done for a clean shutdown 
-----
-        fetcher.cancel();
-        fetcherRunner.join();
-
-        // check that there were no errors in the fetcher
-        final Throwable fetcherError = error.get();
-        if (fetcherError != null && !(fetcherError instanceof 
Handover.ClosedException)) {
-            throw new Exception("Exception in the fetcher", fetcherError);
-        }
-        final Throwable committerError = commitError.get();
-        if (committerError != null) {
-            throw new Exception("Exception in the committer", committerError);
-        }
-    }
-
-    @Test
-    public void ensureOffsetsGetCommitted() throws Exception {
-
-        // test data
-        final KafkaTopicPartition testPartition1 = new 
KafkaTopicPartition("test", 42);
-        final KafkaTopicPartition testPartition2 = new 
KafkaTopicPartition("another", 99);
-
-        final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>();
-        testCommitData1.put(testPartition1, 11L);
-        testCommitData1.put(testPartition2, 18L);
-
-        final Map<KafkaTopicPartition, Long> testCommitData2 = new HashMap<>();
-        testCommitData2.put(testPartition1, 19L);
-        testCommitData2.put(testPartition2, 28L);
-
-        final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> 
commitStore = new LinkedBlockingQueue<>();
-
-
-        // ----- the mock consumer with poll(), wakeup(), and commit(A)sync 
calls ----
-
-        final MultiShotLatch blockerLatch = new MultiShotLatch();
-
-        KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
-
-        when(mockConsumer.poll(anyLong())).thenAnswer(new 
Answer<ConsumerRecords<?, ?>>() {
-            @Override
-            public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) 
throws InterruptedException {
-                blockerLatch.await();
-                return ConsumerRecords.empty();
-            }
-        });
-
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocation) {
-                blockerLatch.trigger();
-                return null;
-            }
-        }).when(mockConsumer).wakeup();
-
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocation) {
-                @SuppressWarnings("unchecked")
-                Map<TopicPartition, OffsetAndMetadata> offsets =
-                        (Map<TopicPartition, OffsetAndMetadata>) 
invocation.getArguments()[0];
-
-                OffsetCommitCallback callback = (OffsetCommitCallback) 
invocation.getArguments()[1];
-
-                commitStore.add(offsets);
-                callback.onComplete(offsets, null);
-
-                return null;
-            }
-        }).when(mockConsumer).commitAsync(
-                Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), 
any(OffsetCommitCallback.class));
-
-        // make sure the fetcher creates the mock consumer
-        
whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
-        // ----- create the test fetcher -----
-
-        @SuppressWarnings("unchecked")
-        SourceContext<String> sourceContext = mock(SourceContext.class);
-        List<KafkaTopicPartition> topics = Collections.singletonList(new 
KafkaTopicPartition("test", 42));
-        KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
-        final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
-                       sourceContext,
+               // ----- run the fetcher -----
+
+               final AtomicReference<Throwable> error = new 
AtomicReference<>();
+               final Thread fetcherRunner = new Thread("fetcher runner") {
+
+                       @Override
+                       public void run() {
+                               try {
+                                       fetcher.runFetchLoop();
+                               } catch (Throwable t) {
+                                       error.set(t);
+                               }
+                       }
+               };
+               fetcherRunner.start();
+
+               // wait until the fetcher has reached the method of interest
+               sync.await();
+
+               // ----- trigger the offset commit -----
+
+               final AtomicReference<Throwable> commitError = new 
AtomicReference<>();
+               final Thread committer = new Thread("committer runner") {
+                       @Override
+                       public void run() {
+                               try {
+                                       
fetcher.commitInternalOffsetsToKafka(testCommitData);
+                               } catch (Throwable t) {
+                                       commitError.set(t);
+                               }
+                       }
+               };
+               committer.start();
+
+               // ----- ensure that the committer finishes in time  -----
+               committer.join(30000);
+               assertFalse("The committer did not finish in time", 
committer.isAlive());
+
+               // ----- test done, wait till the fetcher is done for a clean 
shutdown -----
+               fetcher.cancel();
+               fetcherRunner.join();
+
+               // check that there were no errors in the fetcher
+               final Throwable fetcherError = error.get();
+               if (fetcherError != null && !(fetcherError instanceof 
Handover.ClosedException)) {
+                       throw new Exception("Exception in the fetcher", 
fetcherError);
+               }
+
+               final Throwable committerError = commitError.get();
+               if (committerError != null) {
+                       throw new Exception("Exception in the committer", 
committerError);
+               }
+       }
+
+       @Test
+       public void ensureOffsetsGetCommitted() throws Exception {
+
+               // test data
+               final KafkaTopicPartition testPartition1 = new 
KafkaTopicPartition("test", 42);
+               final KafkaTopicPartition testPartition2 = new 
KafkaTopicPartition("another", 99);
+
+               final Map<KafkaTopicPartition, Long> testCommitData1 = new 
HashMap<>();
+               testCommitData1.put(testPartition1, 11L);
+               testCommitData1.put(testPartition2, 18L);
+
+               final Map<KafkaTopicPartition, Long> testCommitData2 = new 
HashMap<>();
+               testCommitData2.put(testPartition1, 19L);
+               testCommitData2.put(testPartition2, 28L);
+
+               final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> 
commitStore = new LinkedBlockingQueue<>();
+
+               // ----- the mock consumer with poll(), wakeup(), and 
commit(A)sync calls ----
+
+               final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+               KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+
+               when(mockConsumer.poll(anyLong())).thenAnswer(new 
Answer<ConsumerRecords<?, ?>>() {
+                       @Override
+                       public ConsumerRecords<?, ?> answer(InvocationOnMock 
invocation) throws InterruptedException {
+                               blockerLatch.await();
+                               return ConsumerRecords.empty();
+                       }
+               });
+
+               doAnswer(new Answer<Void>() {
+                       @Override
+                       public Void answer(InvocationOnMock invocation) {
+                               blockerLatch.trigger();
+                               return null;
+                       }
+               }).when(mockConsumer).wakeup();
+
+               doAnswer(new Answer<Void>() {
+                       @Override
+                       public Void answer(InvocationOnMock invocation) {
+                               @SuppressWarnings("unchecked")
+                               Map<TopicPartition, OffsetAndMetadata> offsets =
+                                               (Map<TopicPartition, 
OffsetAndMetadata>) invocation.getArguments()[0];
+
+                               OffsetCommitCallback callback = 
(OffsetCommitCallback) invocation.getArguments()[1];
+
+                               commitStore.add(offsets);
+                               callback.onComplete(offsets, null);
+
+                               return null;
+                       }
+               }).when(mockConsumer).commitAsync(
+                               Mockito.<Map<TopicPartition, 
OffsetAndMetadata>>any(), any(OffsetCommitCallback.class));
+
+               // make sure the fetcher creates the mock consumer
+               
whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+               // ----- create the test fetcher -----
+
+               @SuppressWarnings("unchecked")
+               SourceContext<String> sourceContext = mock(SourceContext.class);
+               List<KafkaTopicPartition> topics = 
Collections.singletonList(new KafkaTopicPartition("test", 42));
+               KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+               final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+                               sourceContext,
                                topics,
                                null, /* no restored state */
                                null, /* periodic assigner */
@@ -277,106 +277,105 @@ public class Kafka010FetcherTest {
                                StartupMode.GROUP_OFFSETS,
                                false);
 
+               // ----- run the fetcher -----
+
+               final AtomicReference<Throwable> error = new 
AtomicReference<>();
+               final Thread fetcherRunner = new Thread("fetcher runner") {
+
+                       @Override
+                       public void run() {
+                               try {
+                                       fetcher.runFetchLoop();
+                               } catch (Throwable t) {
+                                       error.set(t);
+                               }
+                       }
+               };
+               fetcherRunner.start();
+
+               // ----- trigger the first offset commit -----
+
+               fetcher.commitInternalOffsetsToKafka(testCommitData1);
+               Map<TopicPartition, OffsetAndMetadata> result1 = 
commitStore.take();
+
+               for (Entry<TopicPartition, OffsetAndMetadata> entry : 
result1.entrySet()) {
+                       TopicPartition partition = entry.getKey();
+                       if (partition.topic().equals("test")) {
+                               assertEquals(42, partition.partition());
+                               assertEquals(12L, entry.getValue().offset());
+                       }
+                       else if (partition.topic().equals("another")) {
+                               assertEquals(99, partition.partition());
+                               assertEquals(18L, entry.getValue().offset());
+                       }
+               }
+
+               // ----- trigger the second offset commit -----
+
+               fetcher.commitInternalOffsetsToKafka(testCommitData2);
+               Map<TopicPartition, OffsetAndMetadata> result2 = 
commitStore.take();
+
+               for (Entry<TopicPartition, OffsetAndMetadata> entry : 
result2.entrySet()) {
+                       TopicPartition partition = entry.getKey();
+                       if (partition.topic().equals("test")) {
+                               assertEquals(42, partition.partition());
+                               assertEquals(20L, entry.getValue().offset());
+                       }
+                       else if (partition.topic().equals("another")) {
+                               assertEquals(99, partition.partition());
+                               assertEquals(28L, entry.getValue().offset());
+                       }
+               }
+
+               // ----- test done, wait till the fetcher is done for a clean 
shutdown -----
+               fetcher.cancel();
+               fetcherRunner.join();
+
+               // check that there were no errors in the fetcher
+               final Throwable caughtError = error.get();
+               if (caughtError != null && !(caughtError instanceof 
Handover.ClosedException)) {
+                       throw new Exception("Exception in the fetcher", 
caughtError);
+               }
+       }
+
+       @Test
+       public void testCancellationWhenEmitBlocks() throws Exception {
+
+               // ----- some test data -----
+
+               final String topic = "test-topic";
+               final int partition = 3;
+               final byte[] payload = new byte[] {1, 2, 3, 4};
+
+               final List<ConsumerRecord<byte[], byte[]>> records = 
Arrays.asList(
+                               new ConsumerRecord<byte[], byte[]>(topic, 
partition, 15, payload, payload),
+                               new ConsumerRecord<byte[], byte[]>(topic, 
partition, 16, payload, payload),
+                               new ConsumerRecord<byte[], byte[]>(topic, 
partition, 17, payload, payload));
+
+               final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
data = new HashMap<>();
+               data.put(new TopicPartition(topic, partition), records);
+
+               final ConsumerRecords<byte[], byte[]> consumerRecords = new 
ConsumerRecords<>(data);
 
-        // ----- run the fetcher -----
-
-        final AtomicReference<Throwable> error = new AtomicReference<>();
-        final Thread fetcherRunner = new Thread("fetcher runner") {
-
-            @Override
-            public void run() {
-                try {
-                    fetcher.runFetchLoop();
-                } catch (Throwable t) {
-                    error.set(t);
-                }
-            }
-        };
-        fetcherRunner.start();
-
-        // ----- trigger the first offset commit -----
-
-        fetcher.commitInternalOffsetsToKafka(testCommitData1);
-        Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
-
-        for (Entry<TopicPartition, OffsetAndMetadata> entry : 
result1.entrySet()) {
-            TopicPartition partition = entry.getKey();
-            if (partition.topic().equals("test")) {
-                assertEquals(42, partition.partition());
-                assertEquals(12L, entry.getValue().offset());
-            }
-            else if (partition.topic().equals("another")) {
-                assertEquals(99, partition.partition());
-                assertEquals(18L, entry.getValue().offset());
-            }
-        }
-
-        // ----- trigger the second offset commit -----
-
-        fetcher.commitInternalOffsetsToKafka(testCommitData2);
-        Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
-
-        for (Entry<TopicPartition, OffsetAndMetadata> entry : 
result2.entrySet()) {
-            TopicPartition partition = entry.getKey();
-            if (partition.topic().equals("test")) {
-                assertEquals(42, partition.partition());
-                assertEquals(20L, entry.getValue().offset());
-            }
-            else if (partition.topic().equals("another")) {
-                assertEquals(99, partition.partition());
-                assertEquals(28L, entry.getValue().offset());
-            }
-        }
-
-        // ----- test done, wait till the fetcher is done for a clean shutdown 
-----
-        fetcher.cancel();
-        fetcherRunner.join();
-
-        // check that there were no errors in the fetcher
-        final Throwable caughtError = error.get();
-        if (caughtError != null && !(caughtError instanceof 
Handover.ClosedException)) {
-            throw new Exception("Exception in the fetcher", caughtError);
-        }
-    }
-
-    @Test
-    public void testCancellationWhenEmitBlocks() throws Exception {
-
-        // ----- some test data -----
-
-        final String topic = "test-topic";
-        final int partition = 3;
-        final byte[] payload = new byte[] {1, 2, 3, 4};
-
-        final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
-                new ConsumerRecord<byte[], byte[]>(topic, partition, 15, 
payload, payload),
-                new ConsumerRecord<byte[], byte[]>(topic, partition, 16, 
payload, payload),
-                new ConsumerRecord<byte[], byte[]>(topic, partition, 17, 
payload, payload));
-
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = 
new HashMap<>();
-        data.put(new TopicPartition(topic, partition), records);
-
-        final ConsumerRecords<byte[], byte[]> consumerRecords = new 
ConsumerRecords<>(data);
-
-        // ----- the test consumer -----
-
-        final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
-        when(mockConsumer.poll(anyLong())).thenAnswer(new 
Answer<ConsumerRecords<?, ?>>() {
-            @Override
-            public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
-                return consumerRecords;
-            }
-        });
-
-        
whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
-        // ----- build a fetcher -----
-
-        BlockingSourceContext<String> sourceContext = new 
BlockingSourceContext<>();
-        List<KafkaTopicPartition> topics = Collections.singletonList(new 
KafkaTopicPartition(topic, partition));
-        KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
-        final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+               // ----- the test consumer -----
+
+               final KafkaConsumer<?, ?> mockConsumer = 
mock(KafkaConsumer.class);
+               when(mockConsumer.poll(anyLong())).thenAnswer(new 
Answer<ConsumerRecords<?, ?>>() {
+                       @Override
+                       public ConsumerRecords<?, ?> answer(InvocationOnMock 
invocation) {
+                               return consumerRecords;
+                       }
+               });
+
+               
whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+               // ----- build a fetcher -----
+
+               BlockingSourceContext<String> sourceContext = new 
BlockingSourceContext<>();
+               List<KafkaTopicPartition> topics = 
Collections.singletonList(new KafkaTopicPartition(topic, partition));
+               KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+               final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
                                sourceContext,
                                topics,
                                null, /* no restored state */
@@ -394,58 +393,57 @@ public class Kafka010FetcherTest {
                                StartupMode.GROUP_OFFSETS,
                                false);
 
+               // ----- run the fetcher -----
 
-        // ----- run the fetcher -----
-
-        final AtomicReference<Throwable> error = new AtomicReference<>();
-        final Thread fetcherRunner = new Thread("fetcher runner") {
+               final AtomicReference<Throwable> error = new 
AtomicReference<>();
+               final Thread fetcherRunner = new Thread("fetcher runner") {
 
-            @Override
-            public void run() {
-                try {
-                    fetcher.runFetchLoop();
-                } catch (Throwable t) {
-                    error.set(t);
-                }
-            }
-        };
-        fetcherRunner.start();
+                       @Override
+                       public void run() {
+                               try {
+                                       fetcher.runFetchLoop();
+                               } catch (Throwable t) {
+                                       error.set(t);
+                               }
+                       }
+               };
+               fetcherRunner.start();
 
-        // wait until the thread started to emit records to the source context
-        sourceContext.waitTillHasBlocker();
+               // wait until the thread started to emit records to the source 
context
+               sourceContext.waitTillHasBlocker();
 
-        // now we try to cancel the fetcher, including the interruption 
usually done on the task thread
-        // once it has finished, there must be no more thread blocked on the 
source context
-        fetcher.cancel();
-        fetcherRunner.interrupt();
-        fetcherRunner.join();
+               // now we try to cancel the fetcher, including the interruption 
usually done on the task thread
+               // once it has finished, there must be no more thread blocked 
on the source context
+               fetcher.cancel();
+               fetcherRunner.interrupt();
+               fetcherRunner.join();
 
-        assertFalse("fetcher threads did not properly finish", 
sourceContext.isStillBlocking());
-    }
+               assertFalse("fetcher threads did not properly finish", 
sourceContext.isStillBlocking());
+       }
 
-    // ------------------------------------------------------------------------
-    //  test utilities
-    // ------------------------------------------------------------------------
+       // 
------------------------------------------------------------------------
+       //  test utilities
+       // 
------------------------------------------------------------------------
 
-    private static final class BlockingSourceContext<T> implements 
SourceContext<T> {
+       private static final class BlockingSourceContext<T> implements 
SourceContext<T> {
 
-        private final ReentrantLock lock = new ReentrantLock();
-        private final OneShotLatch inBlocking = new OneShotLatch();
+               private final ReentrantLock lock = new ReentrantLock();
+               private final OneShotLatch inBlocking = new OneShotLatch();
 
-        @Override
-        public void collect(T element) {
-            block();
-        }
+               @Override
+               public void collect(T element) {
+                       block();
+               }
 
-        @Override
-        public void collectWithTimestamp(T element, long timestamp) {
-            block();
-        }
+               @Override
+               public void collectWithTimestamp(T element, long timestamp) {
+                       block();
+               }
 
-        @Override
-        public void emitWatermark(Watermark mark) {
-            block();
-        }
+               @Override
+               public void emitWatermark(Watermark mark) {
+                       block();
+               }
 
                @Override
                public void markAsTemporarilyIdle() {
@@ -453,42 +451,42 @@ public class Kafka010FetcherTest {
                }
 
                @Override
-        public Object getCheckpointLock() {
-            return new Object();
-        }
-
-        @Override
-        public void close() {}
-
-        public void waitTillHasBlocker() throws InterruptedException {
-            inBlocking.await();
-        }
-
-        public boolean isStillBlocking() {
-            return lock.isLocked();
-        }
-
-        @SuppressWarnings({"InfiniteLoopStatement", 
"SynchronizationOnLocalVariableOrMethodParameter"})
-        private void block() {
-            lock.lock();
-            try {
-                inBlocking.trigger();
-
-                // put this thread to sleep indefinitely
-                final Object o = new Object();
-                while (true) {
-                    synchronized (o) {
-                        o.wait();
-                    }
-                }
-            }
-            catch (InterruptedException e) {
-                // exit cleanly, simply reset the interruption flag
-                Thread.currentThread().interrupt();
-            }
-            finally {
-                lock.unlock();
-            }
-        }
-    }
+               public Object getCheckpointLock() {
+                       return new Object();
+               }
+
+               @Override
+               public void close() {}
+
+               public void waitTillHasBlocker() throws InterruptedException {
+                       inBlocking.await();
+               }
+
+               public boolean isStillBlocking() {
+                       return lock.isLocked();
+               }
+
+               @SuppressWarnings({"InfiniteLoopStatement", 
"SynchronizationOnLocalVariableOrMethodParameter"})
+               private void block() {
+                       lock.lock();
+                       try {
+                               inBlocking.trigger();
+
+                               // put this thread to sleep indefinitely
+                               final Object o = new Object();
+                               while (true) {
+                                       synchronized (o) {
+                                               o.wait();
+                                       }
+                               }
+                       }
+                       catch (InterruptedException e) {
+                               // exit cleanly, simply reset the interruption 
flag
+                               Thread.currentThread().interrupt();
+                       }
+                       finally {
+                               lock.unlock();
+                       }
+               }
+       }
 }

Reply via email to