This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 84e2d57 [FLINK-22479][Kinesis][Consumer] Potential lock-up under error
condition
84e2d57 is described below
commit 84e2d57bf6e0b7e35ccc60a176363fde3793ff3e
Author: Danny Cranmer <[email protected]>
AuthorDate: Mon Apr 26 15:22:19 2021 +0100
[FLINK-22479][Kinesis][Consumer] Potential lock-up under error condition
---
.../kinesis/config/ConsumerConfigConstants.java | 6 +
.../kinesis/internals/KinesisDataFetcher.java | 65 +++++++---
.../publisher/fanout/FanOutRecordPublisher.java | 5 +-
.../fanout/FanOutRecordPublisherConfiguration.java | 15 +++
.../publisher/fanout/FanOutShardSubscriber.java | 144 ++++++++++++++++-----
.../kinesis/internals/KinesisDataFetcherTest.java | 76 +++++++++--
.../FanOutRecordPublisherConfigurationTest.java | 50 +++++--
.../fanout/FanOutShardSubscriberTest.java | 72 ++++++++++-
.../FakeKinesisFanOutBehavioursFactory.java | 22 ++++
.../testutils/TestableKinesisDataFetcher.java | 18 ++-
10 files changed, 383 insertions(+), 90 deletions(-)
diff --git
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 3140e6e..4ea0bd8 100644
---
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -217,6 +217,10 @@ public class ConsumerConfigConstants extends
AWSConfigConstants {
public static final String SUBSCRIBE_TO_SHARD_RETRIES =
"flink.shard.subscribetoshard.maxretries";
+ /** A timeout when waiting for a shard subscription to be established. */
+ public static final String SUBSCRIBE_TO_SHARD_TIMEOUT_SECONDS =
+ "flink.shard.subscribetoshard.timeout";
+
/** The base backoff time between each subscribeToShard attempt. */
public static final String SUBSCRIBE_TO_SHARD_BACKOFF_BASE =
"flink.shard.subscribetoshard.backoff.base";
@@ -363,6 +367,8 @@ public class ConsumerConfigConstants extends
AWSConfigConstants {
public static final int DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES = 10;
+ public static final Duration DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT =
Duration.ofSeconds(60);
+
public static final long DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_BASE = 1000L;
public static final long DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_MAX = 2000L;
diff --git
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 163cd04..9aee3e4 100644
---
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -800,34 +800,59 @@ public class KinesisDataFetcher<T> {
* executed and all shard consuming threads will be interrupted.
*/
public void shutdownFetcher() {
- if (LOG.isInfoEnabled()) {
- LOG.info(
- "Starting shutdown of shard consumer threads and AWS SDK
resources of subtask {} ...",
- indexOfThisConsumerSubtask);
- }
+ LOG.info(
+ "Starting shutdown of shard consumer threads and AWS SDK
resources of subtask {} ...",
+ indexOfThisConsumerSubtask,
+ error.get());
running = false;
+ try {
+ try {
+ deregisterStreamConsumer();
+ } catch (Exception e) {
+ LOG.warn("Encountered exception deregistering stream
consumers", e);
+ }
- StreamConsumerRegistrarUtil.deregisterStreamConsumers(configProps,
streams);
-
- recordPublisherFactory.close();
+ try {
+ closeRecordPublisherFactory();
+ } catch (Exception e) {
+ LOG.warn("Encountered exception closing record publisher
factory", e);
+ }
+ } finally {
+ shardConsumersExecutor.shutdownNow();
- shardConsumersExecutor.shutdownNow();
+ if (mainThread != null) {
+ mainThread
+ .interrupt(); // the main thread may be sleeping for
the discovery interval
+ }
- if (mainThread != null) {
- mainThread.interrupt(); // the main thread may be sleeping for the
discovery interval
+ if (watermarkTracker != null) {
+ watermarkTracker.close();
+ }
+ this.recordEmitter.stop();
}
- if (watermarkTracker != null) {
- watermarkTracker.close();
- }
- this.recordEmitter.stop();
+ LOG.info(
+ "Shutting down the shard consumer threads of subtask {} ...",
+ indexOfThisConsumerSubtask);
+ }
- if (LOG.isInfoEnabled()) {
- LOG.info(
- "Shutting down the shard consumer threads of subtask {}
...",
- indexOfThisConsumerSubtask);
- }
+ /**
+ * Closes the recordPublisherFactory. Allows tests to override this to
simulate exception for
+ * shutdown logic.
+ */
+ @VisibleForTesting
+ protected void closeRecordPublisherFactory() {
+ recordPublisherFactory.close();
+ }
+
+ /**
+ * Deregisters stream consumers. Allows test to override this to simulate
exception for shutdown
+ * logic.
+ */
+ @VisibleForTesting
+ protected void deregisterStreamConsumer() {
+ StreamConsumerRegistrarUtil.deregisterStreamConsumers(configProps,
streams);
}
/**
diff --git
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
index 5104f8c..ef653d7 100644
---
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
+++
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
@@ -143,7 +143,10 @@ public class FanOutRecordPublisher implements
RecordPublisher {
final Consumer<SubscribeToShardEvent> eventConsumer) throws
InterruptedException {
FanOutShardSubscriber fanOutShardSubscriber =
new FanOutShardSubscriber(
- consumerArn, subscribedShard.getShard().getShardId(),
kinesisProxy);
+ consumerArn,
+ subscribedShard.getShard().getShardId(),
+ kinesisProxy,
+ configuration.getSubscribeToShardTimeout());
boolean complete;
try {
diff --git
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
index 36679c3..cd46876 100644
---
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
+++
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
@@ -52,6 +52,9 @@ public class FanOutRecordPublisherConfiguration {
/** Base backoff millis for the deregister stream operation. */
private final int subscribeToShardMaxRetries;
+ /** A timeout when waiting for a shard subscription to be established. */
+ private final Duration subscribeToShardTimeout;
+
/** Maximum backoff millis for the subscribe to shard operation. */
private final long subscribeToShardMaxBackoffMillis;
@@ -156,6 +159,13 @@ public class FanOutRecordPublisherConfiguration {
ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES))
.map(Integer::parseInt)
.orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES);
+ this.subscribeToShardTimeout =
+ Optional.ofNullable(
+ configProps.getProperty(
+
ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_TIMEOUT_SECONDS))
+ .map(Integer::parseInt)
+ .map(Duration::ofSeconds)
+
.orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT);
this.subscribeToShardBaseBackoffMillis =
Optional.ofNullable(
configProps.getProperty(
@@ -319,6 +329,11 @@ public class FanOutRecordPublisherConfiguration {
return subscribeToShardMaxRetries;
}
+ /** Get timeout when waiting for a shard subscription to be established. */
+ public Duration getSubscribeToShardTimeout() {
+ return subscribeToShardTimeout;
+ }
+
/** Get maximum backoff millis for the subscribe to shard operation. */
public long getSubscribeToShardMaxBackoffMillis() {
return subscribeToShardMaxBackoffMillis;
diff --git
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
index e06923f..be4df59 100644
---
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
+++
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
@@ -18,6 +18,7 @@
package
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2;
import
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
import org.apache.flink.util.Preconditions;
@@ -35,15 +36,18 @@ import
software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import
software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
-import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
/**
* This class is responsible for acquiring an Enhanced Fan Out subscription
and consuming records
@@ -88,12 +92,10 @@ public class FanOutShardSubscriber {
/**
* Read timeout will occur after 30 seconds, a sanity timeout to prevent
lockup in unexpected
- * error states. If the consumer does not receive a new event within the
DEQUEUE_WAIT_SECONDS it
- * will backoff and resubscribe. Under normal conditions heartbeat events
are received even when
- * there are no records to consume, so it is not expected for this timeout
to occur under normal
- * conditions.
+ * error states. If the consumer does not receive a new event within the
DEFAULT_QUEUE_TIMEOUT
+ * it will back off and resubscribe.
*/
- private static final int DEQUEUE_WAIT_SECONDS = 35;
+ private static final Duration DEFAULT_QUEUE_TIMEOUT =
Duration.ofSeconds(35);
private final BlockingQueue<FanOutSubscriptionEvent> queue =
new LinkedBlockingQueue<>(QUEUE_CAPACITY);
@@ -107,18 +109,49 @@ public class FanOutShardSubscriber {
private final String shardId;
+ private final Duration subscribeToShardTimeout;
+
+ private final Duration queueWaitTimeout;
+
/**
- * Create a new Fan Out subscriber.
+ * Create a new Fan Out Shard subscriber.
*
* @param consumerArn the stream consumer ARN
* @param shardId the shard ID to subscribe to
* @param kinesis the Kinesis Proxy used to communicate via AWS SDK v2
+ * @param subscribeToShardTimeout A timeout when waiting for a shard
subscription to be
+ * established
*/
FanOutShardSubscriber(
- final String consumerArn, final String shardId, final
KinesisProxyV2Interface kinesis) {
+ final String consumerArn,
+ final String shardId,
+ final KinesisProxyV2Interface kinesis,
+ final Duration subscribeToShardTimeout) {
+ this(consumerArn, shardId, kinesis, subscribeToShardTimeout,
DEFAULT_QUEUE_TIMEOUT);
+ }
+
+ /**
+ * Create a new Fan Out Shard Subscriber.
+ *
+ * @param consumerArn the stream consumer ARN
+ * @param shardId the shard ID to subscribe to
+ * @param kinesis the Kinesis Proxy used to communicate via AWS SDK v2
+ * @param subscribeToShardTimeout A timeout when waiting for a shard
subscription to be
+ * established
+ * @param queueWaitTimeout A timeout when enqueuing/de-queueing
+ */
+ @VisibleForTesting
+ FanOutShardSubscriber(
+ final String consumerArn,
+ final String shardId,
+ final KinesisProxyV2Interface kinesis,
+ final Duration subscribeToShardTimeout,
+ final Duration queueWaitTimeout) {
this.kinesis = Preconditions.checkNotNull(kinesis);
this.consumerArn = Preconditions.checkNotNull(consumerArn);
this.shardId = Preconditions.checkNotNull(shardId);
+ this.subscribeToShardTimeout = subscribeToShardTimeout;
+ this.queueWaitTimeout = queueWaitTimeout;
}
/**
@@ -139,8 +172,9 @@ public class FanOutShardSubscriber {
throws InterruptedException, FanOutSubscriberException {
LOG.debug("Subscribing to shard {} ({})", shardId, consumerArn);
+ final FanOutShardSubscription subscription;
try {
- openSubscriptionToShard(startingPosition);
+ subscription = openSubscriptionToShard(startingPosition);
} catch (FanOutSubscriberException ex) {
// The only exception that should cause a failure is a
ResourceNotFoundException
// Rethrow the exception to trigger the application to terminate
@@ -151,7 +185,7 @@ public class FanOutShardSubscriber {
throw ex;
}
- return consumeAllRecordsFromKinesisShard(eventConsumer);
+ return consumeAllRecordsFromKinesisShard(eventConsumer, subscription);
}
/**
@@ -163,7 +197,7 @@ public class FanOutShardSubscriber {
* @param startingPosition the position in which to start consuming from
* @throws FanOutSubscriberException when an exception is propagated from
the networking stack
*/
- private void openSubscriptionToShard(final StartingPosition
startingPosition)
+ private FanOutShardSubscription openSubscriptionToShard(final
StartingPosition startingPosition)
throws FanOutSubscriberException, InterruptedException {
SubscribeToShardRequest request =
SubscribeToShardRequest.builder()
@@ -196,7 +230,18 @@ public class FanOutShardSubscriber {
kinesis.subscribeToShard(request, responseHandler);
- waitForSubscriptionLatch.await();
+ boolean subscriptionEstablished =
+ waitForSubscriptionLatch.await(
+ subscribeToShardTimeout.toMillis(),
TimeUnit.MILLISECONDS);
+
+ if (!subscriptionEstablished) {
+ final String errorMessage =
+ "Timed out acquiring subscription - " + shardId + " (" +
consumerArn + ")";
+ LOG.error(errorMessage);
+ subscription.cancelSubscription();
+ handleError(
+ new RecoverableFanOutSubscriberException(new
TimeoutException(errorMessage)));
+ }
Throwable throwable = exception.get();
if (throwable != null) {
@@ -208,6 +253,8 @@ public class FanOutShardSubscriber {
// Request the first record to kick off consumption
// Following requests are made by the FanOutShardSubscription on the
netty thread
subscription.requestRecord();
+
+ return subscription;
}
/**
@@ -234,6 +281,8 @@ public class FanOutShardSubscriber {
if (isInterrupted(throwable)) {
throw new FanOutSubscriberInterruptedException(throwable);
+ } else if (cause instanceof FanOutSubscriberException) {
+ throw (FanOutSubscriberException) cause;
} else if (cause instanceof ReadTimeoutException) {
// ReadTimeoutException occurs naturally under backpressure
scenarios when full batches
// take longer to
@@ -271,24 +320,26 @@ public class FanOutShardSubscriber {
* while consuming records, indicated by a {@link SubscriptionErrorEvent}
*
* @param eventConsumer the event consumer to deliver records to
+ * @param subscription the subscription we are subscribed to
* @return true if there are no more messages (complete), false if a
subsequent subscription
* should be obtained
* @throws FanOutSubscriberException when an exception is propagated from
the networking stack
* @throws InterruptedException when the thread is interrupted
*/
private boolean consumeAllRecordsFromKinesisShard(
- final Consumer<SubscribeToShardEvent> eventConsumer)
+ final Consumer<SubscribeToShardEvent> eventConsumer,
+ final FanOutShardSubscription subscription)
throws InterruptedException, FanOutSubscriberException {
String continuationSequenceNumber;
+ boolean result = true;
do {
FanOutSubscriptionEvent subscriptionEvent;
- if (queue.isEmpty() && subscriptionErrorEvent.get() != null) {
+ if (subscriptionErrorEvent.get() != null) {
subscriptionEvent = subscriptionErrorEvent.get();
} else {
- // Read timeout will occur after 30 seconds, add a sanity
timeout here to prevent
- // lockup
- subscriptionEvent = queue.poll(DEQUEUE_WAIT_SECONDS, SECONDS);
+ // Read timeout occurs after 30 seconds, add a sanity timeout
to prevent lockup
+ subscriptionEvent = queue.poll(queueWaitTimeout.toMillis(),
MILLISECONDS);
}
if (subscriptionEvent == null) {
@@ -296,7 +347,8 @@ public class FanOutShardSubscriber {
"Timed out polling events from network, reacquiring
subscription - {} ({})",
shardId,
consumerArn);
- return false;
+ result = false;
+ break;
} else if (subscriptionEvent.isSubscribeToShardEvent()) {
SubscribeToShardEvent event =
subscriptionEvent.getSubscribeToShardEvent();
continuationSequenceNumber =
event.continuationSequenceNumber();
@@ -304,19 +356,18 @@ public class FanOutShardSubscriber {
eventConsumer.accept(event);
}
} else if (subscriptionEvent.isSubscriptionComplete()) {
- if (subscriptionErrorEvent.get() != null) {
- handleError(subscriptionErrorEvent.get().getThrowable());
- }
-
// The subscription is complete, but the shard might not be,
so we return incomplete
- return false;
+ result = false;
+ break;
} else {
handleError(subscriptionEvent.getThrowable());
- return false;
+ result = false;
+ break;
}
} while (continuationSequenceNumber != null);
- return true;
+ subscription.cancelSubscription();
+ return result;
}
/**
@@ -376,15 +427,19 @@ public class FanOutShardSubscriber {
consumerArn,
throwable);
+ SubscriptionErrorEvent subscriptionErrorEvent = new
SubscriptionErrorEvent(throwable);
+ if (FanOutShardSubscriber.this.subscriptionErrorEvent.get() ==
null) {
+
FanOutShardSubscriber.this.subscriptionErrorEvent.set(subscriptionErrorEvent);
+ } else {
+ LOG.warn("Error already queued. Ignoring subsequent
exception.", throwable);
+ }
+
// Cancel the subscription to signal the onNext to stop requesting
data
cancelSubscription();
- if (subscriptionErrorEvent.get() == null) {
- subscriptionErrorEvent.set(new
SubscriptionErrorEvent(throwable));
- } else {
- LOG.warn(
- "Previous error passed to consumer for processing.
Ignoring subsequent exception.",
- throwable);
+ // If the queue is empty, insert the error to wake up the
blocked thread
+ if (queue.isEmpty()) {
+ queue.offer(subscriptionErrorEvent);
}
}
@@ -395,8 +450,12 @@ public class FanOutShardSubscriber {
}
private void cancelSubscription() {
- if (!cancelled) {
- cancelled = true;
+ if (cancelled) {
+ return;
+ }
+ cancelled = true;
+
+ if (subscription != null) {
subscription.cancel();
}
}
@@ -407,8 +466,25 @@ public class FanOutShardSubscriber {
* @param event the event to enqueue
*/
private void enqueueEvent(final FanOutSubscriptionEvent event) {
+ if (cancelled) {
+ return;
+ }
+
try {
- queue.put(event);
+ if (!queue.offer(event, queueWaitTimeout.toMillis(),
TimeUnit.MILLISECONDS)) {
+ final String errorMessage =
+ "Timed out enqueuing event "
+ + event.getClass().getSimpleName()
+ + " - "
+ + shardId
+ + " ("
+ + consumerArn
+ + ")";
+ LOG.error(errorMessage);
+ onError(
+ new RecoverableFanOutSubscriberException(
+ new TimeoutException(errorMessage)));
+ }
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
diff --git
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 2cd5f33..aea01b0 100644
---
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -72,10 +72,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static java.util.Collections.singletonList;
-import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.NONE;
-import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_REGISTRATION_TYPE;
-import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE;
-import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType.EFO;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -96,12 +92,13 @@ public class KinesisDataFetcherTest extends TestLogger {
assertTrue(fetcher.isRunning());
}
- @Test
- public void testIsRunningFalseAfterShutDown() {
+ @Test(timeout = 10000)
+ public void testIsRunningFalseAfterShutDown() throws InterruptedException {
KinesisDataFetcher<String> fetcher =
createTestDataFetcherWithNoShards(10, 2, "test-stream");
fetcher.shutdownFetcher();
+ fetcher.awaitTermination();
assertFalse(fetcher.isRunning());
}
@@ -990,19 +987,15 @@ public class KinesisDataFetcherTest extends TestLogger {
fetcher.wasInterrupted);
}
- @Test
- public void testRecordPublisherFactoryIsTornDown() {
- Properties config = TestUtils.getStandardProperties();
- config.setProperty(RECORD_PUBLISHER_TYPE, EFO.name());
- config.setProperty(EFO_REGISTRATION_TYPE, NONE.name());
-
+ @Test(timeout = 1000L)
+ public void testRecordPublisherFactoryIsTornDown() throws
InterruptedException {
KinesisProxyV2Interface kinesisV2 =
mock(KinesisProxyV2Interface.class);
TestableKinesisDataFetcher<String> fetcher =
new TestableKinesisDataFetcher<String>(
singletonList("fakeStream1"),
new TestSourceContext<>(),
- config,
+ TestUtils.efoProperties(),
new KinesisDeserializationSchemaWrapper<>(new
SimpleStringSchema()),
10,
2,
@@ -1014,7 +1007,64 @@ public class KinesisDataFetcherTest extends TestLogger {
fetcher.shutdownFetcher();
+ fetcher.awaitTermination();
+ }
+
+ @Test(timeout = 10000)
+ public void
testRecordPublisherFactoryIsTornDownWhenDeregisterStreamConsumerThrowsException()
+ throws InterruptedException {
+ KinesisProxyV2Interface kinesisV2 =
mock(KinesisProxyV2Interface.class);
+
+ TestableKinesisDataFetcher<String> fetcher =
+ new TestableKinesisDataFetcher<String>(
+ singletonList("fakeStream1"),
+ new TestSourceContext<>(),
+ TestUtils.efoProperties(),
+ new KinesisDeserializationSchemaWrapper<>(new
SimpleStringSchema()),
+ 10,
+ 2,
+ new AtomicReference<>(),
+ new LinkedList<>(),
+ new HashMap<>(),
+ mock(KinesisProxyInterface.class),
+ kinesisV2) {
+ @Override
+ protected void deregisterStreamConsumer() {
+ throw new RuntimeException();
+ }
+ };
+
+ fetcher.shutdownFetcher();
+
verify(kinesisV2).close();
+ fetcher.awaitTermination();
+ }
+
+ @Test(timeout = 10000)
+ public void
testExecutorServiceShutDownWhenCloseRecordPublisherFactoryThrowsException()
+ throws InterruptedException {
+ TestableKinesisDataFetcher<String> fetcher =
+ new TestableKinesisDataFetcher<String>(
+ singletonList("fakeStream1"),
+ new TestSourceContext<>(),
+ TestUtils.efoProperties(),
+ new KinesisDeserializationSchemaWrapper<>(new
SimpleStringSchema()),
+ 10,
+ 2,
+ new AtomicReference<>(),
+ new LinkedList<>(),
+ new HashMap<>(),
+ mock(KinesisProxyInterface.class),
+ mock(KinesisProxyV2Interface.class)) {
+ @Override
+ protected void closeRecordPublisherFactory() {
+ throw new RuntimeException();
+ }
+ };
+
+ fetcher.shutdownFetcher();
+
+ fetcher.awaitTermination();
}
private KinesisDataFetcher<String> createTestDataFetcherWithNoShards(
diff --git
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfigurationTest.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfigurationTest.java
index a4395c2..a558ffb 100644
---
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfigurationTest.java
+++
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfigurationTest.java
@@ -24,9 +24,6 @@ import org.apache.flink.util.TestLogger;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
import java.time.Duration;
import java.util.ArrayList;
@@ -46,18 +43,18 @@ import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfi
import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE;
import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.REGISTER_STREAM_TIMEOUT_SECONDS;
import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType.EFO;
+import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_TIMEOUT_SECONDS;
import static org.junit.Assert.assertEquals;
/** Tests for {@link FanOutRecordPublisherConfiguration}. */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(FanOutRecordPublisherConfiguration.class)
public class FanOutRecordPublisherConfigurationTest extends TestLogger {
- @Rule private ExpectedException exception = ExpectedException.none();
+
+ @Rule public ExpectedException thrown = ExpectedException.none();
@Test
public void testPollingRecordPublisher() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Only efo record publisher can register a
FanOutProperties.");
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Only efo record publisher can register a
FanOutProperties.");
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(RECORD_PUBLISHER_TYPE,
RecordPublisherType.POLLING.toString());
@@ -82,8 +79,8 @@ public class FanOutRecordPublisherConfigurationTest extends
TestLogger {
public void testEagerStrategyWithNoConsumerName() {
String msg = "No valid enhanced fan-out consumer name is set through "
+ EFO_CONSUMER_NAME;
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage(msg);
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(msg);
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(RECORD_PUBLISHER_TYPE, EFO.toString());
@@ -115,8 +112,8 @@ public class FanOutRecordPublisherConfigurationTest extends
TestLogger {
String msg =
"Invalid efo consumer arn settings for not providing consumer
arns: flink.stream.efo.consumerarn.fakedstream1,
flink.stream.efo.consumerarn.fakedstream2";
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage(msg);
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(msg);
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(RECORD_PUBLISHER_TYPE, EFO.toString());
@@ -131,8 +128,8 @@ public class FanOutRecordPublisherConfigurationTest extends
TestLogger {
String msg =
"Invalid efo consumer arn settings for not providing consumer
arns: flink.stream.efo.consumerarn.fakedstream2";
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage(msg);
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(msg);
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(RECORD_PUBLISHER_TYPE, EFO.toString());
@@ -169,4 +166,29 @@ public class FanOutRecordPublisherConfigurationTest
extends TestLogger {
assertEquals(Duration.ofSeconds(60),
configuration.getRegisterStreamConsumerTimeout());
assertEquals(Duration.ofSeconds(240),
configuration.getDeregisterStreamConsumerTimeout());
}
+
+ @Test
+ public void testParseSubscribeToShardTimeout() {
+ Properties testConfig = TestUtils.getStandardProperties();
+ testConfig.setProperty(RECORD_PUBLISHER_TYPE, EFO.toString());
+ testConfig.setProperty(EFO_CONSUMER_NAME, "name");
+ testConfig.setProperty(SUBSCRIBE_TO_SHARD_TIMEOUT_SECONDS, "123");
+
+ FanOutRecordPublisherConfiguration configuration =
+ new FanOutRecordPublisherConfiguration(testConfig,
Collections.emptyList());
+
+ assertEquals(Duration.ofSeconds(123),
configuration.getSubscribeToShardTimeout());
+ }
+
+ @Test
+ public void testDefaultSubscribeToShardTimeout() {
+ Properties testConfig = TestUtils.getStandardProperties();
+ testConfig.setProperty(RECORD_PUBLISHER_TYPE, EFO.toString());
+ testConfig.setProperty(EFO_CONSUMER_NAME, "name");
+
+ FanOutRecordPublisherConfiguration configuration =
+ new FanOutRecordPublisherConfiguration(testConfig,
Collections.emptyList());
+
+ assertEquals(Duration.ofSeconds(60),
configuration.getSubscribeToShardTimeout());
+ }
}
diff --git
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
index 0061cdc..c5962df 100644
---
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
+++
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
@@ -17,6 +17,7 @@
package
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+import
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
import
org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory;
import
org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.SubscriptionErrorKinesisV2;
@@ -27,6 +28,10 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import java.time.Duration;
+
+import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT;
+
/** Tests for {@link FanOutShardSubscriber}. */
public class FanOutShardSubscriberTest {
@@ -42,7 +47,11 @@ public class FanOutShardSubscriberTest {
ReadTimeoutException.INSTANCE);
FanOutShardSubscriber subscriber =
- new FanOutShardSubscriber("consumerArn", "shardId",
errorKinesisV2);
+ new FanOutShardSubscriber(
+ "consumerArn",
+ "shardId",
+ errorKinesisV2,
+ DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT);
software.amazon.awssdk.services.kinesis.model.StartingPosition
startingPosition =
software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build();
@@ -59,7 +68,11 @@ public class FanOutShardSubscriberTest {
FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(error);
FanOutShardSubscriber subscriber =
- new FanOutShardSubscriber("consumerArn", "shardId",
errorKinesisV2);
+ new FanOutShardSubscriber(
+ "consumerArn",
+ "shardId",
+ errorKinesisV2,
+ DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT);
software.amazon.awssdk.services.kinesis.model.StartingPosition
startingPosition =
software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build();
@@ -75,7 +88,11 @@ public class FanOutShardSubscriberTest {
FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(error);
FanOutShardSubscriber subscriber =
- new FanOutShardSubscriber("consumerArn", "shardId",
errorKinesisV2);
+ new FanOutShardSubscriber(
+ "consumerArn",
+ "shardId",
+ errorKinesisV2,
+ DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT);
software.amazon.awssdk.services.kinesis.model.StartingPosition
startingPosition =
software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build();
@@ -93,9 +110,56 @@ public class FanOutShardSubscriberTest {
FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(error1, error2);
FanOutShardSubscriber subscriber =
- new FanOutShardSubscriber("consumerArn", "shardId",
errorKinesisV2);
+ new FanOutShardSubscriber(
+ "consumerArn",
+ "shardId",
+ errorKinesisV2,
+ DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT);
StartingPosition startingPosition = StartingPosition.builder().build();
subscriber.subscribeToShardAndConsumeRecords(startingPosition, event
-> {});
}
+
+ @Test
+ public void testTimeoutSubscribingToShard() throws Exception {
+ thrown.expect(FanOutShardSubscriber.RecoverableFanOutSubscriberException.class);
+ thrown.expectMessage("Timed out acquiring subscription");
+
+ KinesisProxyV2Interface kinesis =
+ FakeKinesisFanOutBehavioursFactory.failsToAcquireSubscription();
+
+ FanOutShardSubscriber subscriber =
+ new FanOutShardSubscriber("consumerArn", "shardId", kinesis, Duration.ofMillis(1));
+
+ StartingPosition startingPosition = StartingPosition.builder().build();
+ subscriber.subscribeToShardAndConsumeRecords(startingPosition, event -> {});
+ }
+
+ @Test
+ public void testTimeoutEnqueuingEvent() throws Exception {
+ thrown.expect(FanOutShardSubscriber.RecoverableFanOutSubscriberException.class);
+ thrown.expectMessage("Timed out enqueuing event SubscriptionNextEvent");
+
+ KinesisProxyV2Interface kinesis =
+ FakeKinesisFanOutBehavioursFactory.boundedShard().withBatchCount(5).build();
+
+ FanOutShardSubscriber subscriber =
+ new FanOutShardSubscriber(
+ "consumerArn",
+ "shardId",
+ kinesis,
+ DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
+ Duration.ofMillis(100));
+
+ StartingPosition startingPosition = StartingPosition.builder().build();
+ subscriber.subscribeToShardAndConsumeRecords(
+ startingPosition,
+ event -> {
+ try {
+ Thread.sleep(120);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ });
+ }
}
diff --git
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java
index 87cc518..1107efb 100644
---
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java
+++
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java
@@ -96,6 +96,10 @@ public class FakeKinesisFanOutBehavioursFactory {
return new
AlternatingSubscriptionErrorKinesisV2(LimitExceededException.builder().build());
}
+ public static KinesisProxyV2Interface failsToAcquireSubscription() {
+ return new FailsToAcquireSubscriptionKinesis();
+ }
+
// ------------------------------------------------------------------------
// Behaviours related to describing streams
// ------------------------------------------------------------------------
@@ -122,6 +126,18 @@ public class FakeKinesisFanOutBehavioursFactory {
return new
StreamConsumerFakeKinesis.Builder().withStreamConsumerStatus(CREATING).build();
}
+ /** A dummy EFO implementation that fails to acquire subscription (no response). */
+ private static class FailsToAcquireSubscriptionKinesis extends KinesisProxyV2InterfaceAdapter {
+
+ @Override
+ public CompletableFuture<Void> subscribeToShard(
+ final SubscribeToShardRequest request,
+ final SubscribeToShardResponseHandler responseHandler) {
+
+ return CompletableFuture.supplyAsync(() -> null);
+ }
+ }
+
public static AbstractSingleShardFanOutKinesisV2
emptyBatchFollowedBySingleRecord() {
return new AbstractSingleShardFanOutKinesisV2(2) {
private int subscription = 0;
@@ -189,6 +205,12 @@ public class FakeKinesisFanOutBehavioursFactory {
@Override
void sendEvents(Subscriber<? super SubscribeToShardEventStream>
subscriber) {
sendEventBatch(subscriber);
+ try {
+ // Add an artificial delay to allow records to flush
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
for (Throwable throwable : throwables) {
subscriber.onError(throwable);
}
diff --git
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
index aadb3fa..d517cc3 100644
---
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
+++
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
@@ -28,6 +28,7 @@ import
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeseri
import org.mockito.invocation.InvocationOnMock;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -49,6 +50,7 @@ public class TestableKinesisDataFetcher<T> extends
KinesisDataFetcher<T> {
private final OneShotLatch shutdownWaiter;
private volatile boolean running;
+ private volatile boolean executorServiceShutdownNowCalled;
public TestableKinesisDataFetcher(
List<String> fakeStreams,
@@ -128,14 +130,22 @@ public class TestableKinesisDataFetcher<T> extends
KinesisDataFetcher<T> {
@Override
protected ExecutorService createShardConsumersThreadPool(String
subtaskName) {
// this is just a dummy fetcher, so no need to create a thread pool
for shard consumers
- ExecutorService mockExecutor = mock(ExecutorService.class);
- when(mockExecutor.isTerminated()).thenAnswer((InvocationOnMock
invocation) -> !running);
+ ExecutorService mockExecutorService = mock(ExecutorService.class);
+ when(mockExecutorService.isTerminated())
+ .thenAnswer((InvocationOnMock invocation) -> !running);
+ when(mockExecutorService.shutdownNow())
+ .thenAnswer(
+ invocationOnMock -> {
+ executorServiceShutdownNowCalled = true;
+ return Collections.emptyList();
+ });
try {
- when(mockExecutor.awaitTermination(anyLong(),
any())).thenReturn(!running);
+ when(mockExecutorService.awaitTermination(anyLong(), any()))
+ .thenAnswer(invocationOnMock -> !running && executorServiceShutdownNowCalled);
} catch (InterruptedException e) {
// We're just trying to stub the method. Must acknowledge the
checked exception.
}
- return mockExecutor;
+ return mockExecutorService;
}
@Override