mosche commented on code in PR #23540:
URL: https://github.com/apache/beam/pull/23540#discussion_r992508696
##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardReadersPool.java:
##########
@@ -129,8 +129,43 @@ void startReadingShards(Iterable<ShardRecordsIterator> shardRecordsIterators, St
getShardIdsFromRecordsIterators(shardRecordsIterators));
for (final ShardRecordsIterator recordsIterator : shardRecordsIterators) {
numberOfRecordsInAQueueByShard.put(recordsIterator.getShardId(), new AtomicInteger());
- RateLimitPolicy policy = read.getRateLimitPolicyFactory().getRateLimitPolicy();
- executorService.submit(() -> readLoop(recordsIterator, policy));
+
+ if (recordsIterator.hasConsumer()) {
+ LOG.info(
+ "Subscribing to shard {} via consumer {}",
+ recordsIterator.getShardId(),
+ recordsIterator.getConsumerInfo());
+ executorService.submit(() -> subscribeLoop(recordsIterator));
+ } else {
+ RateLimitPolicy policy = read.getRateLimitPolicyFactory().getRateLimitPolicy();
+ executorService.submit(() -> readLoop(recordsIterator, policy));
+ }
+ }
+ }
+
+ private void subscribeLoop(ShardRecordsIterator shardRecordsIterator) {
+ while (poolOpened.get()) {
+ try {
+ shardRecordsIterator.subscribeToShard(this::putRecord);
Review Comment:
We need to read and process `childShards` from the
`SubscribeToShardEvent` somehow, so that re-sharding events are handled.
Also, shouldn't the next subscribe call use the
`continuationSequenceNumber`?
> Use this as `SequenceNumber` in the next call to `SubscribeToShard`, with
> `StartingPosition` set to `AT_SEQUENCE_NUMBER` or `AFTER_SEQUENCE_NUMBER`. Use
> `ContinuationSequenceNumber` for checkpointing because it captures your
> shard progress even when no data is written to the shard.

Honestly, I'm not sure whether that has any implications for checkpointing... I
think the current approach should still be fine.
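To make the re-sharding concern concrete, here is a minimal sketch of the bookkeeping that seems to be needed per shard. `ShardEvent` and `ShardProgress` are hypothetical stand-ins, not classes from this PR or the AWS SDK; they only model the two event fields mentioned above. The sketch also assumes that child shards are reported only at the end of a shard and that the continuation sequence number is null at that point.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the two SubscribeToShardEvent fields discussed above. */
final class ShardEvent {
  final String continuationSequenceNumber; // assumed to be null once the shard has ended
  final List<String> childShardIds; // assumed to be non-empty only at the end of a shard

  ShardEvent(String continuationSequenceNumber, List<String> childShardIds) {
    this.continuationSequenceNumber = continuationSequenceNumber;
    this.childShardIds = childShardIds;
  }
}

/** Hypothetical per-shard state: where to resume and which shards to read next. */
final class ShardProgress {
  private String resumeAfter; // null means "no progress yet, use the initial position"
  private final List<String> pendingChildShards = new ArrayList<>();

  void onEvent(ShardEvent event) {
    // Keep the last known continuation point; a null value must not erase it.
    if (event.continuationSequenceNumber != null) {
      resumeAfter = event.continuationSequenceNumber;
    }
    // Child shards signal a split or merge; they have to be subscribed to next.
    pendingChildShards.addAll(event.childShardIds);
  }

  String resumeAfter() {
    return resumeAfter;
  }

  boolean hasEnded() {
    return !pendingChildShards.isEmpty();
  }

  List<String> pendingChildShards() {
    return new ArrayList<>(pendingChildShards);
  }
}
```

The point of keeping both values together is that whatever gets checkpointed for a shard can then answer two questions: where to resume, and whether the shard has been replaced by its children.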
##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardReadersPool.java:
##########
@@ -129,8 +129,43 @@ void startReadingShards(Iterable<ShardRecordsIterator> shardRecordsIterators, St
getShardIdsFromRecordsIterators(shardRecordsIterators));
for (final ShardRecordsIterator recordsIterator : shardRecordsIterators) {
numberOfRecordsInAQueueByShard.put(recordsIterator.getShardId(), new AtomicInteger());
- RateLimitPolicy policy = read.getRateLimitPolicyFactory().getRateLimitPolicy();
- executorService.submit(() -> readLoop(recordsIterator, policy));
+
+ if (recordsIterator.hasConsumer()) {
+ LOG.info(
+ "Subscribing to shard {} via consumer {}",
+ recordsIterator.getShardId(),
+ recordsIterator.getConsumerInfo());
+ executorService.submit(() -> subscribeLoop(recordsIterator));
Review Comment:
I don't have a good suggestion ready, but I wonder whether it makes sense to
reuse `ShardRecordsIterator` for enhanced fanout. In any case, it would
be good to avoid the Kinesis `getShardIterator` request, which doesn't make
sense when using enhanced fanout.
##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisSource.java:
##########
@@ -58,17 +63,20 @@ private KinesisSource(Read spec, CheckpointGenerator initialCheckpoint) {
private SimplifiedKinesisClient createClient(PipelineOptions options) {
AwsOptions awsOptions = options.as(AwsOptions.class);
Supplier<KinesisClient> kinesisSupplier;
+ Supplier<KinesisAsyncClient> kinesisAsyncSupplier;
Supplier<CloudWatchClient> cloudWatchSupplier;
if (spec.getAWSClientsProvider() != null) {
kinesisSupplier = spec.getAWSClientsProvider()::getKinesisClient;
+ kinesisAsyncSupplier = spec.getAWSClientsProvider()::getKinesisAsyncClient;
cloudWatchSupplier = spec.getAWSClientsProvider()::getCloudWatchClient;
Review Comment:
We can forbid using the deprecated configuration path (`AWSClientsProvider`)
when enabling enhanced fanout. Similar checks are in
`KinesisIO.Read.expand(...)`.
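Such a check could look roughly like the sketch below. `FanoutConfigCheck`, its parameters, and the error message are hypothetical and only illustrate the idea; this is not the existing validation in `KinesisIO.Read.expand(...)`.

```java
/** Hypothetical validation helper; names and message are illustrative only. */
final class FanoutConfigCheck {
  private FanoutConfigCheck() {}

  /**
   * Rejects the combination of the deprecated clients provider with enhanced fanout.
   *
   * @param hasLegacyClientsProvider whether the deprecated provider was configured
   * @param consumerArn ARN of the enhanced fanout consumer, or null if not enabled
   */
  static void validate(boolean hasLegacyClientsProvider, String consumerArn) {
    boolean enhancedFanoutEnabled = consumerArn != null;
    if (enhancedFanoutEnabled && hasLegacyClientsProvider) {
      throw new IllegalArgumentException(
          "Enhanced fanout is not supported with the deprecated AWSClientsProvider");
    }
  }
}
```

Failing early at pipeline construction keeps the async client out of the deprecated path entirely.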
##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/AWSClientsProvider.java:
##########
@@ -32,5 +33,7 @@
public interface AWSClientsProvider extends Serializable {
KinesisClient getKinesisClient();
+ KinesisAsyncClient getKinesisAsyncClient();
Review Comment:
I'd suggest simply ignoring this deprecated configuration path and leaving
this as is. Also, adding a method without a default implementation here would
be a breaking change.
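For illustration, if a method were added anyway, a `default` body would keep existing implementations compiling. The sketch below uses stand-in types (`SyncClient`, `AsyncClient`, `ClientsProvider`, `LegacyProvider`) rather than the real AWS SDK clients or the actual `AWSClientsProvider`, so that it compiles on its own.

```java
import java.io.Serializable;

// Stand-ins for the SDK client types, so the sketch is self-contained.
interface SyncClient {}

interface AsyncClient {}

/** Hypothetical provider interface mirroring the shape of the one under review. */
interface ClientsProvider extends Serializable {
  SyncClient getSyncClient();

  // The default body means implementations written before this method existed
  // still compile; they just fail at runtime if the new path is requested.
  default AsyncClient getAsyncClient() {
    throw new UnsupportedOperationException(
        "Async client is not supported by this provider");
  }
}

/** An existing user implementation that knows nothing about the new method. */
final class LegacyProvider implements ClientsProvider {
  @Override
  public SyncClient getSyncClient() {
    return new SyncClient() {};
  }
}
```

Without the `default` keyword, `LegacyProvider` would no longer compile, which is the breaking change mentioned above.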
##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardReadersPool.java:
##########
@@ -129,8 +129,43 @@ void startReadingShards(Iterable<ShardRecordsIterator> shardRecordsIterators, St
getShardIdsFromRecordsIterators(shardRecordsIterators));
for (final ShardRecordsIterator recordsIterator : shardRecordsIterators) {
numberOfRecordsInAQueueByShard.put(recordsIterator.getShardId(), new AtomicInteger());
- RateLimitPolicy policy = read.getRateLimitPolicyFactory().getRateLimitPolicy();
- executorService.submit(() -> readLoop(recordsIterator, policy));
+
+ if (recordsIterator.hasConsumer()) {
+ LOG.info(
+ "Subscribing to shard {} via consumer {}",
+ recordsIterator.getShardId(),
+ recordsIterator.getConsumerInfo());
+ executorService.submit(() -> subscribeLoop(recordsIterator));
Review Comment:
Just an idea: considering that the two approaches are fundamentally
different, it might be worth having separate, clean implementations for
each one:
- The pull-based iterator requires a read loop per shard on an executor
service, implemented in `ShardReadersPool`.
- This shouldn't be necessary with the callback-based push approach. Can we
avoid the executor service (and the entire `ShardReadersPool`) and use a
`ShardSubscribersPool` instead as the "entry point" for enhanced fanout?

The implementation effort is going to be a lot higher, of course,
but it should help keep things easier to understand and avoid exploding
complexity.
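A rough sketch of what such an entry point could look like is below. `PushSubscription` and `ShardSubscribersPoolSketch` are hypothetical; the sketch assumes each subscription invokes a callback on its own (SDK-managed) thread, and it leaves out backpressure, error handling, and resubscription, all of which a real implementation would need.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

/** Hypothetical push source: calls onRecord for every record of one shard. */
interface PushSubscription {
  void start(Consumer<String> onRecord);
}

/** Hypothetical pool: no executor service, callbacks write straight into a shared queue. */
final class ShardSubscribersPoolSketch {
  // Unbounded for brevity; a real pool would bound this and apply backpressure.
  private final Queue<String> records = new ConcurrentLinkedQueue<>();

  void start(List<PushSubscription> subscriptions) {
    for (PushSubscription subscription : subscriptions) {
      // Each subscription pushes records on its own; no read loop is needed here.
      subscription.start(records::add);
    }
  }

  /** Returns the next buffered record, or null if none is available. */
  String nextRecord() {
    return records.poll();
  }
}
```

The reader side stays the same as today (poll the next record), while everything pull-specific, such as rate limit policies and per-shard loops, stays in `ShardReadersPool`.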
##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardCheckpoint.java:
##########
@@ -155,6 +180,23 @@ private boolean checkpointIsInTheMiddleOfAUserRecord() {
return shardIteratorType == AFTER_SEQUENCE_NUMBER && subSequenceNumber != null;
}
+ public CompletableFuture<Void> subscribeToShard(
+ boolean resubscribe,
+ SimplifiedKinesisClient kinesisClient,
+ final SubscribeToShardResponseHandler.Visitor visitor,
+ final Consumer<Throwable> onError) {
+ if (resubscribe) {
+ return kinesisClient.subscribeToShard(
Review Comment:
Don't we have to use the `continuationSequenceNumber` to resubscribe? E.g.,
if we subscribe to a large stream from the beginning (trim horizon),
we're unlikely to process everything within one timeout interval.
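The toy simulation below shows why this matters. It is not Kinesis code: sequence numbers are modeled as plain integers, `0` stands for trim horizon, and `maxPerSubscription` is a hypothetical cap standing in for whatever a single subscription delivers before it expires.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of resubscribing to one shard; all names here are hypothetical. */
final class ResubscribeSketch {
  private ResubscribeSketch() {}

  /** One subscription: delivers at most maxPerSubscription records with sequence > after. */
  static List<Integer> subscribe(int shardSize, int after, int maxPerSubscription) {
    if (maxPerSubscription <= 0) {
      throw new IllegalArgumentException("maxPerSubscription must be positive");
    }
    List<Integer> batch = new ArrayList<>();
    for (int seq = after + 1; seq <= shardSize && batch.size() < maxPerSubscription; seq++) {
      batch.add(seq);
    }
    return batch;
  }

  /** Reads the whole shard, resuming each subscription after the last delivered record. */
  static int subscriptionsNeeded(int shardSize, int maxPerSubscription) {
    int continuation = 0; // 0 stands for trim horizon
    int subscriptions = 0;
    while (continuation < shardSize) {
      List<Integer> batch = subscribe(shardSize, continuation, maxPerSubscription);
      continuation = batch.get(batch.size() - 1); // resume point for the next call
      subscriptions++;
    }
    return subscriptions;
  }
}
```

With 10 records and 4 records per subscription, resuming from the continuation point finishes in 3 subscriptions. Resubscribing with `after = 0` every time would return records 1 to 4 again and again and never reach the end of the shard.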