psolomin commented on code in PR #23540: URL: https://github.com/apache/beam/pull/23540#discussion_r1163843884
########## sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscribersPool.java: ########## @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.aws2.kinesis; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.beam.sdk.io.aws2.kinesis.TimeUtil.minTimestamp; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.BiConsumer; +import java.util.function.Supplier; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.RandomStringUtils; +import 
org.apache.beam.sdk.util.Preconditions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ForwardingIterator; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.ChildShard; +import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; +import software.amazon.awssdk.services.kinesis.model.StartingPosition; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; +import software.amazon.kinesis.common.InitialPositionInStream; +import software.amazon.kinesis.retrieval.AggregatorUtil; +import software.amazon.kinesis.retrieval.KinesisClientRecord; + +@SuppressWarnings({"nullness"}) +class EFOShardSubscribersPool { + private static final Logger LOG = LoggerFactory.getLogger(EFOShardSubscribersPool.class); + private static final int ON_ERROR_COOL_DOWN_MS_DEFAULT = 1_000; + private final int onErrorCoolDownMs; + + /** + * Identifier of the current subscribers pool. + * + * <p>Injected into other objects which belong to this pool to ease tracing with logs. + */ + private final String poolId; + + private final KinesisIO.Read read; + private final String consumerArn; + private final KinesisAsyncClient kinesis; + + /** + * Unbounded queue of events, but events in-flight are limited by the {@link EFOShardSubscriber}. + */ + private final ConcurrentLinkedQueue<EventRecords> eventQueue = new ConcurrentLinkedQueue<>(); + + /** + * State map of currently active shards that can be checkpoint-ed. 
+ * + * <p>This map may only be accessed and updated from within {@link #start}, {@link #getNextRecord} + * and dependent {@link #onEventDone} to prevent race conditions. + */ + private final Map<String, ShardState> state = new HashMap<>(); + + /** + * Async subscription error (as first seen), if set all subscribers must be cancelled and no new + * ones started. + * + * <p>Must be volatile as it is accessed from various threads. But it's best effort, setting this + * doesn't have to be atomic. + */ + private volatile @MonotonicNonNull Throwable subscriptionError; + + /** + * May only ever be altered from within {@link #stop()} or {@link #getNextRecord()} to prevent + * race conditions when cancelling subscribers. + */ + private boolean isStopped = false; + + /** + * Async completion callback handling {@link EFOShardSubscriber#subscribe subscriptions} that + * terminate exceptionally. + * + * <p>Unless already in error state, stores error as {@link #subscriptionError}. This pool will be + * stopped when {@link #getNextRecord()} is called next, but allowing the {@link #eventQueue} to + * be drained. Only once empty any {@link #subscriptionError} is propagated. This simplifies state + * management and checkpointing a lot. 
+ */ + private final BiConsumer<Void, Throwable> errorHandler = + (Void unused, Throwable error) -> { + if (error != null && subscriptionError == null) { + subscriptionError = error; + } + }; + + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + + // EventRecords iterator that is currently consumed + @Nullable EventRecords current = null; + + private final WatermarkPolicyFactory watermarkPolicyFactory; + + EFOShardSubscribersPool(KinesisIO.Read readSpec, String consumerArn, KinesisAsyncClient kinesis) { + this.poolId = generatePoolId(); + this.read = readSpec; + this.consumerArn = consumerArn; + this.kinesis = kinesis; + this.watermarkPolicyFactory = read.getWatermarkPolicyFactory(); + this.onErrorCoolDownMs = ON_ERROR_COOL_DOWN_MS_DEFAULT; + } + + EFOShardSubscribersPool( + KinesisIO.Read readSpec, + String consumerArn, + KinesisAsyncClient kinesis, + int onErrorCoolDownMs) { + this.poolId = generatePoolId(); + this.read = readSpec; + this.consumerArn = consumerArn; + this.kinesis = kinesis; + this.watermarkPolicyFactory = read.getWatermarkPolicyFactory(); + this.onErrorCoolDownMs = onErrorCoolDownMs; + } + + /** + * Starts a subscribers pool by starting a {@link EFOShardSubscriber#subscribe shard subscription} + * for each {@link ShardCheckpoint} with the subscription {@link #errorHandler} callback. + * + * <p>{@link EFOShardSubscriber}s with their respective state are tracked in {@link #state}. + */ + void start(Iterable<ShardCheckpoint> checkpoints) { + LOG.info( + "Pool {} - starting for stream {} consumer {}. 
Checkpoints = {}", + poolId, + read.getStreamName(), + consumerArn, + checkpoints); + for (ShardCheckpoint shardCheckpoint : checkpoints) { + checkState( + !state.containsKey(shardCheckpoint.getShardId()), + "Duplicate shard id %s", + shardCheckpoint.getShardId()); + ShardState shardState = + new ShardState( + initShardSubscriber(shardCheckpoint), shardCheckpoint, watermarkPolicyFactory); + state.put(shardCheckpoint.getShardId(), shardState); + } + } + + /** + * Returns the next disaggregated {@link KinesisRecord} if available and updates {@link #state} + * accordingly so that it reflects a mutable checkpoint AFTER returning that record. + * + * <p>Async subscription errors are delayed until {@link #eventQueue} is completely drained and + * then rethrown here. + * + * <p>This repeats the following steps until a record or {@code null} was returned: + * + * <ol> + * <li>If {@link #current} is null and {@link #eventQueue} is empty, return {@code null} unless + * {@link #subscriptionError} is set: in that case rethrow. + * <li>Otherwise if {@link #current} is null, poll next from {@link #eventQueue}. + * <li>If {@link #current} has a next {@link KinesisClientRecord}, update {@link #state} + * accordingly and return the corresponding converted {@link KinesisRecord}, optionally + * triggering {@link #onEventDone} if that was the last record of {@link #current}. + * <li>Finally, if nothing was returned yet, trigger {@link #onEventDone} and continue loop. + * </ol> + * + * <p>It polls the {@link #eventQueue} in a while loop to avoid returning null immediately if an + * event without records arrived. There may be events with records after the {@link #current}, and + * it is better to poll again instead of having {@link EFOKinesisReader#advance()} signalling + * false to Beam. Otherwise, Beam would poll again later, which would introduce unnecessary delay. 
+ */ + @Nullable + KinesisRecord getNextRecord() throws IOException { + while (true) { + if (!isStopped && subscriptionError != null) { + // Stop the pool to cancel all subscribers and prevent new subscriptions. + // Doing this as part of getNextRecord() avoids concurrent access to the state map and + // prevents any related issues. + stop(); + } + + if (current == null) { + current = eventQueue.poll(); + } + + if (current != null) { + String shardId = current.shardId; + ShardState shardState = Preconditions.checkStateNotNull(state.get(shardId)); + if (current.hasNext()) { + KinesisClientRecord r = current.next(); + KinesisRecord kinesisRecord = new KinesisRecord(r, read.getStreamName(), shardId); Review Comment: > reshard event that also contains 1 record I am not sure such a thing exists. I was convinced that re-shard events always come with `continuationSequenceNumber = null`, and they never carry records with non-null sequence numbers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
