psolomin commented on code in PR #23540:
URL: https://github.com/apache/beam/pull/23540#discussion_r1160713936


##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscriber.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+import static 
org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscriber.State.INITIALIZED;
+import static 
org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscriber.State.PAUSED;
+import static 
org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscriber.State.RUNNING;
+import static 
org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscriber.State.STOPPED;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import io.netty.channel.ChannelException;
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import 
software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import 
software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+class EFOShardSubscriber {
+  /** Internal states a shard subscriber can be in. */
+  enum State {
+    INITIALIZED, // Initialized, but not started yet
+    RUNNING, // Subscriber started
+    PAUSED, // Subscriber paused due to backpressure
+    STOPPED // Subscriber stopped
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(EFOShardSubscriber.class);
+  private static final Integer IN_FLIGHT_LIMIT = 10;
+
+  private final EFOShardSubscribersPool pool;
+  private final String consumerArn;
+
+  // Shard id of this subscriber
+  private final String shardId;
+
+  private final KinesisAsyncClient kinesis;
+
+  /** Internal subscriber state. */
+  private volatile State state = INITIALIZED;
+
+  /**
+   * Kept only for cases when a subscription starts and then fails with a 
non-critical error, before
+   * any event updates {@link ShardEventsSubscriber#sequenceNumber}.
+   */
+  private @MonotonicNonNull StartingPosition initialPosition;
+
+  /**
+   * Completes once this shard subscriber is done, either normally (stopped or 
shard is completely
+   * consumed) or exceptionally due to a non retry-able error.
+   */
+  private final CompletableFuture<Void> done = new CompletableFuture<>();
+
+  private final ShardEventsSubscriber eventsSubscriber = new 
ShardEventsSubscriber();
+
+  /** Tracks number of delivered events in flight (until ack-ed). */
+  private final AtomicInteger inFlight = new AtomicInteger();
+
+  /**
+   * Async completion handler for {@link KinesisAsyncClient#subscribeToShard} 
that:
+   * <li>exits immediately if {@link #done} is already completed 
(exceptionally),
+   * <li>re-subscribes at {@link ShardEventsSubscriber#sequenceNumber} for 
retryable errors such as
+   *     retryable {@link SdkException}, {@link ClosedChannelException}, 
{@link ChannelException},
+   *     {@link TimeoutException} (any of these might be wrapped in {@link 
CompletionException}s)
+   * <li>or completes {@link #done} exceptionally for any other error,
+   * <li>completes {@link #done} normally if subscriber {@link #state} is 
{@link State#STOPPED} or
+   *     if shard completed (no further {@link 
ShardEventsSubscriber#sequenceNumber}),
+   * <li>or otherwise re-subscribes at {@link 
ShardEventsSubscriber#sequenceNumber}.
+   */
+  private final BiConsumer<Void, Throwable> reSubscriptionHandler;
+
+  /**
+   * Classifies an error as retryable or not.
+   *
+   * <p>Returns {@code true} if the error, after stripping {@link CompletionException} wrappers,
+   * is an {@link SdkException} that reports itself as retryable, or if the root cause of the
+   * unwrapped error is a {@link ClosedChannelException}, a {@link TimeoutException} or a {@link
+   * ChannelException}.
+   *
+   * @param error the failure to classify
+   * @return whether the failure is considered retryable
+   */
+  private static boolean isRetryable(Throwable error) {
+    Throwable cause = unwrapCompletionException(error);
+    if (cause instanceof SdkException && ((SdkException) cause).retryable()) {
+      return true; // retryable SDK exception
+    }
+    // check the root cause for issues that can be addressed using retries
+    cause = Throwables.getRootCause(cause);
+    return cause instanceof ClosedChannelException // Java Nio
+        || cause instanceof TimeoutException // Java
+        || cause instanceof ChannelException; // Netty (e.g. 
ReadTimeoutException)
+  }
+
+  /**
+   * Loops through completion exceptions until we get the underlying cause.
+   *
+   * @param completionException the possibly wrapped error
+   * @return the first throwable in the cause chain that is not a {@link CompletionException}, or
+   *     the innermost {@link CompletionException} if it has no cause
+   */
+  private static Throwable unwrapCompletionException(Throwable 
completionException) {
+    Throwable current = completionException;
+    while (current instanceof CompletionException) {
+      Throwable cause = current.getCause();
+      if (cause != null) {
+        current = cause;
+      } else {
+        return current; // nothing left to unwrap; keep the wrapper itself
+      }
+    }
+    return current;
+  }
+
+  @SuppressWarnings({"FutureReturnValueIgnored", "all"})

Review Comment:
   > issues
   
   The `reSubscriptionHandler` assignment was using some state that is only 
finalized once `subscribe()` is called. But the `reSubscriptionHandler` assignment 
itself does not run any code, so I've left the more specific `SuppressWarnings` 
here and did not change the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to