Repository: flink Updated Branches: refs/heads/master 8dac43613 -> de2605ea7
[hotfix] [kafka] Indent Kafka010FetcherTest with tabs instead of spaces Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de2605ea Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de2605ea Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de2605ea Branch: refs/heads/master Commit: de2605ea7b17fc569890a53743783b7d26c8e56b Parents: 8dac436 Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Thu Feb 23 23:13:03 2017 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Thu Feb 23 23:13:03 2017 +0800 ---------------------------------------------------------------------- .../connectors/kafka/Kafka010FetcherTest.java | 688 +++++++++---------- 1 file changed, 343 insertions(+), 345 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/de2605ea/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java index 5718986..98aa28a 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java @@ -78,51 +78,51 @@ import static org.powermock.api.mockito.PowerMockito.whenNew; @PrepareForTest(KafkaConsumerThread.class) public class Kafka010FetcherTest { - @Test - public void testCommitDoesNotBlock() throws Exception { - - // test data - final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42); - final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>(); - testCommitData.put(testPartition, 11L); - - // to synchronize when the consumer is in its blocking method - final OneShotLatch sync = new OneShotLatch(); - - // ----- the mock consumer with blocking poll calls ---- - final MultiShotLatch blockerLatch = new MultiShotLatch(); - - KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class); - when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() { - - @Override - public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException { - sync.trigger(); - blockerLatch.await(); - return ConsumerRecords.empty(); - } - }); - - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) { - blockerLatch.trigger(); - return null; - } - }).when(mockConsumer).wakeup(); - - // make sure the fetcher creates the mock consumer - whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); - - // ----- create the test fetcher ----- - - @SuppressWarnings("unchecked") - SourceContext<String> sourceContext = mock(SourceContext.class); - List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); - KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); - - final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>( - sourceContext, + @Test + public void testCommitDoesNotBlock() throws Exception { + + // test data + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42); + final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>(); + testCommitData.put(testPartition, 11L); + + // to synchronize when the consumer is in its blocking method + final OneShotLatch sync = new OneShotLatch(); + + // ----- the mock consumer with blocking poll calls ---- + final MultiShotLatch blockerLatch = new MultiShotLatch(); + + KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class); + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() { + + @Override + public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException { + sync.trigger(); + blockerLatch.await(); + return ConsumerRecords.empty(); + } + }); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) { + blockerLatch.trigger(); + return null; + } + }).when(mockConsumer).wakeup(); + + // make sure the fetcher creates the mock consumer + whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); + + // ----- create the test fetcher ----- + + @SuppressWarnings("unchecked") + SourceContext<String> sourceContext = mock(SourceContext.class); + List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); + KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); + + final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>( + sourceContext, topics, null, /* no restored state */ null, /* periodic assigner */ @@ -139,128 +139,128 @@ public class Kafka010FetcherTest { StartupMode.GROUP_OFFSETS, false); - // ----- run the fetcher ----- - - final AtomicReference<Throwable> error = new AtomicReference<>(); - final Thread fetcherRunner = new Thread("fetcher runner") { - - @Override - public void run() { - try { - fetcher.runFetchLoop(); - } catch (Throwable t) { - error.set(t); - } - } - }; - fetcherRunner.start(); - - // wait until the fetcher has reached the method of interest - sync.await(); - - // ----- trigger the offset commit ----- - - final AtomicReference<Throwable> commitError = new AtomicReference<>(); - final Thread committer = new Thread("committer runner") { - @Override - public void run() { - try { - fetcher.commitInternalOffsetsToKafka(testCommitData); - } catch (Throwable t) { - commitError.set(t); - } - } - }; - committer.start(); - - // ----- ensure that the committer finishes in time ----- - committer.join(30000); - assertFalse("The committer did not finish in time", committer.isAlive()); - - // ----- test done, wait till the fetcher is done for a clean shutdown ----- - fetcher.cancel(); - fetcherRunner.join(); - - // check that there were no errors in the fetcher - final Throwable fetcherError = error.get(); - if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) { - throw new Exception("Exception in the fetcher", fetcherError); - } - final Throwable committerError = commitError.get(); - if (committerError != null) { - throw new Exception("Exception in the committer", committerError); - } - } - - @Test - public void ensureOffsetsGetCommitted() throws Exception { - - // test data - final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42); - final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99); - - final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>(); - testCommitData1.put(testPartition1, 11L); - testCommitData1.put(testPartition2, 18L); - - final Map<KafkaTopicPartition, Long> testCommitData2 = new HashMap<>(); - testCommitData2.put(testPartition1, 19L); - testCommitData2.put(testPartition2, 28L); - - final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>(); - - - // ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ---- - - final MultiShotLatch blockerLatch = new MultiShotLatch(); - - KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class); - - when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() { - @Override - public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException { - blockerLatch.await(); - return ConsumerRecords.empty(); - } - }); - - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) { - blockerLatch.trigger(); - return null; - } - }).when(mockConsumer).wakeup(); - - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) { - @SuppressWarnings("unchecked") - Map<TopicPartition, OffsetAndMetadata> offsets = - (Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0]; - - OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1]; - - commitStore.add(offsets); - callback.onComplete(offsets, null); - - return null; - } - }).when(mockConsumer).commitAsync( - Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class)); - - // make sure the fetcher creates the mock consumer - whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); - - // ----- create the test fetcher ----- - - @SuppressWarnings("unchecked") - SourceContext<String> sourceContext = mock(SourceContext.class); - List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); - KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); - - final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>( - sourceContext, + // ----- run the fetcher ----- + + final AtomicReference<Throwable> error = new AtomicReference<>(); + final Thread fetcherRunner = new Thread("fetcher runner") { + + @Override + public void run() { + try { + fetcher.runFetchLoop(); + } catch (Throwable t) { + error.set(t); + } + } + }; + fetcherRunner.start(); + + // wait until the fetcher has reached the method of interest + sync.await(); + + // ----- trigger the offset commit ----- + + final AtomicReference<Throwable> commitError = new AtomicReference<>(); + final Thread committer = new Thread("committer runner") { + @Override + public void run() { + try { + fetcher.commitInternalOffsetsToKafka(testCommitData); + } catch (Throwable t) { + commitError.set(t); + } + } + }; + committer.start(); + + // ----- ensure that the committer finishes in time ----- + committer.join(30000); + assertFalse("The committer did not finish in time", committer.isAlive()); + + // ----- test done, wait till the fetcher is done for a clean shutdown ----- + fetcher.cancel(); + fetcherRunner.join(); + + // check that there were no errors in the fetcher + final Throwable fetcherError = error.get(); + if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) { + throw new Exception("Exception in the fetcher", fetcherError); + } + + final Throwable committerError = commitError.get(); + if (committerError != null) { + throw new Exception("Exception in the committer", committerError); + } + } + + @Test + public void ensureOffsetsGetCommitted() throws Exception { + + // test data + final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42); + final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99); + + final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>(); + testCommitData1.put(testPartition1, 11L); + testCommitData1.put(testPartition2, 18L); + + final Map<KafkaTopicPartition, Long> testCommitData2 = new HashMap<>(); + testCommitData2.put(testPartition1, 19L); + testCommitData2.put(testPartition2, 28L); + + final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>(); + + // ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ---- + + final MultiShotLatch blockerLatch = new MultiShotLatch(); + + KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class); + + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() { + @Override + public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException { + blockerLatch.await(); + return ConsumerRecords.empty(); + } + }); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) { + blockerLatch.trigger(); + return null; + } + }).when(mockConsumer).wakeup(); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) { + @SuppressWarnings("unchecked") + Map<TopicPartition, OffsetAndMetadata> offsets = + (Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0]; + + OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1]; + + commitStore.add(offsets); + callback.onComplete(offsets, null); + + return null; + } + }).when(mockConsumer).commitAsync( + Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class)); + + // make sure the fetcher creates the mock consumer + whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); + + // ----- create the test fetcher ----- + + @SuppressWarnings("unchecked") + SourceContext<String> sourceContext = mock(SourceContext.class); + List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); + KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); + + final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>( + sourceContext, topics, null, /* no restored state */ null, /* periodic assigner */ @@ -277,106 +277,105 @@ public class Kafka010FetcherTest { StartupMode.GROUP_OFFSETS, false); + // ----- run the fetcher ----- + + final AtomicReference<Throwable> error = new AtomicReference<>(); + final Thread fetcherRunner = new Thread("fetcher runner") { + + @Override + public void run() { + try { + fetcher.runFetchLoop(); + } catch (Throwable t) { + error.set(t); + } + } + }; + fetcherRunner.start(); + + // ----- trigger the first offset commit ----- + + fetcher.commitInternalOffsetsToKafka(testCommitData1); + Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take(); + + for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) { + TopicPartition partition = entry.getKey(); + if (partition.topic().equals("test")) { + assertEquals(42, partition.partition()); + assertEquals(12L, entry.getValue().offset()); + } + else if (partition.topic().equals("another")) { + assertEquals(99, partition.partition()); + assertEquals(18L, entry.getValue().offset()); + } + } + + // ----- trigger the second offset commit ----- + + fetcher.commitInternalOffsetsToKafka(testCommitData2); + Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take(); + + for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) { + TopicPartition partition = entry.getKey(); + if (partition.topic().equals("test")) { + assertEquals(42, partition.partition()); + assertEquals(20L, entry.getValue().offset()); + } + else if (partition.topic().equals("another")) { + assertEquals(99, partition.partition()); + assertEquals(28L, entry.getValue().offset()); + } + } + + // ----- test done, wait till the fetcher is done for a clean shutdown ----- + fetcher.cancel(); + fetcherRunner.join(); + + // check that there were no errors in the fetcher + final Throwable caughtError = error.get(); + if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) { + throw new Exception("Exception in the fetcher", caughtError); + } + } + + @Test + public void testCancellationWhenEmitBlocks() throws Exception { + + // ----- some test data ----- + + final String topic = "test-topic"; + final int partition = 3; + final byte[] payload = new byte[] {1, 2, 3, 4}; + + final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList( + new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload), + new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload), + new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload)); + + final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>(); + data.put(new TopicPartition(topic, partition), records); + + final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data); - // ----- run the fetcher ----- - - final AtomicReference<Throwable> error = new AtomicReference<>(); - final Thread fetcherRunner = new Thread("fetcher runner") { - - @Override - public void run() { - try { - fetcher.runFetchLoop(); - } catch (Throwable t) { - error.set(t); - } - } - }; - fetcherRunner.start(); - - // ----- trigger the first offset commit ----- - - fetcher.commitInternalOffsetsToKafka(testCommitData1); - Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take(); - - for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) { - TopicPartition partition = entry.getKey(); - if (partition.topic().equals("test")) { - assertEquals(42, partition.partition()); - assertEquals(12L, entry.getValue().offset()); - } - else if (partition.topic().equals("another")) { - assertEquals(99, partition.partition()); - assertEquals(18L, entry.getValue().offset()); - } - } - - // ----- trigger the second offset commit ----- - - fetcher.commitInternalOffsetsToKafka(testCommitData2); - Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take(); - - for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) { - TopicPartition partition = entry.getKey(); - if (partition.topic().equals("test")) { - assertEquals(42, partition.partition()); - assertEquals(20L, entry.getValue().offset()); - } - else if (partition.topic().equals("another")) { - assertEquals(99, partition.partition()); - assertEquals(28L, entry.getValue().offset()); - } - } - - // ----- test done, wait till the fetcher is done for a clean shutdown ----- - fetcher.cancel(); - fetcherRunner.join(); - - // check that there were no errors in the fetcher - final Throwable caughtError = error.get(); - if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) { - throw new Exception("Exception in the fetcher", caughtError); - } - } - - @Test - public void testCancellationWhenEmitBlocks() throws Exception { - - // ----- some test data ----- - - final String topic = "test-topic"; - final int partition = 3; - final byte[] payload = new byte[] {1, 2, 3, 4}; - - final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList( - new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload), - new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload), - new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload)); - - final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>(); - data.put(new TopicPartition(topic, partition), records); - - final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data); - - // ----- the test consumer ----- - - final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class); - when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() { - @Override - public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) { - return consumerRecords; - } - }); - - whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); - - // ----- build a fetcher ----- - - BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>(); - List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition)); - KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); - - final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>( + // ----- the test consumer ----- + + final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class); + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() { + @Override + public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) { + return consumerRecords; + } + }); + + whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); + + // ----- build a fetcher ----- + + BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>(); + List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition)); + KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); + + final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>( sourceContext, topics, null, /* no restored state */ @@ -394,58 +393,57 @@ public class Kafka010FetcherTest { StartupMode.GROUP_OFFSETS, false); + // ----- run the fetcher ----- - // ----- run the fetcher ----- - - final AtomicReference<Throwable> error = new AtomicReference<>(); - final Thread fetcherRunner = new Thread("fetcher runner") { + final AtomicReference<Throwable> error = new AtomicReference<>(); + final Thread fetcherRunner = new Thread("fetcher runner") { - @Override - public void run() { - try { - fetcher.runFetchLoop(); - } catch (Throwable t) { - error.set(t); - } - } - }; - fetcherRunner.start(); + @Override + public void run() { + try { + fetcher.runFetchLoop(); + } catch (Throwable t) { + error.set(t); + } + } + }; + fetcherRunner.start(); - // wait until the thread started to emit records to the source context - sourceContext.waitTillHasBlocker(); + // wait until the thread started to emit records to the source context + sourceContext.waitTillHasBlocker(); - // now we try to cancel the fetcher, including the interruption usually done on the task thread - // once it has finished, there must be no more thread blocked on the source context - fetcher.cancel(); - fetcherRunner.interrupt(); - fetcherRunner.join(); + // now we try to cancel the fetcher, including the interruption usually done on the task thread + // once it has finished, there must be no more thread blocked on the source context + fetcher.cancel(); + fetcherRunner.interrupt(); + fetcherRunner.join(); - assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking()); - } + assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking()); + } - // ------------------------------------------------------------------------ - // test utilities - // ------------------------------------------------------------------------ + // ------------------------------------------------------------------------ + // test utilities + // ------------------------------------------------------------------------ - private static final class BlockingSourceContext<T> implements SourceContext<T> { + private static final class BlockingSourceContext<T> implements SourceContext<T> { - private final ReentrantLock lock = new ReentrantLock(); - private final OneShotLatch inBlocking = new OneShotLatch(); + private final ReentrantLock lock = new ReentrantLock(); + private final OneShotLatch inBlocking = new OneShotLatch(); - @Override - public void collect(T element) { - block(); - } + @Override + public void collect(T element) { + block(); + } - @Override - public void collectWithTimestamp(T element, long timestamp) { - block(); - } + @Override + public void collectWithTimestamp(T element, long timestamp) { + block(); + } - @Override - public void emitWatermark(Watermark mark) { - block(); - } + @Override + public void emitWatermark(Watermark mark) { + block(); + } @Override public void markAsTemporarilyIdle() { @@ -453,42 +451,42 @@ public class Kafka010FetcherTest { } @Override - public Object getCheckpointLock() { - return new Object(); - } - - @Override - public void close() {} - - public void waitTillHasBlocker() throws InterruptedException { - inBlocking.await(); - } - - public boolean isStillBlocking() { - return lock.isLocked(); - } - - @SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"}) - private void block() { - lock.lock(); - try { - inBlocking.trigger(); - - // put this thread to sleep indefinitely - final Object o = new Object(); - while (true) { - synchronized (o) { - o.wait(); - } - } - } - catch (InterruptedException e) { - // exit cleanly, simply reset the interruption flag - Thread.currentThread().interrupt(); - } - finally { - lock.unlock(); - } - } - } + public Object getCheckpointLock() { + return new Object(); + } + + @Override + public void close() {} + + public void waitTillHasBlocker() throws InterruptedException { + inBlocking.await(); + } + + public boolean isStillBlocking() { + return lock.isLocked(); + } + + @SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"}) + private void block() { + lock.lock(); + try { + inBlocking.trigger(); + + // put this thread to sleep indefinitely + final Object o = new Object(); + while (true) { + synchronized (o) { + o.wait(); + } + } + } + catch (InterruptedException e) { + // exit cleanly, simply reset the interruption flag + Thread.currentThread().interrupt(); + } + finally { + lock.unlock(); + } + } + } }
