mosche commented on code in PR #23540:
URL: https://github.com/apache/beam/pull/23540#discussion_r1163668096


##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscriber.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+import static org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscriber.State.INITIALIZED;
+import static org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscriber.State.PAUSED;
+import static org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscriber.State.RUNNING;
+import static org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscriber.State.STOPPED;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import io.netty.channel.ChannelException;
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+class EFOShardSubscriber {
+  enum State {
+    INITIALIZED, // Initialized, but not started yet
+    RUNNING, // Subscriber started
+    PAUSED, // Subscriber paused due to backpressure
+    STOPPED // Subscriber stopped
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(EFOShardSubscriber.class);
+  private static final Integer IN_FLIGHT_LIMIT = 10;
+
+  private final EFOShardSubscribersPool pool;
+  private final String consumerArn;
+
+  // Shard id of this subscriber
+  private final String shardId;
+
+  private final KinesisAsyncClient kinesis;
+
+  /** Internal subscriber state. */
+  private volatile State state = INITIALIZED;
+
+  /**
+   * Kept only for cases when a subscription starts and then fails with a non-critical error, before
+   * any event updates {@link ShardEventsSubscriber#sequenceNumber}.
+   */
+  private @MonotonicNonNull StartingPosition initialPosition;
+
+  /**
+   * Completes once this shard subscriber is done, either normally (stopped or shard is completely
+   * consumed) or exceptionally due to a non retry-able error.
+   */
+  private final CompletableFuture<Void> done = new CompletableFuture<>();
+
+  private final ShardEventsSubscriber eventsSubscriber = new ShardEventsSubscriber();
+
+  /** Tracks number of delivered events in flight (until ack-ed). */
+  private final AtomicInteger inFlight = new AtomicInteger();
+
+  /**
+   * Async completion handler for {@link KinesisAsyncClient#subscribeToShard} that:
+   * <li>exists immediately if {@link #done} is already completed (exceptionally),

Review Comment:
   ```suggestion
      * <li>exits immediately if {@link #done} is already completed (exceptionally),
   ```



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java:
##########
@@ -147,6 +149,92 @@
 * Read#withCustomRateLimitPolicy(RateLimitPolicyFactory)}. This requires implementing {@link
  * RateLimitPolicy} with a corresponding {@link RateLimitPolicyFactory}.
  *
+ * <h4>Enhanced Fan-Out</h4>
+ *
+ * Kinesis IO supports Consumers with Dedicated Throughput (Enhanced Fan-Out, EFO). This type of
+ * consumer doesn't have to contend with other consumers that are receiving data from the stream.
+ *
+ * <p>More details can be found here: <a
+ * href="https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html">Consumers with
+ * Dedicated Throughput</a>
+ *
+ * <p>Primary method of enabling EFO is setting {@link Read#withConsumerArn(String)}:
+ *
+ * <pre>{@code
+ * p.apply(KinesisIO.read()
+ *    .withStreamName("streamName")
+ *    .withInitialPositionInStream(InitialPositionInStream.LATEST)
+ *    .withConsumerArn("arn:aws:kinesis:.../streamConsumer:12345678"))
+ * }</pre>
+ *
+ * <p>Alternatively, EFO can be enabled for one or more {@link Read} instances via pipeline options:
+ *
+ * <pre>{@code --kinesisIOConsumerArns{
+ *   "stream-01": "arn:aws:kinesis:...:stream/stream-01/consumer/consumer-01:1678576714",
+ *   "stream-02": "arn:aws:kinesis:...:stream/stream-02/consumer/my-consumer:1679576982",
+ *   ...
+ * }}</pre>
+ *
+ * <p>If set, pipeline options will overwrite {@link Read#withConsumerArn(String)} setting. Check
+ * {@link KinesisIOOptions} for more details.
+ *
+ * <p>Depending on the downstream processing performance, the EFO consumer will back-pressure
+ * internally.
+ *
+ * <p>Adjusting runner's settings is recommended - such that it does not (re)start EFO consumer(s)
+ * faster than once per ~ 10 seconds. Internal calls to {@link
+ * KinesisAsyncClient#subscribeToShard(SubscribeToShardRequest, SubscribeToShardResponseHandler)}
+ * may throw ResourceInUseException otherwise, which will cause a crash loop.

Review Comment:
   > ResourceInUseException doesn't happen when we re-subscribe upon subscription expiration, only when the client closes the subscription before it expires and then re-subscribes faster than 5 sec.
   
   Yes, the case you're describing here should be handled in the re-subscription handler. In this case, however, it's the first attempt to subscribe that fails, and that attempt should be retried as discussed.
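
To make the retry of the initial subscription concrete, here is a minimal sketch. It is not code from this PR: `SubscribeRetrySketch`, `withRetry` and the `isRetryable` predicate are hypothetical names, and the sketch assumes the subscribe call can be wrapped in a `Supplier<CompletableFuture<T>>`. In the demo, an `IllegalStateException` stands in for the SDK's `ResourceInUseException`; a real predicate would probably also need to unwrap a `CompletionException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical helper (not part of the PR): retries an async call after a fixed delay. */
final class SubscribeRetrySketch {

  static <T> CompletableFuture<T> withRetry(
      Supplier<CompletableFuture<T>> call,
      Predicate<Throwable> isRetryable,
      int attemptsLeft,
      long delayMs,
      ScheduledExecutorService scheduler) {
    CompletableFuture<T> result = new CompletableFuture<>();
    call.get()
        .whenComplete(
            (value, error) -> {
              if (error == null) {
                result.complete(value);
              } else if (attemptsLeft > 1 && isRetryable.test(error)) {
                // Wait before subscribing again, then forward the outcome of the next attempt.
                scheduler.schedule(
                    () -> {
                      withRetry(call, isRetryable, attemptsLeft - 1, delayMs, scheduler)
                          .whenComplete(
                              (v, e) -> {
                                if (e == null) {
                                  result.complete(v);
                                } else {
                                  result.completeExceptionally(e);
                                }
                              });
                    },
                    delayMs,
                    TimeUnit.MILLISECONDS);
              } else {
                // Non-retryable error or attempts exhausted: propagate to the caller.
                result.completeExceptionally(error);
              }
            });
    return result;
  }

  public static void main(String[] args) throws Exception {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    AtomicInteger attempts = new AtomicInteger();
    // Stand-in for the first subscribe call: fails twice, then succeeds.
    Supplier<CompletableFuture<String>> subscribe =
        () -> {
          CompletableFuture<String> f = new CompletableFuture<>();
          if (attempts.incrementAndGet() < 3) {
            f.completeExceptionally(new IllegalStateException("resource in use"));
          } else {
            f.complete("subscribed");
          }
          return f;
        };
    String result =
        withRetry(subscribe, e -> e instanceof IllegalStateException, 5, 100, scheduler)
            .get(5, TimeUnit.SECONDS);
    System.out.println(result + " after " + attempts.get() + " attempts");
    scheduler.shutdown();
  }
}
```

The delay and the number of attempts are placeholders; whether a fixed delay or a backoff fits better here is still open for discussion.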



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscribersPool.java:
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.beam.sdk.io.aws2.kinesis.TimeUtil.minTimestamp;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.RandomStringUtils;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ForwardingIterator;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ChildShard;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.kinesis.retrieval.AggregatorUtil;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+@SuppressWarnings({"nullness"})
+class EFOShardSubscribersPool {
+  private static final Logger LOG = LoggerFactory.getLogger(EFOShardSubscribersPool.class);
+  private static final int ON_ERROR_COOL_DOWN_MS_DEFAULT = 1_000;
+  private final int onErrorCoolDownMs;
+
+  /**
+   * Identifier of the current subscribers pool.
+   *
+   * <p>Injected into other objects which belong to this pool to ease tracing with logs.
+   */
+  private final String poolId;
+
+  private final KinesisIO.Read read;
+  private final String consumerArn;
+  private final KinesisAsyncClient kinesis;
+
+  /**
+   * Unbounded queue of events, but events in-flight are limited by the {@link EFOShardSubscriber}.
+   */
+  private final ConcurrentLinkedQueue<EventRecords> eventQueue = new ConcurrentLinkedQueue<>();
+
+  /**
+   * State map of currently active shards that can be checkpoint-ed.
+   *
+   * <p>This map may only be accessed and updated from within {@link #start}, {@link #getNextRecord}
+   * and dependent {@link #onEventDone} to prevent race conditions.
+   */
+  private final Map<String, ShardState> state = new HashMap<>();
+
+  /**
+   * Async subscription error (as first seen), if set all subscribers must be cancelled and no new
+   * ones started.
+   *
+   * <p>Must be volatile as it is accessed from various threads. But it's best effort, setting this
+   * doesn't have to be atomic.
+   */
+  private volatile @MonotonicNonNull Throwable subscriptionError;
+
+  /**
+   * May only ever be altered from within {@link #stop()} or {@link #getNextRecord()} to prevent
+   * race conditions when cancelling subscribers.
+   */
+  private boolean isStopped = false;
+
+  /**
+   * Async completion callback handling {@link EFOShardSubscriber#subscribe subscriptions} that
+   * terminate exceptionally.
+   *
+   * <p>Unless already in error state, stores error as {@link #subscriptionError}. This pool will be
+   * stopped when {@link #getNextRecord()} is called next, but allowing the {@link #eventQueue} to
+   * be drained. Only once empty any {@link #subscriptionError} is propagated. This simplifies state
+   * management and checkpointing a lot.
+   */
+  private final BiConsumer<Void, Throwable> errorHandler =
+      (Void unused, Throwable error) -> {
+        if (error != null && subscriptionError == null) {
+          subscriptionError = error;
+        }
+      };
+
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+  // EventRecords iterator that is currently consumed
+  @Nullable EventRecords current = null;
+
+  private final WatermarkPolicyFactory watermarkPolicyFactory;
+
+  EFOShardSubscribersPool(KinesisIO.Read readSpec, String consumerArn, KinesisAsyncClient kinesis) {
+    this.poolId = generatePoolId();
+    this.read = readSpec;
+    this.consumerArn = consumerArn;
+    this.kinesis = kinesis;
+    this.watermarkPolicyFactory = read.getWatermarkPolicyFactory();
+    this.onErrorCoolDownMs = ON_ERROR_COOL_DOWN_MS_DEFAULT;
+  }
+
+  EFOShardSubscribersPool(
+      KinesisIO.Read readSpec,
+      String consumerArn,
+      KinesisAsyncClient kinesis,
+      int onErrorCoolDownMs) {
+    this.poolId = generatePoolId();
+    this.read = readSpec;
+    this.consumerArn = consumerArn;
+    this.kinesis = kinesis;
+    this.watermarkPolicyFactory = read.getWatermarkPolicyFactory();
+    this.onErrorCoolDownMs = onErrorCoolDownMs;
+  }
+
+  /**
+   * Starts a subscribers pool by starting a {@link EFOShardSubscriber#subscribe shard subscription}
+   * for each {@link ShardCheckpoint} with the subscription {@link #errorHandler} callback.
+   *
+   * <p>{@link EFOShardSubscriber}s with their respective state are tracked in {@link #state}.
+   */
+  void start(Iterable<ShardCheckpoint> checkpoints) {
+    LOG.info(
+        "Pool {} - starting for stream {} consumer {}. Checkpoints = {}",
+        poolId,
+        read.getStreamName(),
+        consumerArn,
+        checkpoints);
+    for (ShardCheckpoint shardCheckpoint : checkpoints) {
+      checkState(
+          !state.containsKey(shardCheckpoint.getShardId()),
+          "Duplicate shard id %s",
+          shardCheckpoint.getShardId());
+      ShardState shardState =
+          new ShardState(
+              initShardSubscriber(shardCheckpoint), shardCheckpoint, watermarkPolicyFactory);
+      state.put(shardCheckpoint.getShardId(), shardState);
+    }
+  }
+
+  /**
+   * Returns the next disaggregated {@link KinesisRecord} if available and updates {@link #state}
+   * accordingly so that it reflects a mutable checkpoint AFTER returning that record.
+   *
+   * <p>Async subscription errors are delayed until {@link #eventQueue} is completely drained and
+   * then rethrown here.
+   *
+   * <p>This repeats the following steps until a record or {@code null} was returned:
+   *
+   * <ol>
+   *   <li>If {@link #current} is null and {@link #eventQueue} is empty, return {@code null} unless
+   *       {@link #subscriptionError} is set: in that case rethrow.
+   *   <li>Otherwise if {@link #current} is null, poll next from {@link #eventQueue}.
+   *   <li>If {@link #current} has a next {@link KinesisClientRecord}, update {@link #state}
+   *       accordingly and return the corresponding converted {@link KinesisRecord}, optionally
+   *       triggering {@link #onEventDone} if that was the last record of {@link #current}.
+   *   <li>Finally, if nothing was returned yet, trigger {@link #onEventDone} and continue loop.
+   * </ol>
+   *
+   * <p>It polls the {@link #eventQueue} in a while loop to avoid returning null immediately if an
+   * event without records arrived. There may be events with records after the {@link #current}, and
+   * it is better to poll again instead of having {@link EFOKinesisReader#advance()} signalling
+   * false to Beam. Otherwise, Beam would poll again later, which would introduce unnecessary delay.
+   */
+  @Nullable
+  KinesisRecord getNextRecord() throws IOException {
+    while (true) {
+      if (!isStopped && subscriptionError != null) {
+        // Stop the pool to cancel all subscribers and prevent new subscriptions.
+        // Doing this as part of getNextRecord() avoids concurrent access to the state map and
+        // prevents any related issues.
+        stop();
+      }
+
+      if (current == null) {
+        current = eventQueue.poll();
+      }
+
+      if (current != null) {
+        String shardId = current.shardId;
+        ShardState shardState = Preconditions.checkStateNotNull(state.get(shardId));
+        if (current.hasNext()) {
+          KinesisClientRecord r = current.next();
+          KinesisRecord kinesisRecord = new KinesisRecord(r, read.getStreamName(), shardId);

Review Comment:
   > I think, at this point, this is the only doubt I actually have. Other things are already addressed, right @mosche ?
   
   @psolomin yes, this is the only open issue I'm aware of. Before returning, `shardState` must be updated if this was the last record of `current`, so that checkpoint marks correctly reflect progress.
   
   ```
             KinesisClientRecord r = current.next();
             // Make sure to update shard state accordingly if `current` does not contain any more
             // events. This is necessary to account for any re-sharding, so we could correctly resume
             // from a checkpoint if taken once we advanced to the record returned by getNextRecord().
             if (!current.hasNext()) {
               onEventDone(shardState, current);
               current = null;
             }
             KinesisRecord kinesisRecord = new KinesisRecord(r, read.getStreamName(), shardId);
             if (shardState.isAfterInitialCheckpoint(kinesisRecord)) {
               shardState.update(kinesisRecord);
               return kinesisRecord;
             }
   ```
   
   Looks like we're missing a test case for this, as follows:
   - stub `subscribeToShard` with a reshard event that **also** contains 1 record
   - wait for exactly that 1 record
   - verify the checkpoint mark now contains the new shards, but no longer the old one
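
The scenario can be illustrated without Beam or the AWS SDK. The following is a deliberately simplified toy model, not the PR's classes: `ReshardCheckpointSketch`, `Event` and `checkpointShards` are hypothetical names, records are plain strings, and a reshard is modelled as an event carrying a non-empty list of child shards. It only shows why `onEventDone` has to run before the last record of an event is returned.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;

/** Toy model (hypothetical, not the PR's classes) of a pool tracking checkpointable shards. */
final class ReshardCheckpointSketch {

  /** One event of a shard: its records plus child shards if the shard ended in a reshard. */
  static final class Event {
    final String shardId;
    final Iterator<String> records;
    final List<String> childShards; // non-empty only for a reshard event

    Event(String shardId, List<String> records, List<String> childShards) {
      this.shardId = shardId;
      this.records = records.iterator();
      this.childShards = childShards;
    }
  }

  final Queue<Event> eventQueue = new ArrayDeque<>();
  final Set<String> checkpointShards = new TreeSet<>();
  private Event current;

  String getNextRecord() {
    while (true) {
      if (current == null) {
        current = eventQueue.poll();
      }
      if (current == null) {
        return null; // nothing buffered
      }
      Event event = current;
      if (event.records.hasNext()) {
        String record = event.records.next();
        // Last record of the event: apply the reshard BEFORE returning, so a checkpoint
        // taken right after this call already refers to the child shards.
        if (!event.records.hasNext()) {
          onEventDone(event);
          current = null;
        }
        return record;
      }
      // Event without records: finish it and keep polling.
      onEventDone(event);
      current = null;
    }
  }

  private void onEventDone(Event event) {
    if (!event.childShards.isEmpty()) {
      checkpointShards.remove(event.shardId);
      checkpointShards.addAll(event.childShards);
    }
  }

  public static void main(String[] args) {
    ReshardCheckpointSketch pool = new ReshardCheckpointSketch();
    pool.checkpointShards.add("shard-000");
    // Reshard event that also contains exactly one record.
    pool.eventQueue.add(
        new Event(
            "shard-000",
            Collections.singletonList("r1"),
            Arrays.asList("shard-001", "shard-002")));
    System.out.println(pool.getNextRecord());
    System.out.println(pool.checkpointShards);
  }
}
```

A real test would stub `subscribeToShard` and assert on the actual checkpoint mark; the model only mirrors the three steps listed above.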
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
