tzulitai commented on a change in pull request #13102:
URL: https://github.com/apache/flink/pull/13102#discussion_r489964177
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -102,6 +107,11 @@ public void run() {
try {
while (isRunning()) {
final RecordPublisherRunResult result =
recordPublisher.run(batch -> {
+ if (!batch.getDeaggregatedRecords().isEmpty()) {
Review comment:
Could you briefly explain the reason behind adding this log?
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import org.apache.flink.annotation.Internal;
+import
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.FanOutSubscriberException;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.FullJitterBackoff;
+import
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.model.EncryptionType;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
+import static
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
+import static
software.amazon.awssdk.services.kinesis.model.StartingPosition.builder;
+
+/**
+ * A {@link RecordPublisher} that will read and forward records from Kinesis
using EFO, to the subscriber.
+ * Records are consumed via Enhanced Fan Out subscriptions using
SubscribeToShard API.
+ */
+@Internal
+public class FanOutRecordPublisher implements RecordPublisher {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FanOutRecordPublisher.class);
+
+ private final FullJitterBackoff backoff;
+
+ private final String consumerArn;
+
+ private final KinesisProxyV2Interface kinesisProxy;
+
+ private final StreamShardHandle subscribedShard;
+
+ private final FanOutRecordPublisherConfiguration configuration;
+
+ /** The current attempt in the case of subsequent recoverable errors. */
+ private int attempt = 0;
+
+ private StartingPosition nextStartingPosition;
+
+ /**
+ * Instantiate a new FanOutRecordPublisher.
+ * Consumes data from KDS using EFO SubscribeToShard over AWS SDK V2.x
+ *
+ * @param startingPosition the position in the shard to start consuming
from
+ * @param consumerArn the consumer ARN of the stream consumer
+ * @param subscribedShard the shard to consumer from
+ * @param kinesisProxy the proxy used to talk to Kinesis services
+ * @param configuration the record publisher configuration
+ */
+ public FanOutRecordPublisher(
+ final StartingPosition startingPosition,
+ final String consumerArn,
+ final StreamShardHandle subscribedShard,
+ final KinesisProxyV2Interface kinesisProxy,
+ final FanOutRecordPublisherConfiguration configuration,
+ final FullJitterBackoff backoff) {
+ this.nextStartingPosition =
Preconditions.checkNotNull(startingPosition);
+ this.consumerArn = Preconditions.checkNotNull(consumerArn);
+ this.subscribedShard =
Preconditions.checkNotNull(subscribedShard);
+ this.kinesisProxy = Preconditions.checkNotNull(kinesisProxy);
+ this.configuration = Preconditions.checkNotNull(configuration);
+ this.backoff = Preconditions.checkNotNull(backoff);
+ }
+
+ @Override
+ public RecordPublisherRunResult run(final RecordBatchConsumer
recordConsumer) throws InterruptedException {
+ LOG.info("Running fan out record publisher on {}::{} from {} -
{}",
+ subscribedShard.getStreamName(),
+ subscribedShard.getShard().getShardId(),
+ nextStartingPosition.getShardIteratorType(),
+ nextStartingPosition.getStartingMarker());
+
+ Consumer<SubscribeToShardEvent> eventConsumer = event -> {
+ RecordBatch recordBatch = new RecordBatch(toSdkV1Records(event.records()), subscribedShard, event.millisBehindLatest());
+ SequenceNumber sequenceNumber = recordConsumer.accept(recordBatch);
+ nextStartingPosition = StartingPosition.continueFromSequenceNumber(sequenceNumber);
+ };
+
+ RecordPublisherRunResult result = runWithBackoff(eventConsumer);
+
+ LOG.info("Subscription expired {}::{}, with status {}",
+ subscribedShard.getStreamName(),
+ subscribedShard.getShard().getShardId(),
+ result);
+
+ return result;
+ }
+
+ /**
+ * Runs the record publisher, will sleep for configuration computed
jitter period in the case of certain exceptions.
+ * Unrecoverable exceptions are thrown to terminate the application.
+ *
+ * @param eventConsumer the consumer to pass events to
+ * @return {@code COMPLETE} if the shard is complete and this shard
consumer should exit
+ * @throws InterruptedException
+ */
+ private RecordPublisherRunResult runWithBackoff(
+ final Consumer<SubscribeToShardEvent> eventConsumer)
throws InterruptedException {
+ FanOutShardSubscriber fanOutShardSubscriber = new
FanOutShardSubscriber(
+ consumerArn,
+ subscribedShard.getShard().getShardId(),
+ kinesisProxy);
+ boolean complete;
+
+ try {
+ complete =
fanOutShardSubscriber.subscribeToShardAndConsumeRecords(
+ toSdkV2StartingPosition(nextStartingPosition),
eventConsumer);
+ attempt = 0;
+ } catch (FanOutSubscriberException ex) {
+ // We have received an error from the network layer
+ // This can be due to limits being exceeded, network
timeouts, etc
+ // We should backoff, reacquire a subscription and try
again
+ if (ex.getCause() instanceof ResourceNotFoundException)
{
+ LOG.warn("Received ResourceNotFoundException.
Either the shard does not exist, or the stream subscriber has been
deregistered." +
+ "Marking this shard as complete {}
({})", subscribedShard.getShard().getShardId(), consumerArn);
+
+ return COMPLETE;
+ }
+
+ if (attempt == configuration.getSubscribeToShardMaxRetries()) {
+ throw new RuntimeException("Maximum retries exceeded for SubscribeToShard. " +
+ "Failed " + configuration.getSubscribeToShardMaxRetries() + " times.");
+ }
+
+ backoff(ex);
+ return INCOMPLETE;
+ }
+
+ return complete ? COMPLETE : INCOMPLETE;
+ }
+
+ private void backoff(final Throwable ex) throws InterruptedException {
+ long backoffMillis = backoff.calculateFullJitterBackoff(
+ configuration.getSubscribeToShardBaseBackoffMillis(),
+ configuration.getSubscribeToShardMaxBackoffMillis(),
+ configuration.getSubscribeToShardExpConstant(),
+ ++attempt);
Review comment:
nitpick:
What do you think about moving the attempt increment into the `catch` block
in `runWithBackoff`?
That seems like a more appropriate place flow-wise.
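For illustration, here is a minimal sketch of what I mean (simplified stand-ins
for the PR's types; only `attempt`, the retry limit and the reset-on-success
behaviour mirror the diff above, everything else is hypothetical):
```java
// Simplified stand-in for FanOutRecordPublisher#runWithBackoff, only to show
// where the increment could live; this is not the PR's code.
class RetryPlacementSketch {

	private int attempt = 0;
	private final int maxRetries = 10; // stand-in for getSubscribeToShardMaxRetries()

	boolean runWithBackoff(Runnable subscribeAndConsume) throws InterruptedException {
		try {
			subscribeAndConsume.run(); // stand-in for subscribeToShardAndConsumeRecords(..)
			attempt = 0;               // reset on success, as in the PR
			return true;               // COMPLETE
		} catch (RuntimeException ex) { // stand-in for FanOutSubscriberException
			attempt++;                  // incremented here, in the catch block

			if (attempt > maxRetries) {
				throw new RuntimeException("Maximum retries exceeded for SubscribeToShard. "
						+ "Failed " + maxRetries + " times.", ex);
			}

			backoff(attempt);
			return false;              // INCOMPLETE
		}
	}

	private void backoff(int currentAttempt) throws InterruptedException {
		// backoff() now only reads the counter instead of mutating it
		Thread.sleep(Math.min(1_000L * currentAttempt, 5_000L));
	}
}
```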
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
##########
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2;
+import
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
+import org.apache.flink.util.Preconditions;
+
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import
software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import
software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class is responsible for acquiring an Enhanced Fan Out subscription
and consuming records from a shard.
+ * A queue is used to buffer records between the Kinesis Proxy and Flink
application. This allows processing
+ * to be separated from consumption; errors thrown in the consumption layer do
not propagate up to application.
+ *
+ * <pre>{@code [
+ * | ----------- Source Connector Thread ----------- | |
--- KinesisAsyncClient Thread(s) -- |
+ * | FanOutRecordPublisher | FanOutShardSubscription | == blocking queue == |
KinesisProxyV2 | KinesisAsyncClient |
+ * ]}</pre>
+ * <p>
+ * Three types of message are passed over the queue for inter-thread
communication:
+ * <ul>
+ * <li>{@link SubscriptionNextEvent} - passes data from the
network to the consumer</li>
+ * <li>{@link SubscriptionCompleteEvent} - indicates a
subscription has expired</li>
+ * <li>{@link SubscriptionErrorEvent} - passes an exception from
the network to the consumer</li>
+ * </ul>
+ * </p>
+ * <p>
+ * The blocking queue has a maximum capacity of 1 record.
+ * This allows backpressure to be applied closer to the network stack and
results in record prefetch.
+ * At maximum capacity we will have three {@link SubscribeToShardEvent} in
memory (per instance of this class):
+ * <ul>
+ * <li>1 event being processed by the consumer</li>
+ * <li>1 event enqueued in the blocking queue</li>
+ * <li>1 event being added to the queue by the network (blocking)</li>
+ * </ul>
+ * </p>
+ */
+@Internal
+public class FanOutShardSubscriber {
Review comment:
Could you address the various warnings in this class,
such as `Access can be package-private` and the missing `serialVersionUID`?
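For reference, a sketch of what I mean by the two fixes (the superclass and
the `serialVersionUID` value are assumptions, only the names are taken from
the diff):
```java
// Sketch only, not the PR code.

// Missing serialVersionUID: the nested exception type is Serializable via its
// Exception superclass, so the field should be declared. Assuming it is a
// checked exception that wraps a cause:
class FanOutSubscriberException extends Exception {
	private static final long serialVersionUID = 1L;

	FanOutSubscriberException(Throwable cause) {
		super(cause);
	}
}

// "Access can be package-private": members of the private inner subscription
// class that are only called from within this file do not need to be public,
// e.g. `public void requestRecord()` can become `void requestRecord()`.
```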
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
##########
@@ -277,7 +278,9 @@ public static void
setAwsClientConfigProperties(ClientConfiguration config,
* @return the starting position
*/
public static StartingPosition getStartingPosition(final SequenceNumber
sequenceNumber, final Properties configProps) {
- if
(SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get().equals(sequenceNumber)) {
+ if (sequenceNumber.equals(SENTINEL_LATEST_SEQUENCE_NUM.get())) {
+ return StartingPosition.fromTimestamp(new Date());
Review comment:
Since this is a pretty delicate thing, could you add a comment here on why
we do this "special" translation, so that it's easier for future generations to
understand the reasoning here :)
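For example, something along these lines (the branch is copied from the diff;
the rationale in the comment is just my guess and should be corrected by the
author if wrong):
```java
if (sequenceNumber.equals(SENTINEL_LATEST_SEQUENCE_NUM.get())) {
	// Assumed rationale: pin LATEST to the wall-clock time at which consumption
	// starts, so that if the position has to be re-resolved later (e.g. when a
	// new EFO subscription is acquired) we continue from this fixed point in
	// time instead of jumping to the new "latest" and skipping records in between.
	return StartingPosition.fromTimestamp(new Date());
}
```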
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
##########
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2;
+import
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
+import org.apache.flink.util.Preconditions;
+
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import
software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import
software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class is responsible for acquiring an Enhanced Fan Out subscription
and consuming records from a shard.
+ * A queue is used to buffer records between the Kinesis Proxy and Flink
application. This allows processing
+ * to be separated from consumption; errors thrown in the consumption layer do
not propagate up to application.
+ *
+ * <pre>{@code [
+ * | ----------- Source Connector Thread ----------- | |
--- KinesisAsyncClient Thread(s) -- |
+ * | FanOutRecordPublisher | FanOutShardSubscription | == blocking queue == |
KinesisProxyV2 | KinesisAsyncClient |
+ * ]}</pre>
+ * <p>
+ * Three types of message are passed over the queue for inter-thread
communication:
+ * <ul>
+ * <li>{@link SubscriptionNextEvent} - passes data from the
network to the consumer</li>
+ * <li>{@link SubscriptionCompleteEvent} - indicates a
subscription has expired</li>
+ * <li>{@link SubscriptionErrorEvent} - passes an exception from
the network to the consumer</li>
+ * </ul>
+ * </p>
+ * <p>
+ * The blocking queue has a maximum capacity of 1 record.
+ * This allows backpressure to be applied closer to the network stack and
results in record prefetch.
+ * At maximum capacity we will have three {@link SubscribeToShardEvent} in
memory (per instance of this class):
+ * <ul>
+ * <li>1 event being processed by the consumer</li>
+ * <li>1 event enqueued in the blocking queue</li>
+ * <li>1 event being added to the queue by the network (blocking)</li>
+ * </ul>
+ * </p>
+ */
+@Internal
+public class FanOutShardSubscriber {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FanOutShardSubscriber.class);
+
+ /**
+ * The maximum capacity of the queue between the network and
consumption thread.
+ * The queue is mainly used to isolate networking from consumption such
that errors do not bubble up.
+ * This queue also acts as a buffer resulting in a record prefetch and
reduced latency.
+ */
+ private static final int QUEUE_CAPACITY = 1;
+
+ /**
+ * Read timeout will occur after 30 seconds, a sanity timeout to
prevent lockup in unexpected error states.
+ * If the consumer does not receive a new event within the
DEQUEUE_WAIT_SECONDS it will backoff and resubscribe.
+ * Under normal conditions heartbeat events are received even when
there are no records to consume, so it is not
+ * expected for this timeout to occur under normal conditions.
+ */
+ private static final int DEQUEUE_WAIT_SECONDS = 35;
+
+ /** The time to wait when enqueuing events to allow complete/error
events to "push in front" of data . */
+ private static final int ENQUEUE_WAIT_SECONDS = 5;
+
+ private final BlockingQueue<FanOutSubscriptionEvent> queue = new
LinkedBlockingQueue<>(QUEUE_CAPACITY);
+
+ private final KinesisProxyV2Interface kinesis;
+
+ private final String consumerArn;
+
+ private final String shardId;
+
+ /**
+ * Create a new Fan Out subscriber.
+ *
+ * @param consumerArn the stream consumer ARN
+ * @param shardId the shard ID to subscribe to
+ * @param kinesis the Kinesis Proxy used to communicate via AWS SDK v2
+ */
+ public FanOutShardSubscriber(final String consumerArn, final String
shardId, final KinesisProxyV2Interface kinesis) {
+ this.kinesis = Preconditions.checkNotNull(kinesis);
+ this.consumerArn = Preconditions.checkNotNull(consumerArn);
+ this.shardId = Preconditions.checkNotNull(shardId);
+ }
+
+ /**
+ * Obtains a subscription to the shard from the specified {@code
startingPosition}.
+ * {@link SubscribeToShardEvent} received from KDS are delivered to the
given {@code eventConsumer}.
+ * Returns false if there are records left to consume from the shard.
+ *
+ * @param startingPosition the position in the stream in which to start
receiving records
+ * @param eventConsumer the consumer to deliver received events to
+ * @return true if there are no more messages (complete), false if a
subsequent subscription should be obtained
+ * @throws FanOutSubscriberException when an exception is propagated
from the networking stack
+ * @throws InterruptedException
+ */
+ public boolean subscribeToShardAndConsumeRecords(
+ final StartingPosition startingPosition,
+ final Consumer<SubscribeToShardEvent> eventConsumer)
throws InterruptedException, FanOutSubscriberException {
+ LOG.debug("Subscribing to shard {} ({})", shardId, consumerArn);
+
+ try {
+ openSubscriptionToShard(startingPosition);
+ } catch (FanOutSubscriberException ex) {
+ // The only exception that should cause a failure is a
ResourceNotFoundException
+ // Rethrow the exception to trigger the application to
terminate
+ if (ex.getCause() instanceof ResourceNotFoundException)
{
+ throw (ResourceNotFoundException) ex.getCause();
+ }
+
+ throw ex;
+ }
+
+ return consumeAllRecordsFromKinesisShard(eventConsumer);
+ }
+
+ /**
+ * Calls {@link KinesisProxyV2#subscribeToShard} and waits to acquire a
subscription.
+ * In the event a non-recoverable error occurs this method will rethrow
the exception.
+ * Once the subscription is acquired the client signals to the producer
that we are ready to receive records.
+ *
+ * @param startingPosition the position in which to start consuming from
+ * @throws FanOutSubscriberException when an exception is propagated
from the networking stack
+ */
+ private void openSubscriptionToShard(final StartingPosition
startingPosition) throws FanOutSubscriberException, InterruptedException {
+ SubscribeToShardRequest request =
SubscribeToShardRequest.builder()
+ .consumerARN(consumerArn)
+ .shardId(shardId)
+ .startingPosition(startingPosition)
+ .build();
+
+ AtomicReference<Throwable> exception = new AtomicReference<>();
+ CountDownLatch waitForSubscriptionLatch = new CountDownLatch(1);
+ FanOutShardSubscription subscription = new
FanOutShardSubscription(waitForSubscriptionLatch);
+
+ SubscribeToShardResponseHandler responseHandler =
SubscribeToShardResponseHandler
+ .builder()
+ .onError(e -> {
+ // Errors that occur while trying to acquire a
subscription are only thrown from here
+ // Errors that occur during the subscription
are surfaced here and to the FanOutShardSubscription
+ // (errors are ignored here once the
subscription is open)
+ if (waitForSubscriptionLatch.getCount() > 0) {
+ exception.set(e);
+ waitForSubscriptionLatch.countDown();
+ }
+ })
+ .subscriber(() -> subscription)
+ .build();
+
+ kinesis.subscribeToShard(request, responseHandler);
+
+ waitForSubscriptionLatch.await();
+
+ Throwable throwable = exception.get();
+ if (throwable != null) {
+ handleError(throwable);
+ }
+
+ LOG.debug("Acquired subscription - {} ({})", shardId,
consumerArn);
+
+ // Request the first record to kick off consumption
+ // Following requests are made by the FanOutShardSubscription
on the netty thread
+ subscription.requestRecord();
+ }
+
+ /**
+ * Update the reference to the latest networking error in this object.
+ * Parent caller can interrogate to decide how to handle error.
+ *
+ * @param throwable the exception that has occurred
+ */
+ private void handleError(final Throwable throwable) throws
FanOutSubscriberException {
+ Throwable cause;
+ if (throwable instanceof CompletionException || throwable
instanceof ExecutionException) {
+ cause = throwable.getCause();
+ } else {
+ cause = throwable;
+ }
+
+ LOG.warn("Error occurred on EFO subscription: {} - ({}). {}
({})",
+ throwable.getClass().getName(), throwable.getMessage(),
shardId, consumerArn, cause);
+
+ throw new FanOutSubscriberException(cause);
+ }
+
+ /**
+ * Once the subscription is open, records will be delivered to the
{@link BlockingQueue}.
+ * Queue capacity is hardcoded to 1 record, the queue is used solely to
separate consumption and processing.
+ * However, this buffer will result in latency reduction as records are
pre-fetched as a result.
+ * This method will poll the queue and exit under any of these
conditions:
+ * - {@code continuationSequenceNumber} is {@code null}, indicating the
shard is complete
+ * - The subscription expires, indicated by a {@link
SubscriptionCompleteEvent}
+ * - There is an error while consuming records, indicated by a {@link
SubscriptionErrorEvent}
+ *
+ * @param eventConsumer the event consumer to deliver records to
+ * @return true if there are no more messages (complete), false if a
subsequent subscription should be obtained
+ * @throws FanOutSubscriberException when an exception is propagated
from the networking stack
+ * @throws InterruptedException
+ */
+ private boolean consumeAllRecordsFromKinesisShard(
+ final Consumer<SubscribeToShardEvent> eventConsumer)
throws InterruptedException, FanOutSubscriberException {
+ String continuationSequenceNumber = null;
+
+ do {
+ // Read timeout will occur after 30 seconds, add a
sanity timeout here to prevent lockup
+ FanOutSubscriptionEvent subscriptionEvent =
queue.poll(DEQUEUE_WAIT_SECONDS, SECONDS);
+
+ if (subscriptionEvent == null) {
+ LOG.debug("Timed out polling events from
network, reacquiring subscription - {} ({})", shardId, consumerArn);
+ return false;
+ } else if (subscriptionEvent.isSubscribeToShardEvent())
{
+ SubscribeToShardEvent event =
subscriptionEvent.getSubscribeToShardEvent();
+ continuationSequenceNumber =
event.continuationSequenceNumber();
+ if (!event.records().isEmpty()) {
+ eventConsumer.accept(event);
+ }
+ } else if (subscriptionEvent.isSubscriptionComplete()) {
+ // The subscription is complete, but the shard
might not be, so we return incomplete
+ return false;
+ } else {
+ handleError(subscriptionEvent.getThrowable());
+ return false;
+ }
+ } while (continuationSequenceNumber != null);
+
+ return true;
+ }
+
+ /**
+ * The {@link FanOutShardSubscription} subscribes to the events coming
from KDS and adds them to the {@link BlockingQueue}.
+ * Backpressure is applied based on the maximum capacity of the queue.
+ * The {@link Subscriber} methods of this class are invoked by a thread
from the {@link KinesisAsyncClient}.
+ */
+ private class FanOutShardSubscription implements
Subscriber<SubscribeToShardEventStream> {
+
+ private Subscription subscription;
+
+ private boolean cancelled = false;
+
+ private final CountDownLatch waitForSubscriptionLatch;
+
+ private final Object lockObject = new Object();
+
+ private FanOutShardSubscription(final CountDownLatch
waitForSubscriptionLatch) {
+ this.waitForSubscriptionLatch =
waitForSubscriptionLatch;
+ }
+
+ /**
+ * Flag to the producer that we are ready to receive more
events.
+ */
+ public void requestRecord() {
+ if (!cancelled) {
+ LOG.debug("Requesting more records from EFO
subscription - {} ({})", shardId, consumerArn);
+ subscription.request(1);
+ }
+ }
+
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ this.subscription = subscription;
+ waitForSubscriptionLatch.countDown();
+ }
+
+ @Override
+ public void onNext(SubscribeToShardEventStream
subscribeToShardEventStream) {
+ subscribeToShardEventStream.accept(new
SubscribeToShardResponseHandler.Visitor() {
+ @Override
+ public void visit(SubscribeToShardEvent event) {
+ synchronized (lockObject) {
+ if (enqueueEventWithRetry(new
SubscriptionNextEvent(event))) {
+ requestRecord();
+ }
+ }
+ }
+ });
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ LOG.debug("Error occurred on EFO subscription: {} -
({}). {} ({})",
+ throwable.getClass().getName(),
throwable.getMessage(), shardId, consumerArn);
+
+ // Cancel the subscription to signal the onNext to stop
queuing and requesting data
+ cancelSubscription();
+
+ synchronized (lockObject) {
+ // Empty the queue and add a poison pill to
terminate this subscriber
+ // The synchronized block ensures that new data
is not written in the meantime
+ queue.clear();
+ enqueueEvent(new
SubscriptionErrorEvent(throwable));
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ LOG.debug("EFO subscription complete - {} ({})",
shardId, consumerArn);
+ enqueueEvent(new SubscriptionCompleteEvent());
+ }
+
+ private void cancelSubscription() {
+ if (!cancelled) {
+ cancelled = true;
+ subscription.cancel();
+ }
+ }
+
+ /**
+ * Continuously attempt to enqueue an event until successful or
the subscription is cancelled (due to error).
+ * When backpressure applied by the consumer exceeds 30s for a
single batch, a ReadTimeoutException will be
+ * thrown by the network stack. This will result in the
subscription be cancelled and this event being discarded.
+ * The subscription would subsequently be reacquired and the
discarded data would be fetched again.
+ *
+ * @param event the event to enqueue
+ * @return true if the event was successfully enqueued.
+ */
+ private boolean enqueueEventWithRetry(final
FanOutSubscriptionEvent event) {
+ boolean result = false;
+ do {
+ if (cancelled) {
+ break;
+ }
+
+ synchronized (lockObject) {
+ result = enqueueEvent(event);
+ }
+ } while (!result);
+
+ return result;
+ }
+
+ /**
+ * Offers the event to the queue.
+ *
+ * @param event the event to enqueue
+ * @return true if the event was successfully enqueued.
+ */
+ private boolean enqueueEvent(final FanOutSubscriptionEvent
event) {
+ try {
+ if (!queue.offer(event, ENQUEUE_WAIT_SECONDS,
SECONDS)) {
Review comment:
For my own clarification / understanding:
As I understand it, in the case of an `onError` during an ongoing `onNext`, this
blocks job cancellation for up to `ENQUEUE_WAIT_SECONDS`, because a
`SubscriptionErrorEvent` cannot be added to the queue until the offer has
timed out and released the lock?
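To make the scenario I have in mind concrete, condensing the two code paths
from the diff above (comments are mine):
```java
// Path 1: onNext -> visit(event) holds the lock while offer() blocks under
// backpressure, for up to ENQUEUE_WAIT_SECONDS per attempt.
synchronized (lockObject) {
	queue.offer(new SubscriptionNextEvent(event), ENQUEUE_WAIT_SECONDS, SECONDS);
}

// Path 2: onError(throwable) cancels the subscription, then needs the same lock
// before it can clear the queue and enqueue the error event, so it waits until
// the offer in path 1 returns (success or timeout).
synchronized (lockObject) {
	queue.clear();
	queue.offer(new SubscriptionErrorEvent(throwable), ENQUEUE_WAIT_SECONDS, SECONDS);
}
```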
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
##########
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2;
+import
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
+import org.apache.flink.util.Preconditions;
+
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import
software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import
software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class is responsible for acquiring an Enhanced Fan Out subscription
and consuming records from a shard.
+ * A queue is used to buffer records between the Kinesis Proxy and Flink
application. This allows processing
+ * to be separated from consumption; errors thrown in the consumption layer do
not propagate up to application.
+ *
+ * <pre>{@code [
+ * | ----------- Source Connector Thread ----------- | |
--- KinesisAsyncClient Thread(s) -- |
+ * | FanOutRecordPublisher | FanOutShardSubscription | == blocking queue == |
KinesisProxyV2 | KinesisAsyncClient |
+ * ]}</pre>
+ * <p>
+ * Three types of message are passed over the queue for inter-thread
communication:
+ * <ul>
+ * <li>{@link SubscriptionNextEvent} - passes data from the
network to the consumer</li>
+ * <li>{@link SubscriptionCompleteEvent} - indicates a
subscription has expired</li>
+ * <li>{@link SubscriptionErrorEvent} - passes an exception from
the network to the consumer</li>
+ * </ul>
+ * </p>
+ * <p>
+ * The blocking queue has a maximum capacity of 1 record.
+ * This allows backpressure to be applied closer to the network stack and
results in record prefetch.
+ * At maximum capacity we will have three {@link SubscribeToShardEvent} in
memory (per instance of this class):
+ * <ul>
+ * <li>1 event being processed by the consumer</li>
+ * <li>1 event enqueued in the blocking queue</li>
+ * <li>1 event being added to the queue by the network (blocking)</li>
+ * </ul>
+ * </p>
+ */
Review comment:
💯 Thank you for this great Javadoc, it made the review much easier!
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -30,15 +34,28 @@
@Internal
public class KinesisProxyV2 implements KinesisProxyV2Interface {
+ /** An Asynchronous client used to communicate with AWS services. */
private final KinesisAsyncClient kinesisAsyncClient;
/**
- * Create a new KinesisProxyV2 based on the supplied configuration
properties.
+ * Create a new KinesisProxyV2 using the provided Async Client.
*
* @param kinesisAsyncClient the kinesis async client used to
communicate with Kinesis
*/
public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
this.kinesisAsyncClient =
Preconditions.checkNotNull(kinesisAsyncClient);
}
+ @Override
+ public CompletableFuture<Void> subscribeToShard(
+ final SubscribeToShardRequest request,
+ final SubscribeToShardResponseHandler responseHandler) {
+ return kinesisAsyncClient.subscribeToShard(request,
responseHandler);
+ }
+
+ @Override
+ public void close() {
+ kinesisAsyncClient.close();
Review comment:
Noted. It's fine to leave it for the next PR; I'll merge them in one batch.
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
##########
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2;
+import
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
+import org.apache.flink.util.Preconditions;
+
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import
software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import
software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class is responsible for acquiring an Enhanced Fan Out subscription
and consuming records from a shard.
+ * A queue is used to buffer records between the Kinesis Proxy and Flink
application. This allows processing
+ * to be separated from consumption; errors thrown in the consumption layer do
not propagate up to application.
+ *
+ * <pre>{@code [
+ * | ----------- Source Connector Thread ----------- | |
--- KinesisAsyncClient Thread(s) -- |
+ * | FanOutRecordPublisher | FanOutShardSubscription | == blocking queue == |
KinesisProxyV2 | KinesisAsyncClient |
+ * ]}</pre>
+ * <p>
+ * Three types of message are passed over the queue for inter-thread
communication:
+ * <ul>
+ * <li>{@link SubscriptionNextEvent} - passes data from the
network to the consumer</li>
+ * <li>{@link SubscriptionCompleteEvent} - indicates a
subscription has expired</li>
+ * <li>{@link SubscriptionErrorEvent} - passes an exception from
the network to the consumer</li>
+ * </ul>
+ * </p>
+ * <p>
+ * The blocking queue has a maximum capacity of 1 record.
+ * This allows backpressure to be applied closer to the network stack and
results in record prefetch.
+ * At maximum capacity we will have three {@link SubscribeToShardEvent} in
memory (per instance of this class):
+ * <ul>
+ * <li>1 event being processed by the consumer</li>
+ * <li>1 event enqueued in the blocking queue</li>
+ * <li>1 event being added to the queue by the network (blocking)</li>
+ * </ul>
+ * </p>
+ */
+@Internal
+public class FanOutShardSubscriber {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FanOutShardSubscriber.class);
+
+ /**
+ * The maximum capacity of the queue between the network and
consumption thread.
+ * The queue is mainly used to isolate networking from consumption such
that errors do not bubble up.
+ * This queue also acts as a buffer resulting in a record prefetch and
reduced latency.
+ */
+ private static final int QUEUE_CAPACITY = 1;
+
+ /**
+ * Read timeout will occur after 30 seconds, a sanity timeout to
prevent lockup in unexpected error states.
+ * If the consumer does not receive a new event within the
DEQUEUE_WAIT_SECONDS it will backoff and resubscribe.
+ * Under normal conditions heartbeat events are received even when
there are no records to consume, so it is not
+ * expected for this timeout to occur under normal conditions.
+ */
+ private static final int DEQUEUE_WAIT_SECONDS = 35;
+
+ /** The time to wait when enqueuing events to allow complete/error
events to "push in front" of data . */
+ private static final int ENQUEUE_WAIT_SECONDS = 5;
+
+ private final BlockingQueue<FanOutSubscriptionEvent> queue = new
LinkedBlockingQueue<>(QUEUE_CAPACITY);
+
+ private final KinesisProxyV2Interface kinesis;
+
+ private final String consumerArn;
+
+ private final String shardId;
+
+ /**
+ * Create a new Fan Out subscriber.
+ *
+ * @param consumerArn the stream consumer ARN
+ * @param shardId the shard ID to subscribe to
+ * @param kinesis the Kinesis Proxy used to communicate via AWS SDK v2
+ */
+ public FanOutShardSubscriber(final String consumerArn, final String
shardId, final KinesisProxyV2Interface kinesis) {
+ this.kinesis = Preconditions.checkNotNull(kinesis);
+ this.consumerArn = Preconditions.checkNotNull(consumerArn);
+ this.shardId = Preconditions.checkNotNull(shardId);
+ }
+
+ /**
+ * Obtains a subscription to the shard from the specified {@code
startingPosition}.
+ * {@link SubscribeToShardEvent} received from KDS are delivered to the
given {@code eventConsumer}.
+ * Returns false if there are records left to consume from the shard.
+ *
+ * @param startingPosition the position in the stream in which to start
receiving records
+ * @param eventConsumer the consumer to deliver received events to
+ * @return true if there are no more messages (complete), false if a
subsequent subscription should be obtained
+ * @throws FanOutSubscriberException when an exception is propagated
from the networking stack
+ * @throws InterruptedException
+ */
+ public boolean subscribeToShardAndConsumeRecords(
+ final StartingPosition startingPosition,
+ final Consumer<SubscribeToShardEvent> eventConsumer)
throws InterruptedException, FanOutSubscriberException {
+ LOG.debug("Subscribing to shard {} ({})", shardId, consumerArn);
+
+ try {
+ openSubscriptionToShard(startingPosition);
+ } catch (FanOutSubscriberException ex) {
+ // The only exception that should cause a failure is a
ResourceNotFoundException
+ // Rethrow the exception to trigger the application to
terminate
+ if (ex.getCause() instanceof ResourceNotFoundException)
{
+ throw (ResourceNotFoundException) ex.getCause();
+ }
+
+ throw ex;
+ }
+
+ return consumeAllRecordsFromKinesisShard(eventConsumer);
+ }
+
+ /**
+ * Calls {@link KinesisProxyV2#subscribeToShard} and waits to acquire a
subscription.
+ * In the event a non-recoverable error occurs this method will rethrow
the exception.
+ * Once the subscription is acquired the client signals to the producer
that we are ready to receive records.
+ *
+ * @param startingPosition the position in which to start consuming from
+ * @throws FanOutSubscriberException when an exception is propagated
from the networking stack
+ */
+ private void openSubscriptionToShard(final StartingPosition
startingPosition) throws FanOutSubscriberException, InterruptedException {
+ SubscribeToShardRequest request =
SubscribeToShardRequest.builder()
+ .consumerARN(consumerArn)
+ .shardId(shardId)
+ .startingPosition(startingPosition)
+ .build();
+
+ AtomicReference<Throwable> exception = new AtomicReference<>();
+ CountDownLatch waitForSubscriptionLatch = new CountDownLatch(1);
+ FanOutShardSubscription subscription = new
FanOutShardSubscription(waitForSubscriptionLatch);
+
+ SubscribeToShardResponseHandler responseHandler =
SubscribeToShardResponseHandler
+ .builder()
+ .onError(e -> {
+ // Errors that occur while trying to acquire a
subscription are only thrown from here
+ // Errors that occur during the subscription
are surfaced here and to the FanOutShardSubscription
+ // (errors are ignored here once the
subscription is open)
+ if (waitForSubscriptionLatch.getCount() > 0) {
+ exception.set(e);
+ waitForSubscriptionLatch.countDown();
+ }
+ })
+ .subscriber(() -> subscription)
+ .build();
+
+ kinesis.subscribeToShard(request, responseHandler);
+
+ waitForSubscriptionLatch.await();
+
+ Throwable throwable = exception.get();
+ if (throwable != null) {
+ handleError(throwable);
+ }
+
+ LOG.debug("Acquired subscription - {} ({})", shardId,
consumerArn);
+
+ // Request the first record to kick off consumption
+ // Following requests are made by the FanOutShardSubscription
on the netty thread
+ subscription.requestRecord();
+ }
+
+ /**
+ * Update the reference to the latest networking error in this object.
+ * Parent caller can interrogate to decide how to handle error.
+ *
+ * @param throwable the exception that has occurred
+ */
+ private void handleError(final Throwable throwable) throws
FanOutSubscriberException {
+ Throwable cause;
+ if (throwable instanceof CompletionException || throwable
instanceof ExecutionException) {
+ cause = throwable.getCause();
+ } else {
+ cause = throwable;
+ }
+
+ LOG.warn("Error occurred on EFO subscription: {} - ({}). {}
({})",
+ throwable.getClass().getName(), throwable.getMessage(),
shardId, consumerArn, cause);
+
+ throw new FanOutSubscriberException(cause);
+ }
+
+ /**
+ * Once the subscription is open, records will be delivered to the
{@link BlockingQueue}.
+ * Queue capacity is hardcoded to 1 record, the queue is used solely to
separate consumption and processing.
+ * However, this buffer will result in latency reduction as records are
pre-fetched as a result.
+ * This method will poll the queue and exit under any of these
conditions:
+ * - {@code continuationSequenceNumber} is {@code null}, indicating the
shard is complete
+ * - The subscription expires, indicated by a {@link
SubscriptionCompleteEvent}
+ * - There is an error while consuming records, indicated by a {@link
SubscriptionErrorEvent}
+ *
+ * @param eventConsumer the event consumer to deliver records to
+ * @return true if there are no more messages (complete), false if a
subsequent subscription should be obtained
+ * @throws FanOutSubscriberException when an exception is propagated
from the networking stack
+ * @throws InterruptedException
+ */
+ private boolean consumeAllRecordsFromKinesisShard(
+ final Consumer<SubscribeToShardEvent> eventConsumer)
throws InterruptedException, FanOutSubscriberException {
+ String continuationSequenceNumber = null;
+
+ do {
+ // Read timeout will occur after 30 seconds, add a
sanity timeout here to prevent lockup
+ FanOutSubscriptionEvent subscriptionEvent =
queue.poll(DEQUEUE_WAIT_SECONDS, SECONDS);
+
+ if (subscriptionEvent == null) {
+ LOG.debug("Timed out polling events from
network, reacquiring subscription - {} ({})", shardId, consumerArn);
+ return false;
+ } else if (subscriptionEvent.isSubscribeToShardEvent())
{
+ SubscribeToShardEvent event =
subscriptionEvent.getSubscribeToShardEvent();
+ continuationSequenceNumber =
event.continuationSequenceNumber();
+ if (!event.records().isEmpty()) {
+ eventConsumer.accept(event);
+ }
+ } else if (subscriptionEvent.isSubscriptionComplete()) {
+ // The subscription is complete, but the shard
might not be, so we return incomplete
+ return false;
+ } else {
+ handleError(subscriptionEvent.getThrowable());
+ return false;
+ }
+ } while (continuationSequenceNumber != null);
+
+ return true;
+ }
+
+ /**
+ * The {@link FanOutShardSubscription} subscribes to the events coming
from KDS and adds them to the {@link BlockingQueue}.
+ * Backpressure is applied based on the maximum capacity of the queue.
+ * The {@link Subscriber} methods of this class are invoked by a thread
from the {@link KinesisAsyncClient}.
+ */
+ private class FanOutShardSubscription implements
Subscriber<SubscribeToShardEventStream> {
+
+ private Subscription subscription;
+
+ private boolean cancelled = false;
Review comment:
Should this be `volatile`? It looks like the variable is accessed
concurrently in `onNext` and `onError`.
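i.e. something like this (sketch):
```java
// Written by onError() and read by onNext()/requestRecord(), which may run on
// different KinesisAsyncClient threads, so the write should be made visible:
private volatile boolean cancelled = false;
```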
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
##########
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2;
+import
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
+import org.apache.flink.util.Preconditions;
+
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import
software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import
software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class is responsible for acquiring an Enhanced Fan Out subscription
and consuming records from a shard.
+ * A queue is used to buffer records between the Kinesis Proxy and Flink
application. This allows processing
+ * to be separated from consumption; errors thrown in the consumption layer do
not propagate up to application.
+ *
+ * <pre>{@code [
+ * | ----------- Source Connector Thread ----------- | |
--- KinesisAsyncClient Thread(s) -- |
+ * | FanOutRecordPublisher | FanOutShardSubscription | == blocking queue == |
KinesisProxyV2 | KinesisAsyncClient |
+ * ]}</pre>
+ * <p>
+ * Three types of message are passed over the queue for inter-thread
communication:
+ * <ul>
+ * <li>{@link SubscriptionNextEvent} - passes data from the
network to the consumer</li>
+ * <li>{@link SubscriptionCompleteEvent} - indicates a
subscription has expired</li>
+ * <li>{@link SubscriptionErrorEvent} - passes an exception from
the network to the consumer</li>
+ * </ul>
+ * </p>
+ * <p>
+ * The blocking queue has a maximum capacity of 1 record.
+ * This allows backpressure to be applied closer to the network stack and
results in record prefetch.
+ * At maximum capacity we will have three {@link SubscribeToShardEvent} in
memory (per instance of this class):
+ * <ul>
+ * <li>1 event being processed by the consumer</li>
+ * <li>1 event enqueued in the blocking queue</li>
+ * <li>1 event being added to the queue by the network (blocking)</li>
+ * </ul>
+ * </p>
+ */
+@Internal
+public class FanOutShardSubscriber {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FanOutShardSubscriber.class);
+
+ /**
+ * The maximum capacity of the queue between the network and
consumption thread.
+ * The queue is mainly used to isolate networking from consumption such
that errors do not bubble up.
+ * This queue also acts as a buffer resulting in a record prefetch and
reduced latency.
+ */
+ private static final int QUEUE_CAPACITY = 1;
+
+ /**
+ * Read timeout will occur after 30 seconds, a sanity timeout to
prevent lockup in unexpected error states.
+ * If the consumer does not receive a new event within the
DEQUEUE_WAIT_SECONDS it will backoff and resubscribe.
+ * Under normal conditions heartbeat events are received even when
there are no records to consume, so it is not
+ * expected for this timeout to occur under normal conditions.
+ */
+ private static final int DEQUEUE_WAIT_SECONDS = 35;
+
+ /** The time to wait when enqueuing events to allow complete/error
events to "push in front" of data . */
+ private static final int ENQUEUE_WAIT_SECONDS = 5;
+
+ private final BlockingQueue<FanOutSubscriptionEvent> queue = new
LinkedBlockingQueue<>(QUEUE_CAPACITY);
+
+ private final KinesisProxyV2Interface kinesis;
+
+ private final String consumerArn;
+
+ private final String shardId;
+
+ /**
+ * Create a new Fan Out subscriber.
+ *
+ * @param consumerArn the stream consumer ARN
+ * @param shardId the shard ID to subscribe to
+ * @param kinesis the Kinesis Proxy used to communicate via AWS SDK v2
+ */
+ public FanOutShardSubscriber(final String consumerArn, final String
shardId, final KinesisProxyV2Interface kinesis) {
+ this.kinesis = Preconditions.checkNotNull(kinesis);
+ this.consumerArn = Preconditions.checkNotNull(consumerArn);
+ this.shardId = Preconditions.checkNotNull(shardId);
+ }
+
+ /**
+ * Obtains a subscription to the shard from the specified {@code
startingPosition}.
+ * {@link SubscribeToShardEvent} received from KDS are delivered to the
given {@code eventConsumer}.
+ * Returns false if there are records left to consume from the shard.
+ *
+ * @param startingPosition the position in the stream in which to start
receiving records
+ * @param eventConsumer the consumer to deliver received events to
+ * @return true if there are no more messages (complete), false if a
subsequent subscription should be obtained
+ * @throws FanOutSubscriberException when an exception is propagated
from the networking stack
+ * @throws InterruptedException
+ */
+ public boolean subscribeToShardAndConsumeRecords(
+ final StartingPosition startingPosition,
+ final Consumer<SubscribeToShardEvent> eventConsumer)
throws InterruptedException, FanOutSubscriberException {
+ LOG.debug("Subscribing to shard {} ({})", shardId, consumerArn);
+
+ try {
+ openSubscriptionToShard(startingPosition);
+ } catch (FanOutSubscriberException ex) {
+ // The only exception that should cause a failure is a
ResourceNotFoundException
+ // Rethrow the exception to trigger the application to
terminate
+ if (ex.getCause() instanceof ResourceNotFoundException)
{
+ throw (ResourceNotFoundException) ex.getCause();
+ }
+
+ throw ex;
+ }
+
+ return consumeAllRecordsFromKinesisShard(eventConsumer);
+ }
+
+ /**
+ * Calls {@link KinesisProxyV2#subscribeToShard} and waits to acquire a
subscription.
+ * In the event a non-recoverable error occurs this method will rethrow
the exception.
+ * Once the subscription is acquired the client signals to the producer
that we are ready to receive records.
+ *
+ * @param startingPosition the position in which to start consuming from
+ * @throws FanOutSubscriberException when an exception is propagated
from the networking stack
+ */
+ private void openSubscriptionToShard(final StartingPosition
startingPosition) throws FanOutSubscriberException, InterruptedException {
+ SubscribeToShardRequest request =
SubscribeToShardRequest.builder()
+ .consumerARN(consumerArn)
+ .shardId(shardId)
+ .startingPosition(startingPosition)
+ .build();
+
+ AtomicReference<Throwable> exception = new AtomicReference<>();
+ CountDownLatch waitForSubscriptionLatch = new CountDownLatch(1);
+ FanOutShardSubscription subscription = new
FanOutShardSubscription(waitForSubscriptionLatch);
+
+ SubscribeToShardResponseHandler responseHandler =
SubscribeToShardResponseHandler
+ .builder()
+ .onError(e -> {
+ // Errors that occur while trying to acquire a
subscription are only thrown from here
+ // Errors that occur during the subscription
are surfaced here and to the FanOutShardSubscription
+ // (errors are ignored here once the
subscription is open)
+ if (waitForSubscriptionLatch.getCount() > 0) {
+ exception.set(e);
+ waitForSubscriptionLatch.countDown();
+ }
+ })
+ .subscriber(() -> subscription)
+ .build();
+
+ kinesis.subscribeToShard(request, responseHandler);
+
+ waitForSubscriptionLatch.await();
+
+ Throwable throwable = exception.get();
+ if (throwable != null) {
+ handleError(throwable);
+ }
+
+ LOG.debug("Acquired subscription - {} ({})", shardId,
consumerArn);
+
+ // Request the first record to kick off consumption
+ // Following requests are made by the FanOutShardSubscription
on the netty thread
+ subscription.requestRecord();
+ }
+
+ /**
+ * Update the reference to the latest networking error in this object.
+ * Parent caller can interrogate to decide how to handle error.
+ *
+ * @param throwable the exception that has occurred
+ */
+ private void handleError(final Throwable throwable) throws
FanOutSubscriberException {
+ Throwable cause;
+ if (throwable instanceof CompletionException || throwable
instanceof ExecutionException) {
+ cause = throwable.getCause();
+ } else {
+ cause = throwable;
+ }
+
+ LOG.warn("Error occurred on EFO subscription: {} - ({}). {}
({})",
+ throwable.getClass().getName(), throwable.getMessage(),
shardId, consumerArn, cause);
+
+ throw new FanOutSubscriberException(cause);
+ }
+
+ /**
+ * Once the subscription is open, records will be delivered to the
{@link BlockingQueue}.
+ * Queue capacity is hardcoded to 1 record, the queue is used solely to
separate consumption and processing.
+ * However, this buffer will result in latency reduction as records are
pre-fetched as a result.
+ * This method will poll the queue and exit under any of these
conditions:
+ * - {@code continuationSequenceNumber} is {@code null}, indicating the
shard is complete
+ * - The subscription expires, indicated by a {@link
SubscriptionCompleteEvent}
+ * - There is an error while consuming records, indicated by a {@link
SubscriptionErrorEvent}
+ *
+ * @param eventConsumer the event consumer to deliver records to
+ * @return true if there are no more messages (complete), false if a
subsequent subscription should be obtained
+ * @throws FanOutSubscriberException when an exception is propagated
from the networking stack
+ * @throws InterruptedException
+ */
+ private boolean consumeAllRecordsFromKinesisShard(
+ final Consumer<SubscribeToShardEvent> eventConsumer)
throws InterruptedException, FanOutSubscriberException {
+ String continuationSequenceNumber = null;
+
+ do {
+ // Read timeout will occur after 30 seconds, add a
sanity timeout here to prevent lockup
+ FanOutSubscriptionEvent subscriptionEvent =
queue.poll(DEQUEUE_WAIT_SECONDS, SECONDS);
+
+ if (subscriptionEvent == null) {
+ LOG.debug("Timed out polling events from
network, reacquiring subscription - {} ({})", shardId, consumerArn);
+ return false;
+ } else if (subscriptionEvent.isSubscribeToShardEvent())
{
+ SubscribeToShardEvent event =
subscriptionEvent.getSubscribeToShardEvent();
+ continuationSequenceNumber =
event.continuationSequenceNumber();
+ if (!event.records().isEmpty()) {
+ eventConsumer.accept(event);
+ }
+ } else if (subscriptionEvent.isSubscriptionComplete()) {
+ // The subscription is complete, but the shard
might not be, so we return incomplete
+ return false;
+ } else {
+ handleError(subscriptionEvent.getThrowable());
+ return false;
+ }
+ } while (continuationSequenceNumber != null);
+
+ return true;
+ }
+
+ /**
+ * The {@link FanOutShardSubscription} subscribes to the events coming
from KDS and adds them to the {@link BlockingQueue}.
+ * Backpressure is applied based on the maximum capacity of the queue.
+ * The {@link Subscriber} methods of this class are invoked by a thread
from the {@link KinesisAsyncClient}.
+ */
+ private class FanOutShardSubscription implements
Subscriber<SubscribeToShardEventStream> {
+
+ private Subscription subscription;
+
+ private boolean cancelled = false;
+
+ private final CountDownLatch waitForSubscriptionLatch;
+
+ private final Object lockObject = new Object();
+
+ private FanOutShardSubscription(final CountDownLatch
waitForSubscriptionLatch) {
+ this.waitForSubscriptionLatch =
waitForSubscriptionLatch;
+ }
+
+ /**
+ * Flag to the producer that we are ready to receive more
events.
+ */
+ public void requestRecord() {
+ if (!cancelled) {
+ LOG.debug("Requesting more records from EFO
subscription - {} ({})", shardId, consumerArn);
+ subscription.request(1);
+ }
+ }
+
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ this.subscription = subscription;
+ waitForSubscriptionLatch.countDown();
+ }
+
+ @Override
+ public void onNext(SubscribeToShardEventStream
subscribeToShardEventStream) {
+ subscribeToShardEventStream.accept(new
SubscribeToShardResponseHandler.Visitor() {
+ @Override
+ public void visit(SubscribeToShardEvent event) {
+ synchronized (lockObject) {
+ if (enqueueEventWithRetry(new
SubscriptionNextEvent(event))) {
+ requestRecord();
+ }
+ }
+ }
+ });
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ LOG.debug("Error occurred on EFO subscription: {} -
({}). {} ({})",
+ throwable.getClass().getName(),
throwable.getMessage(), shardId, consumerArn);
+
+ // Cancel the subscription to signal the onNext to stop
queuing and requesting data
+ cancelSubscription();
+
+ synchronized (lockObject) {
+ // Empty the queue and add a poison pill to
terminate this subscriber
+ // The synchronized block ensures that new data
is not written in the meantime
+ queue.clear();
+ enqueueEvent(new
SubscriptionErrorEvent(throwable));
Review comment:
In the case of backpressure in Flink's network stack, would this also
mean that the job failure is postponed until the source connector thread
consumes this `SubscriptionErrorEvent`?
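For context, the consumer-side handling I am referring to, condensed from
`consumeAllRecordsFromKinesisShard` above:
```java
// The throwable only becomes a job failure once the source connector thread
// polls the error event off the queue and handleError() rethrows it.
FanOutSubscriptionEvent subscriptionEvent = queue.poll(DEQUEUE_WAIT_SECONDS, SECONDS);

if (subscriptionEvent != null
		&& !subscriptionEvent.isSubscribeToShardEvent()
		&& !subscriptionEvent.isSubscriptionComplete()) {
	handleError(subscriptionEvent.getThrowable());
}
```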