dannycranmer commented on a change in pull request #12881:
URL: https://github.com/apache/flink/pull/12881#discussion_r454856500



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.function.Consumer;
+
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
+
+/**
+ * A {@link RecordPublisher} that will read records from Kinesis and forward them to the subscriber.
+ * Records are consumed by polling the GetRecords KDS API using a ShardIterator.
+ */
+@Internal
+public class PollingRecordPublisher implements RecordPublisher {
+
+       private static final Logger LOG = LoggerFactory.getLogger(PollingRecordPublisher.class);
+
+       private final PollingRecordPublisherMetricsReporter metricsReporter;
+
+       private final KinesisProxyInterface kinesisProxy;
+
+       private final StreamShardHandle subscribedShard;
+
+       private String nextShardItr;
+
+       private final int maxNumberOfRecordsPerFetch;
+
+       private final long expiredIteratorBackoffMillis;
+
+       /**
+        * A Polling implementation of {@link RecordPublisher} that polls kinesis for records.
+        * The following KDS services are used: GetRecords and GetShardIterator.
+        *
+        * @param subscribedShard the shard in which to consume from
+        * @param metricsReporter a metric reporter used to output metrics
+        * @param kinesisProxy the proxy used to communicate with kinesis
+        * @param maxNumberOfRecordsPerFetch the maximum number of records to retrieve per batch
+        * @param expiredIteratorBackoffMillis the duration to sleep in the event of an {@link ExpiredIteratorException}
+        */
+       public PollingRecordPublisher(
+                       final StreamShardHandle subscribedShard,
+                       final PollingRecordPublisherMetricsReporter metricsReporter,
+                       final KinesisProxyInterface kinesisProxy,
+                       final int maxNumberOfRecordsPerFetch,
+                       final long expiredIteratorBackoffMillis) {
+               this.subscribedShard = subscribedShard;
+               this.metricsReporter = metricsReporter;
+               this.kinesisProxy = kinesisProxy;
+               this.maxNumberOfRecordsPerFetch = maxNumberOfRecordsPerFetch;
+               this.expiredIteratorBackoffMillis = expiredIteratorBackoffMillis;
+       }
+
+       @Override
+       public RecordPublisherRunResult run(final StartingPosition startingPosition, final Consumer<RecordBatch> consumer) throws InterruptedException {
+               return run(startingPosition, consumer, maxNumberOfRecordsPerFetch);
+       }
+
+       public RecordPublisherRunResult run(final StartingPosition startingPosition, final Consumer<RecordBatch> consumer, int maxNumberOfRecords) throws InterruptedException {
+               if (nextShardItr == null) {
+                       nextShardItr = getShardIterator(startingPosition);
+               }
+
+               if (nextShardItr == null) {
+                       return COMPLETE;
+               }
+
+               metricsReporter.setMaxNumberOfRecordsPerFetch(maxNumberOfRecords);
+
+               GetRecordsResult result = getRecords(nextShardItr, startingPosition, maxNumberOfRecords);
+
+               consumer.accept(new RecordBatch(result.getRecords(), subscribedShard, result.getMillisBehindLatest()));
+
+               nextShardItr = result.getNextShardIterator();
+               return nextShardItr == null ? COMPLETE : INCOMPLETE;
+       }
+
+       /**
+        * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
+        * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
+        * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
+        * be used for the next call to this method.
+        *
+        * <p>Note: it is important that this method is not called again before all the records from the last result have been
+        * fully collected with {@code ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
+        * {@code ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
+        * incorrect shard iteration if the iterator had to be refreshed.
+        *
+        * @param shardItr shard iterator to use
+        * @param startingPosition the position in the stream in which to consume from
+        * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
+        * @return get records result
+        */
+       private GetRecordsResult getRecords(String shardItr, final StartingPosition startingPosition, int maxNumberOfRecords) throws InterruptedException {
+               GetRecordsResult getRecordsResult = null;
+               while (getRecordsResult == null) {
+                       try {
+                               getRecordsResult = kinesisProxy.getRecords(shardItr, maxNumberOfRecords);
+                       } catch (ExpiredIteratorException | InterruptedException eiEx) {

Review comment:
       Ah, that `InterruptedException` should not be there! I will delete it.

   The other exceptions are handled in the proxy: https://github.com/apache/flink/blob/c5915cf87f96e1c7ebd84ad00f7eabade7e7fe37/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java#L248
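
   For reference, a minimal sketch of what the corrected method could look like once the catch only handles `ExpiredIteratorException` (it reuses the `getShardIterator(startingPosition)` helper and the `LOG`/`expiredIteratorBackoffMillis`/`subscribedShard` members already present in this class; treat it as an illustration rather than the final code):

       private GetRecordsResult getRecords(String shardItr, final StartingPosition startingPosition, int maxNumberOfRecords) throws InterruptedException {
               GetRecordsResult getRecordsResult = null;
               while (getRecordsResult == null) {
                       try {
                               getRecordsResult = kinesisProxy.getRecords(shardItr, maxNumberOfRecords);
                       } catch (ExpiredIteratorException eiEx) {
                               // Only the expired-iterator case is handled here; other
                               // transient errors are already retried inside KinesisProxy.
                               LOG.warn("Encountered an unexpected expired iterator {} for shard {}; refreshing the iterator ...", shardItr, subscribedShard);

                               // Back off before requesting a fresh iterator so we do not
                               // call GetShardIterator in a tight loop.
                               if (expiredIteratorBackoffMillis != 0) {
                                       Thread.sleep(expiredIteratorBackoffMillis);
                               }

                               // Obtain a fresh iterator from the starting position and retry.
                               shardItr = getShardIterator(startingPosition);
                       }
               }
               return getRecordsResult;
       }

   With `InterruptedException` removed from the catch, an interrupt during the backoff sleep simply propagates through the `throws InterruptedException` clause instead of being swallowed by the retry loop.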



