This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 84e2d57  [FLINK-22479[Kinesis][Consumer] Potential lock-up under error condition
84e2d57 is described below

commit 84e2d57bf6e0b7e35ccc60a176363fde3793ff3e
Author: Danny Cranmer <[email protected]>
AuthorDate: Mon Apr 26 15:22:19 2021 +0100

    [FLINK-22479[Kinesis][Consumer] Potential lock-up under error condition
---
 .../kinesis/config/ConsumerConfigConstants.java    |   6 +
 .../kinesis/internals/KinesisDataFetcher.java      |  65 +++++++---
 .../publisher/fanout/FanOutRecordPublisher.java    |   5 +-
 .../fanout/FanOutRecordPublisherConfiguration.java |  15 +++
 .../publisher/fanout/FanOutShardSubscriber.java    | 144 ++++++++++++++++-----
 .../kinesis/internals/KinesisDataFetcherTest.java  |  76 +++++++++--
 .../FanOutRecordPublisherConfigurationTest.java    |  50 +++++--
 .../fanout/FanOutShardSubscriberTest.java          |  72 ++++++++++-
 .../FakeKinesisFanOutBehavioursFactory.java        |  22 ++++
 .../testutils/TestableKinesisDataFetcher.java      |  18 ++-
 10 files changed, 383 insertions(+), 90 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 3140e6e..4ea0bd8 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -217,6 +217,10 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
     public static final String SUBSCRIBE_TO_SHARD_RETRIES =
             "flink.shard.subscribetoshard.maxretries";
 
+    /** A timeout when waiting for a shard subscription to be established. */
+    public static final String SUBSCRIBE_TO_SHARD_TIMEOUT_SECONDS =
+            "flink.shard.subscribetoshard.timeout";
+
     /** The base backoff time between each subscribeToShard attempt. */
     public static final String SUBSCRIBE_TO_SHARD_BACKOFF_BASE =
             "flink.shard.subscribetoshard.backoff.base";
@@ -363,6 +367,8 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 
     public static final int DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES = 10;
 
+    public static final Duration DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT = Duration.ofSeconds(60);
+
     public static final long DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_BASE = 1000L;
 
     public static final long DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_MAX = 2000L;
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 163cd04..9aee3e4 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -800,34 +800,59 @@ public class KinesisDataFetcher<T> {
      * executed and all shard consuming threads will be interrupted.
      */
     public void shutdownFetcher() {
-        if (LOG.isInfoEnabled()) {
-            LOG.info(
-                    "Starting shutdown of shard consumer threads and AWS SDK resources of subtask {} ...",
-                    indexOfThisConsumerSubtask);
-        }
+        LOG.info(
+                "Starting shutdown of shard consumer threads and AWS SDK resources of subtask {} ...",
+                indexOfThisConsumerSubtask,
+                error.get());
 
         running = false;
+        try {
+            try {
+                deregisterStreamConsumer();
+            } catch (Exception e) {
+                LOG.warn("Encountered exception deregistering stream consumers", e);
+            }
 
-        StreamConsumerRegistrarUtil.deregisterStreamConsumers(configProps, streams);
-
-        recordPublisherFactory.close();
+            try {
+                closeRecordPublisherFactory();
+            } catch (Exception e) {
+                LOG.warn("Encountered exception closing record publisher factory", e);
+            }
+        } finally {
+            shardConsumersExecutor.shutdownNow();
 
-        shardConsumersExecutor.shutdownNow();
+            if (mainThread != null) {
+                mainThread
+                        .interrupt(); // the main thread may be sleeping for the discovery interval
+            }
 
-        if (mainThread != null) {
-            mainThread.interrupt(); // the main thread may be sleeping for the discovery interval
+            if (watermarkTracker != null) {
+                watermarkTracker.close();
+            }
+            this.recordEmitter.stop();
         }
 
-        if (watermarkTracker != null) {
-            watermarkTracker.close();
-        }
-        this.recordEmitter.stop();
+        LOG.info(
+                "Shutting down the shard consumer threads of subtask {} ...",
+                indexOfThisConsumerSubtask);
+    }
 
-        if (LOG.isInfoEnabled()) {
-            LOG.info(
-                    "Shutting down the shard consumer threads of subtask {} ...",
-                    indexOfThisConsumerSubtask);
-        }
+    /**
+     * Closes recordRecordPublisherFactory. Allows test to override this to simulate exception for
+     * shutdown logic.
+     */
+    @VisibleForTesting
+    protected void closeRecordPublisherFactory() {
+        recordPublisherFactory.close();
+    }
+
+    /**
+     * Deregisters stream consumers. Allows test to override this to simulate exception for shutdown
+     * logic.
+     */
+    @VisibleForTesting
+    protected void deregisterStreamConsumer() {
+        StreamConsumerRegistrarUtil.deregisterStreamConsumers(configProps, streams);
     }
 
     /**
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
index 5104f8c..ef653d7 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
@@ -143,7 +143,10 @@ public class FanOutRecordPublisher implements RecordPublisher {
             final Consumer<SubscribeToShardEvent> eventConsumer) throws InterruptedException {
         FanOutShardSubscriber fanOutShardSubscriber =
                 new FanOutShardSubscriber(
-                        consumerArn, subscribedShard.getShard().getShardId(), kinesisProxy);
+                        consumerArn,
+                        subscribedShard.getShard().getShardId(),
+                        kinesisProxy,
+                        configuration.getSubscribeToShardTimeout());
         boolean complete;
 
         try {
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
index 36679c3..cd46876 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
@@ -52,6 +52,9 @@ public class FanOutRecordPublisherConfiguration {
     /** Base backoff millis for the deregister stream operation. */
     private final int subscribeToShardMaxRetries;
 
+    /** A timeout when waiting for a shard subscription to be established. */
+    private final Duration subscribeToShardTimeout;
+
     /** Maximum backoff millis for the subscribe to shard operation. */
     private final long subscribeToShardMaxBackoffMillis;
 
@@ -156,6 +159,13 @@ public class FanOutRecordPublisherConfiguration {
                                         ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES))
                         .map(Integer::parseInt)
                         .orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES);
+        this.subscribeToShardTimeout =
+                Optional.ofNullable(
+                                configProps.getProperty(
+                                        ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_TIMEOUT_SECONDS))
+                        .map(Integer::parseInt)
+                        .map(Duration::ofSeconds)
+                        .orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT);
         this.subscribeToShardBaseBackoffMillis =
                 Optional.ofNullable(
                                 configProps.getProperty(
@@ -319,6 +329,11 @@ public class FanOutRecordPublisherConfiguration {
         return subscribeToShardMaxRetries;
     }
 
+    /** Get timeout when waiting for a shard subscription to be established. */
+    public Duration getSubscribeToShardTimeout() {
+        return subscribeToShardTimeout;
+    }
+
     /** Get maximum backoff millis for the subscribe to shard operation. */
     public long getSubscribeToShardMaxBackoffMillis() {
         return subscribeToShardMaxBackoffMillis;
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
index e06923f..be4df59 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
@@ -18,6 +18,7 @@
 package 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2;
 import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
 import org.apache.flink.util.Preconditions;
@@ -35,15 +36,18 @@ import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream
 import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
 import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
 
+import java.time.Duration;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 /**
  * This class is responsible for acquiring an Enhanced Fan Out subscription and consuming records
@@ -88,12 +92,10 @@ public class FanOutShardSubscriber {
 
     /**
      * Read timeout will occur after 30 seconds, a sanity timeout to prevent lockup in unexpected
-     * error states. If the consumer does not receive a new event within the DEQUEUE_WAIT_SECONDS it
-     * will backoff and resubscribe. Under normal conditions heartbeat events are received even when
-     * there are no records to consume, so it is not expected for this timeout to occur under normal
-     * conditions.
+     * error states. If the consumer does not receive a new event within the QUEUE_TIMEOUT_SECONDS
+     * it will backoff and resubscribe.
      */
-    private static final int DEQUEUE_WAIT_SECONDS = 35;
+    private static final Duration DEFAULT_QUEUE_TIMEOUT = Duration.ofSeconds(35);
 
     private final BlockingQueue<FanOutSubscriptionEvent> queue =
             new LinkedBlockingQueue<>(QUEUE_CAPACITY);
@@ -107,18 +109,49 @@ public class FanOutShardSubscriber {
 
     private final String shardId;
 
+    private final Duration subscribeToShardTimeout;
+
+    private final Duration queueWaitTimeout;
+
     /**
-     * Create a new Fan Out subscriber.
+     * Create a new Fan Out Shard subscriber.
      *
      * @param consumerArn the stream consumer ARN
      * @param shardId the shard ID to subscribe to
      * @param kinesis the Kinesis Proxy used to communicate via AWS SDK v2
+     * @param subscribeToShardTimeout A timeout when waiting for a shard subscription to be
+     *     established
      */
     FanOutShardSubscriber(
-            final String consumerArn, final String shardId, final KinesisProxyV2Interface kinesis) {
+            final String consumerArn,
+            final String shardId,
+            final KinesisProxyV2Interface kinesis,
+            final Duration subscribeToShardTimeout) {
+        this(consumerArn, shardId, kinesis, subscribeToShardTimeout, DEFAULT_QUEUE_TIMEOUT);
+    }
+
+    /**
+     * Create a new Fan Out Shard Subscriber.
+     *
+     * @param consumerArn the stream consumer ARN
+     * @param shardId the shard ID to subscribe to
+     * @param kinesis the Kinesis Proxy used to communicate via AWS SDK v2
+     * @param subscribeToShardTimeout A timeout when waiting for a shard subscription to be
+     *     established
+     * @param queueWaitTimeout A timeout when enqueuing/de-queueing
+     */
+    @VisibleForTesting
+    FanOutShardSubscriber(
+            final String consumerArn,
+            final String shardId,
+            final KinesisProxyV2Interface kinesis,
+            final Duration subscribeToShardTimeout,
+            final Duration queueWaitTimeout) {
         this.kinesis = Preconditions.checkNotNull(kinesis);
         this.consumerArn = Preconditions.checkNotNull(consumerArn);
         this.shardId = Preconditions.checkNotNull(shardId);
+        this.subscribeToShardTimeout = subscribeToShardTimeout;
+        this.queueWaitTimeout = queueWaitTimeout;
     }
 
     /**
@@ -139,8 +172,9 @@ public class FanOutShardSubscriber {
             throws InterruptedException, FanOutSubscriberException {
         LOG.debug("Subscribing to shard {} ({})", shardId, consumerArn);
 
+        final FanOutShardSubscription subscription;
         try {
-            openSubscriptionToShard(startingPosition);
+            subscription = openSubscriptionToShard(startingPosition);
         } catch (FanOutSubscriberException ex) {
             // The only exception that should cause a failure is a ResourceNotFoundException
             // Rethrow the exception to trigger the application to terminate
@@ -151,7 +185,7 @@ public class FanOutShardSubscriber {
             throw ex;
         }
 
-        return consumeAllRecordsFromKinesisShard(eventConsumer);
+        return consumeAllRecordsFromKinesisShard(eventConsumer, subscription);
     }
 
     /**
@@ -163,7 +197,7 @@ public class FanOutShardSubscriber {
      * @param startingPosition the position in which to start consuming from
      * @throws FanOutSubscriberException when an exception is propagated from the networking stack
      */
-    private void openSubscriptionToShard(final StartingPosition startingPosition)
+    private FanOutShardSubscription openSubscriptionToShard(final StartingPosition startingPosition)
             throws FanOutSubscriberException, InterruptedException {
         SubscribeToShardRequest request =
                 SubscribeToShardRequest.builder()
@@ -196,7 +230,18 @@ public class FanOutShardSubscriber {
 
         kinesis.subscribeToShard(request, responseHandler);
 
-        waitForSubscriptionLatch.await();
+        boolean subscriptionEstablished =
+                waitForSubscriptionLatch.await(
+                        subscribeToShardTimeout.toMillis(), TimeUnit.MILLISECONDS);
+
+        if (!subscriptionEstablished) {
+            final String errorMessage =
+                    "Timed out acquiring subscription - " + shardId + " (" + consumerArn + ")";
+            LOG.error(errorMessage);
+            subscription.cancelSubscription();
+            handleError(
+                    new RecoverableFanOutSubscriberException(new TimeoutException(errorMessage)));
+        }
 
         Throwable throwable = exception.get();
         if (throwable != null) {
@@ -208,6 +253,8 @@ public class FanOutShardSubscriber {
         // Request the first record to kick off consumption
         // Following requests are made by the FanOutShardSubscription on the netty thread
         subscription.requestRecord();
+
+        return subscription;
     }
 
     /**
@@ -234,6 +281,8 @@ public class FanOutShardSubscriber {
 
         if (isInterrupted(throwable)) {
             throw new FanOutSubscriberInterruptedException(throwable);
+        } else if (cause instanceof FanOutSubscriberException) {
+            throw (FanOutSubscriberException) cause;
         } else if (cause instanceof ReadTimeoutException) {
             // ReadTimeoutException occurs naturally under backpressure scenarios when full batches
             // take longer to
@@ -271,24 +320,26 @@ public class FanOutShardSubscriber {
      * while consuming records, indicated by a {@link SubscriptionErrorEvent}
      *
      * @param eventConsumer the event consumer to deliver records to
+     * @param subscription the subscription we are subscribed to
      * @return true if there are no more messages (complete), false if a subsequent subscription
      *     should be obtained
      * @throws FanOutSubscriberException when an exception is propagated from the networking stack
      * @throws InterruptedException when the thread is interrupted
      */
     private boolean consumeAllRecordsFromKinesisShard(
-            final Consumer<SubscribeToShardEvent> eventConsumer)
+            final Consumer<SubscribeToShardEvent> eventConsumer,
+            final FanOutShardSubscription subscription)
             throws InterruptedException, FanOutSubscriberException {
         String continuationSequenceNumber;
+        boolean result = true;
 
         do {
             FanOutSubscriptionEvent subscriptionEvent;
-            if (queue.isEmpty() && subscriptionErrorEvent.get() != null) {
+            if (subscriptionErrorEvent.get() != null) {
                 subscriptionEvent = subscriptionErrorEvent.get();
             } else {
-                // Read timeout will occur after 30 seconds, add a sanity timeout here to prevent
-                // lockup
-                subscriptionEvent = queue.poll(DEQUEUE_WAIT_SECONDS, SECONDS);
+                // Read timeout occurs after 30 seconds, add a sanity timeout to prevent lockup
+                subscriptionEvent = queue.poll(queueWaitTimeout.toMillis(), MILLISECONDS);
             }
 
             if (subscriptionEvent == null) {
@@ -296,7 +347,8 @@ public class FanOutShardSubscriber {
                         "Timed out polling events from network, reacquiring subscription - {} ({})",
                         shardId,
                         consumerArn);
-                return false;
+                result = false;
+                break;
             } else if (subscriptionEvent.isSubscribeToShardEvent()) {
                 SubscribeToShardEvent event = subscriptionEvent.getSubscribeToShardEvent();
                 continuationSequenceNumber = event.continuationSequenceNumber();
@@ -304,19 +356,18 @@ public class FanOutShardSubscriber {
                     eventConsumer.accept(event);
                 }
             } else if (subscriptionEvent.isSubscriptionComplete()) {
-                if (subscriptionErrorEvent.get() != null) {
-                    handleError(subscriptionErrorEvent.get().getThrowable());
-                }
-
                 // The subscription is complete, but the shard might not be, so we return incomplete
-                return false;
+                result = false;
+                break;
             } else {
                 handleError(subscriptionEvent.getThrowable());
-                return false;
+                result = false;
+                break;
             }
         } while (continuationSequenceNumber != null);
 
-        return true;
+        subscription.cancelSubscription();
+        return result;
     }
 
     /**
@@ -376,15 +427,19 @@ public class FanOutShardSubscriber {
                     consumerArn,
                     throwable);
 
+            SubscriptionErrorEvent subscriptionErrorEvent = new SubscriptionErrorEvent(throwable);
+            if (FanOutShardSubscriber.this.subscriptionErrorEvent.get() == null) {
+                FanOutShardSubscriber.this.subscriptionErrorEvent.set(subscriptionErrorEvent);
+            } else {
+                LOG.warn("Error already queued. Ignoring subsequent exception.", throwable);
+            }
+
             // Cancel the subscription to signal the onNext to stop requesting data
             cancelSubscription();
 
-            if (subscriptionErrorEvent.get() == null) {
-                subscriptionErrorEvent.set(new SubscriptionErrorEvent(throwable));
-            } else {
-                LOG.warn(
-                        "Previous error passed to consumer for processing. Ignoring subsequent exception.",
-                        throwable);
+            // If there is space in the queue, insert the error to wake up blocked thread
+            if (queue.isEmpty()) {
+                queue.offer(subscriptionErrorEvent);
             }
         }
 
@@ -395,8 +450,12 @@ public class FanOutShardSubscriber {
         }
 
         private void cancelSubscription() {
-            if (!cancelled) {
-                cancelled = true;
+            if (cancelled) {
+                return;
+            }
+            cancelled = true;
+
+            if (subscription != null) {
                 subscription.cancel();
             }
         }
@@ -407,8 +466,25 @@ public class FanOutShardSubscriber {
          * @param event the event to enqueue
          */
         private void enqueueEvent(final FanOutSubscriptionEvent event) {
+            if (cancelled) {
+                return;
+            }
+
             try {
-                queue.put(event);
+                if (!queue.offer(event, queueWaitTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
+                    final String errorMessage =
+                            "Timed out enqueuing event "
+                                    + event.getClass().getSimpleName()
+                                    + " - "
+                                    + shardId
+                                    + " ("
+                                    + consumerArn
+                                    + ")";
+                    LOG.error(errorMessage);
+                    onError(
+                            new RecoverableFanOutSubscriberException(
+                                    new TimeoutException(errorMessage)));
+                }
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 throw new RuntimeException(e);
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 2cd5f33..aea01b0 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -72,10 +72,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.singletonList;
-import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.NONE;
-import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_REGISTRATION_TYPE;
-import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE;
-import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType.EFO;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -96,12 +92,13 @@ public class KinesisDataFetcherTest extends TestLogger {
         assertTrue(fetcher.isRunning());
     }
 
-    @Test
-    public void testIsRunningFalseAfterShutDown() {
+    @Test(timeout = 10000)
+    public void testIsRunningFalseAfterShutDown() throws InterruptedException {
         KinesisDataFetcher<String> fetcher =
                 createTestDataFetcherWithNoShards(10, 2, "test-stream");
 
         fetcher.shutdownFetcher();
+        fetcher.awaitTermination();
 
         assertFalse(fetcher.isRunning());
     }
@@ -990,19 +987,15 @@ public class KinesisDataFetcherTest extends TestLogger {
                 fetcher.wasInterrupted);
     }
 
-    @Test
-    public void testRecordPublisherFactoryIsTornDown() {
-        Properties config = TestUtils.getStandardProperties();
-        config.setProperty(RECORD_PUBLISHER_TYPE, EFO.name());
-        config.setProperty(EFO_REGISTRATION_TYPE, NONE.name());
-
+    @Test(timeout = 1000L)
+    public void testRecordPublisherFactoryIsTornDown() throws InterruptedException {
         KinesisProxyV2Interface kinesisV2 = mock(KinesisProxyV2Interface.class);
 
         TestableKinesisDataFetcher<String> fetcher =
                 new TestableKinesisDataFetcher<String>(
                         singletonList("fakeStream1"),
                         new TestSourceContext<>(),
-                        config,
+                        TestUtils.efoProperties(),
                         new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
                         10,
                         2,
@@ -1014,7 +1007,64 @@ public class KinesisDataFetcherTest extends TestLogger {
 
         fetcher.shutdownFetcher();
 
+        fetcher.awaitTermination();
+    }
+
+    @Test(timeout = 10000)
+    public void 
testRecordPublisherFactoryIsTornDownWhenDeregisterStreamConsumerThrowsException()
+            throws InterruptedException {
+        KinesisProxyV2Interface kinesisV2 = 
mock(KinesisProxyV2Interface.class);
+
+        TestableKinesisDataFetcher<String> fetcher =
+                new TestableKinesisDataFetcher<String>(
+                        singletonList("fakeStream1"),
+                        new TestSourceContext<>(),
+                        TestUtils.efoProperties(),
+                        new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
+                        10,
+                        2,
+                        new AtomicReference<>(),
+                        new LinkedList<>(),
+                        new HashMap<>(),
+                        mock(KinesisProxyInterface.class),
+                        kinesisV2) {
+                    @Override
+                    protected void deregisterStreamConsumer() {
+                        throw new RuntimeException();
+                    }
+                };
+
+        fetcher.shutdownFetcher();
+
         verify(kinesisV2).close();
+        fetcher.awaitTermination();
+    }
+
+    @Test(timeout = 10000)
+    public void 
testExecutorServiceShutDownWhenCloseRecordPublisherFactoryThrowsException()
+            throws InterruptedException {
+        TestableKinesisDataFetcher<String> fetcher =
+                new TestableKinesisDataFetcher<String>(
+                        singletonList("fakeStream1"),
+                        new TestSourceContext<>(),
+                        TestUtils.efoProperties(),
+                        new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
+                        10,
+                        2,
+                        new AtomicReference<>(),
+                        new LinkedList<>(),
+                        new HashMap<>(),
+                        mock(KinesisProxyInterface.class),
+                        mock(KinesisProxyV2Interface.class)) {
+                    @Override
+                    protected void closeRecordPublisherFactory() {
+                        throw new RuntimeException();
+                    }
+                };
+
+        fetcher.shutdownFetcher();
+
+        fetcher.awaitTermination();
     }
 
     private KinesisDataFetcher<String> createTestDataFetcherWithNoShards(
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfigurationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfigurationTest.java
index a4395c2..a558ffb 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfigurationTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfigurationTest.java
@@ -24,9 +24,6 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -46,18 +43,18 @@ import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfi
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE;
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.REGISTER_STREAM_TIMEOUT_SECONDS;
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType.EFO;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_TIMEOUT_SECONDS;
 import static org.junit.Assert.assertEquals;
 
 /** Tests for {@link FanOutRecordPublisherConfiguration}. */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(FanOutRecordPublisherConfiguration.class)
 public class FanOutRecordPublisherConfigurationTest extends TestLogger {
-    @Rule private ExpectedException exception = ExpectedException.none();
+
+    @Rule public ExpectedException thrown = ExpectedException.none();
 
     @Test
     public void testPollingRecordPublisher() {
-        exception.expect(IllegalArgumentException.class);
-        exception.expectMessage("Only efo record publisher can register a FanOutProperties.");
+        thrown.expect(IllegalArgumentException.class);
+        thrown.expectMessage("Only efo record publisher can register a FanOutProperties.");
 
         Properties testConfig = TestUtils.getStandardProperties();
         testConfig.setProperty(RECORD_PUBLISHER_TYPE, RecordPublisherType.POLLING.toString());
@@ -82,8 +79,8 @@ public class FanOutRecordPublisherConfigurationTest extends TestLogger {
     public void testEagerStrategyWithNoConsumerName() {
         String msg = "No valid enhanced fan-out consumer name is set through " 
+ EFO_CONSUMER_NAME;
 
-        exception.expect(IllegalArgumentException.class);
-        exception.expectMessage(msg);
+        thrown.expect(IllegalArgumentException.class);
+        thrown.expectMessage(msg);
 
         Properties testConfig = TestUtils.getStandardProperties();
         testConfig.setProperty(RECORD_PUBLISHER_TYPE, EFO.toString());
@@ -115,8 +112,8 @@ public class FanOutRecordPublisherConfigurationTest extends 
TestLogger {
 
         String msg =
                 "Invalid efo consumer arn settings for not providing consumer 
arns: flink.stream.efo.consumerarn.fakedstream1, 
flink.stream.efo.consumerarn.fakedstream2";
-        exception.expect(IllegalArgumentException.class);
-        exception.expectMessage(msg);
+        thrown.expect(IllegalArgumentException.class);
+        thrown.expectMessage(msg);
 
         Properties testConfig = TestUtils.getStandardProperties();
         testConfig.setProperty(RECORD_PUBLISHER_TYPE, EFO.toString());
@@ -131,8 +128,8 @@ public class FanOutRecordPublisherConfigurationTest extends 
TestLogger {
 
         String msg =
                 "Invalid efo consumer arn settings for not providing consumer 
arns: flink.stream.efo.consumerarn.fakedstream2";
-        exception.expect(IllegalArgumentException.class);
-        exception.expectMessage(msg);
+        thrown.expect(IllegalArgumentException.class);
+        thrown.expectMessage(msg);
 
         Properties testConfig = TestUtils.getStandardProperties();
         testConfig.setProperty(RECORD_PUBLISHER_TYPE, EFO.toString());
@@ -169,4 +166,29 @@ public class FanOutRecordPublisherConfigurationTest 
extends TestLogger {
         assertEquals(Duration.ofSeconds(60), 
configuration.getRegisterStreamConsumerTimeout());
         assertEquals(Duration.ofSeconds(240), 
configuration.getDeregisterStreamConsumerTimeout());
     }
+
+    @Test
+    public void testParseSubscribeToShardTimeout() {
+        Properties testConfig = TestUtils.getStandardProperties();
+        testConfig.setProperty(RECORD_PUBLISHER_TYPE, EFO.toString());
+        testConfig.setProperty(EFO_CONSUMER_NAME, "name");
+        testConfig.setProperty(SUBSCRIBE_TO_SHARD_TIMEOUT_SECONDS, "123");
+
+        FanOutRecordPublisherConfiguration configuration =
+                new FanOutRecordPublisherConfiguration(testConfig, 
Collections.emptyList());
+
+        assertEquals(Duration.ofSeconds(123), 
configuration.getSubscribeToShardTimeout());
+    }
+
+    @Test
+    public void testDefaultSubscribeToShardTimeout() {
+        Properties testConfig = TestUtils.getStandardProperties();
+        testConfig.setProperty(RECORD_PUBLISHER_TYPE, EFO.toString());
+        testConfig.setProperty(EFO_CONSUMER_NAME, "name");
+
+        FanOutRecordPublisherConfiguration configuration =
+                new FanOutRecordPublisherConfiguration(testConfig, 
Collections.emptyList());
+
+        assertEquals(Duration.ofSeconds(60), 
configuration.getSubscribeToShardTimeout());
+    }
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
index 0061cdc..c5962df 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
 
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
 import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory;
 import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.SubscriptionErrorKinesisV2;
 
@@ -27,6 +28,10 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import software.amazon.awssdk.services.kinesis.model.StartingPosition;
 
+import java.time.Duration;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT;
+
 /** Tests for {@link FanOutShardSubscriber}. */
 public class FanOutShardSubscriberTest {
 
@@ -42,7 +47,11 @@ public class FanOutShardSubscriberTest {
                         ReadTimeoutException.INSTANCE);
 
         FanOutShardSubscriber subscriber =
-                new FanOutShardSubscriber("consumerArn", "shardId", errorKinesisV2);
+                new FanOutShardSubscriber(
+                        "consumerArn",
+                        "shardId",
+                        errorKinesisV2,
+                        DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT);
 
         software.amazon.awssdk.services.kinesis.model.StartingPosition startingPosition =
                 software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build();
@@ -59,7 +68,11 @@ public class FanOutShardSubscriberTest {
                 FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(error);
 
         FanOutShardSubscriber subscriber =
-                new FanOutShardSubscriber("consumerArn", "shardId", errorKinesisV2);
+                new FanOutShardSubscriber(
+                        "consumerArn",
+                        "shardId",
+                        errorKinesisV2,
+                        DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT);
 
         software.amazon.awssdk.services.kinesis.model.StartingPosition startingPosition =
                 software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build();
@@ -75,7 +88,11 @@ public class FanOutShardSubscriberTest {
                 FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(error);
 
         FanOutShardSubscriber subscriber =
-                new FanOutShardSubscriber("consumerArn", "shardId", errorKinesisV2);
+                new FanOutShardSubscriber(
+                        "consumerArn",
+                        "shardId",
+                        errorKinesisV2,
+                        DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT);
 
         software.amazon.awssdk.services.kinesis.model.StartingPosition startingPosition =
                 software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build();
@@ -93,9 +110,56 @@ public class FanOutShardSubscriberTest {
                 FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(error1, error2);
 
         FanOutShardSubscriber subscriber =
-                new FanOutShardSubscriber("consumerArn", "shardId", errorKinesisV2);
+                new FanOutShardSubscriber(
+                        "consumerArn",
+                        "shardId",
+                        errorKinesisV2,
+                        DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT);
 
         StartingPosition startingPosition = StartingPosition.builder().build();
         subscriber.subscribeToShardAndConsumeRecords(startingPosition, event -> {});
     }
+
+    @Test
+    public void testTimeoutSubscribingToShard() throws Exception {
+        thrown.expect(FanOutShardSubscriber.RecoverableFanOutSubscriberException.class);
+        thrown.expectMessage("Timed out acquiring subscription");
+
+        KinesisProxyV2Interface kinesis =
+                FakeKinesisFanOutBehavioursFactory.failsToAcquireSubscription();
+
+        FanOutShardSubscriber subscriber =
+                new FanOutShardSubscriber("consumerArn", "shardId", kinesis, Duration.ofMillis(1));
+
+        StartingPosition startingPosition = StartingPosition.builder().build();
+        subscriber.subscribeToShardAndConsumeRecords(startingPosition, event -> {});
+    }
+
+    @Test
+    public void testTimeoutEnqueuingEvent() throws Exception {
+        thrown.expect(FanOutShardSubscriber.RecoverableFanOutSubscriberException.class);
+        thrown.expectMessage("Timed out enqueuing event SubscriptionNextEvent");
+
+        KinesisProxyV2Interface kinesis =
+                FakeKinesisFanOutBehavioursFactory.boundedShard().withBatchCount(5).build();
+
+        FanOutShardSubscriber subscriber =
+                new FanOutShardSubscriber(
+                        "consumerArn",
+                        "shardId",
+                        kinesis,
+                        DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
+                        Duration.ofMillis(100));
+
+        StartingPosition startingPosition = StartingPosition.builder().build();
+        subscriber.subscribeToShardAndConsumeRecords(
+                startingPosition,
+                event -> {
+                    try {
+                        Thread.sleep(120);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                });
+    }
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java
index 87cc518..1107efb 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java
@@ -96,6 +96,10 @@ public class FakeKinesisFanOutBehavioursFactory {
         return new AlternatingSubscriptionErrorKinesisV2(LimitExceededException.builder().build());
     }
 
+    public static KinesisProxyV2Interface failsToAcquireSubscription() {
+        return new FailsToAcquireSubscriptionKinesis();
+    }
+
     // ------------------------------------------------------------------------
     //  Behaviours related to describing streams
     // ------------------------------------------------------------------------
@@ -122,6 +126,18 @@ public class FakeKinesisFanOutBehavioursFactory {
         return new StreamConsumerFakeKinesis.Builder().withStreamConsumerStatus(CREATING).build();
     }
 
+    /** A dummy EFO implementation that fails to acquire subscription (no response). */
+    private static class FailsToAcquireSubscriptionKinesis extends KinesisProxyV2InterfaceAdapter {
+
+        @Override
+        public CompletableFuture<Void> subscribeToShard(
+                final SubscribeToShardRequest request,
+                final SubscribeToShardResponseHandler responseHandler) {
+
+            return CompletableFuture.supplyAsync(() -> null);
+        }
+    }
+
     public static AbstractSingleShardFanOutKinesisV2 emptyBatchFollowedBySingleRecord() {
         return new AbstractSingleShardFanOutKinesisV2(2) {
             private int subscription = 0;
@@ -189,6 +205,12 @@ public class FakeKinesisFanOutBehavioursFactory {
         @Override
         void sendEvents(Subscriber<? super SubscribeToShardEventStream> subscriber) {
             sendEventBatch(subscriber);
+            try {
+                // Add an artificial delay to allow records to flush
+                Thread.sleep(200);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
             for (Throwable throwable : throwables) {
                 subscriber.onError(throwable);
             }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
index aadb3fa..d517cc3 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeseri
 
 import org.mockito.invocation.InvocationOnMock;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -49,6 +50,7 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
     private final OneShotLatch shutdownWaiter;
 
     private volatile boolean running;
+    private volatile boolean executorServiceShutdownNowCalled;
 
     public TestableKinesisDataFetcher(
             List<String> fakeStreams,
@@ -128,14 +130,22 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
     @Override
     protected ExecutorService createShardConsumersThreadPool(String subtaskName) {
         // this is just a dummy fetcher, so no need to create a thread pool for shard consumers
-        ExecutorService mockExecutor = mock(ExecutorService.class);
-        when(mockExecutor.isTerminated()).thenAnswer((InvocationOnMock invocation) -> !running);
+        ExecutorService mockExecutorService = mock(ExecutorService.class);
+        when(mockExecutorService.isTerminated())
+                .thenAnswer((InvocationOnMock invocation) -> !running);
+        when(mockExecutorService.shutdownNow())
+                .thenAnswer(
+                        invocationOnMock -> {
+                            executorServiceShutdownNowCalled = true;
+                            return Collections.emptyList();
+                        });
         try {
-            when(mockExecutor.awaitTermination(anyLong(), any())).thenReturn(!running);
+            when(mockExecutorService.awaitTermination(anyLong(), any()))
+                    .thenAnswer(invocationOnMock -> !running && executorServiceShutdownNowCalled);
         } catch (InterruptedException e) {
             // We're just trying to stub the method. Must acknowledge the checked exception.
         }
-        return mockExecutor;
+        return mockExecutorService;
     }
 
     @Override

Reply via email to