dannycranmer commented on a change in pull request #12881: URL: https://github.com/apache/flink/pull/12881#discussion_r454856500
########## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch; +import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher; +import org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter; +import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition; +import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; + +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.function.Consumer; + +import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST; +import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE; +import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE; + +/** + * A {@link RecordPublisher} that will read records from Kinesis and forward them to the subscriber. + * Records are consumed by polling the GetRecords KDS API using a ShardIterator. 
+ */ +@Internal +public class PollingRecordPublisher implements RecordPublisher { + + private static final Logger LOG = LoggerFactory.getLogger(PollingRecordPublisher.class); + + private final PollingRecordPublisherMetricsReporter metricsReporter; + + private final KinesisProxyInterface kinesisProxy; + + private final StreamShardHandle subscribedShard; + + private String nextShardItr; + + private final int maxNumberOfRecordsPerFetch; + + private final long expiredIteratorBackoffMillis; + + /** + * A Polling implementation of {@link RecordPublisher} that polls kinesis for records. + * The following KDS services are used: GetRecords and GetShardIterator. + * + * @param subscribedShard the shard in which to consume from + * @param metricsReporter a metric reporter used to output metrics + * @param kinesisProxy the proxy used to communicate with kinesis + * @param maxNumberOfRecordsPerFetch the maximum number of records to retrieve per batch + * @param expiredIteratorBackoffMillis the duration to sleep in the event of an {@link ExpiredIteratorException} + */ + public PollingRecordPublisher( + final StreamShardHandle subscribedShard, + final PollingRecordPublisherMetricsReporter metricsReporter, + final KinesisProxyInterface kinesisProxy, + final int maxNumberOfRecordsPerFetch, + final long expiredIteratorBackoffMillis) { + this.subscribedShard = subscribedShard; + this.metricsReporter = metricsReporter; + this.kinesisProxy = kinesisProxy; + this.maxNumberOfRecordsPerFetch = maxNumberOfRecordsPerFetch; + this.expiredIteratorBackoffMillis = expiredIteratorBackoffMillis; + } + + @Override + public RecordPublisherRunResult run(final StartingPosition startingPosition, final Consumer<RecordBatch> consumer) throws InterruptedException { + return run(startingPosition, consumer, maxNumberOfRecordsPerFetch); + } + + public RecordPublisherRunResult run(final StartingPosition startingPosition, final Consumer<RecordBatch> consumer, int maxNumberOfRecords) throws 
InterruptedException { + if (nextShardItr == null) { + nextShardItr = getShardIterator(startingPosition); + } + + if (nextShardItr == null) { + return COMPLETE; + } + + metricsReporter.setMaxNumberOfRecordsPerFetch(maxNumberOfRecords); + + GetRecordsResult result = getRecords(nextShardItr, startingPosition, maxNumberOfRecords); + + consumer.accept(new RecordBatch(result.getRecords(), subscribedShard, result.getMillisBehindLatest())); + + nextShardItr = result.getNextShardIterator(); + return nextShardItr == null ? COMPLETE : INCOMPLETE; + } + + /** + * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected + * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on + * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should + * be used for the next call to this method. + * + * <p>Note: it is important that this method is not called again before all the records from the last result have been + * fully collected with {@code ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise + * {@code ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to + * incorrect shard iteration if the iterator had to be refreshed. 
+ * + * @param shardItr shard iterator to use + * @param startingPosition the position in the stream in which to consume from + * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt + * @return get records result + */ + private GetRecordsResult getRecords(String shardItr, final StartingPosition startingPosition, int maxNumberOfRecords) throws InterruptedException { + GetRecordsResult getRecordsResult = null; + while (getRecordsResult == null) { + try { + getRecordsResult = kinesisProxy.getRecords(shardItr, maxNumberOfRecords); + } catch (ExpiredIteratorException | InterruptedException eiEx) { Review comment: Ah, that `InterruptedException` should not be there! I will delete it. The other exceptions are handled in the proxy: https://github.com/apache/flink/blob/c5915cf87f96e1c7ebd84ad00f7eabade7e7fe37/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java#L248 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
