This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 7ea55e9 [FLINK-12595][kinesis] Interrupt thread at right time to
avoid deadlock
7ea55e9 is described below
commit 7ea55e967bc450b3b744edcaea23834646e439cd
Author: Shannon Carey <[email protected]>
AuthorDate: Sat Jul 20 14:15:50 2019 -0500
[FLINK-12595][kinesis] Interrupt thread at right time to avoid deadlock
- Inside testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown,
consumerThread.interrupt() was getting absorbed inside
KinesisDataFetcher's while(running) loop, so
TestableKinesisDataFetcherForShardConsumerException's awaitTermination()
was never interrupted by it. This led to a deadlock, with
KinesisDataFetcher waiting for the test code to send the interrupt and
the test code waiting for KinesisDataFetcher to throw the expected
exception.
- Now, the test code waits until KinesisDataFetcher is inside
awaitTermination() before producing the interrupt, so it can be sure
that the interrupt it produces will be received/handled inside
awaitTermination().
---
.../kinesis/internals/KinesisDataFetcherTest.java | 6 +++++-
...stableKinesisDataFetcherForShardConsumerException.java | 15 ++++++++++++++-
2 files changed, 19 insertions(+), 2 deletions(-)
diff --git
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 5255e61..2815193 100644
---
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -846,7 +846,7 @@ public class KinesisDataFetcherTest extends TestLogger {
DummyFlinkKinesisConsumer<String> consumer = new
DummyFlinkKinesisConsumer<>(
TestUtils.getStandardProperties(), fetcher, 1, 0);
- CheckedThread consumerThread = new CheckedThread() {
+ CheckedThread consumerThread = new
CheckedThread("FlinkKinesisConsumer") {
@Override
public void go() throws Exception {
consumer.run(new TestSourceContext<>());
@@ -858,6 +858,10 @@ public class KinesisDataFetcherTest extends TestLogger {
// ShardConsumer exception (from deserializer) will result in
fetcher being shut down.
fetcher.waitUntilShutdown(20, TimeUnit.SECONDS);
+ // Ensure that KinesisDataFetcher has exited its while(running)
loop and is inside its awaitTermination()
+ // method before we interrupt its thread, so that our interrupt
doesn't get absorbed by any other mechanism.
+ fetcher.waitUntilAwaitTermination(20, TimeUnit.SECONDS);
+
// Interrupt the thread so that
KinesisDataFetcher#awaitTermination() will throw InterruptedException.
consumerThread.interrupt();
diff --git
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java
index c08b7af..6ae4391 100644
---
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java
+++
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.connectors.kinesis.testutils;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
@@ -32,6 +33,8 @@ import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -39,6 +42,10 @@ import java.util.concurrent.atomic.AtomicReference;
* {@link #awaitTermination()}.
*/
public class TestableKinesisDataFetcherForShardConsumerException<T> extends
TestableKinesisDataFetcher<T> {
+ public volatile boolean wasInterrupted = false;
+
+ private OneShotLatch awaitTerminationWaiter = new OneShotLatch();
+
public TestableKinesisDataFetcherForShardConsumerException(final
List<String> fakeStreams,
final SourceFunction.SourceContext<T> sourceContext,
final Properties fakeConfiguration,
@@ -54,7 +61,12 @@ public class
TestableKinesisDataFetcherForShardConsumerException<T> extends Test
subscribedStreamsToLastDiscoveredShardIdsStateUnderTest, fakeKinesis);
}
- public volatile boolean wasInterrupted = false;
+ /**
+ * Block until awaitTermination() has been called on this class.
+ */
+ public void waitUntilAwaitTermination(long timeout, TimeUnit timeUnit)
throws InterruptedException, TimeoutException {
+ awaitTerminationWaiter.await(timeout, timeUnit);
+ }
@Override
protected ExecutorService createShardConsumersThreadPool(final String
subtaskName) {
@@ -65,6 +77,7 @@ public class
TestableKinesisDataFetcherForShardConsumerException<T> extends Test
@Override
public void awaitTermination() throws InterruptedException {
+ awaitTerminationWaiter.trigger();
try {
// Force this method to only exit by thread getting
interrupted.
while (true) {