tzulitai commented on a change in pull request #12881:
URL: https://github.com/apache/flink/pull/12881#discussion_r468417898



##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import 
org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.function.Consumer;
+
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
+import static 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
+import static 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
+
+/**
+ * A {@link RecordPublisher} that will read records from Kinesis and forward 
them to the subscriber.
+ * Records are consumed by polling the GetRecords KDS API using a 
ShardIterator.
+ */
+@Internal
+public class PollingRecordPublisher implements RecordPublisher {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(PollingRecordPublisher.class);
+
+       private final PollingRecordPublisherMetricsReporter metricsReporter;
+
+       private final KinesisProxyInterface kinesisProxy;
+
+       private final StreamShardHandle subscribedShard;
+
+       private String nextShardItr;
+
+       private final int maxNumberOfRecordsPerFetch;
+
+       private final long expiredIteratorBackoffMillis;
+
+       /**
+        * A Polling implementation of {@link RecordPublisher} that polls 
kinesis for records.
+        * The following KDS services are used: GetRecords and GetShardIterator.
+        *
+        * @param subscribedShard the shard in which to consume from
+        * @param metricsReporter a metric reporter used to output metrics
+        * @param kinesisProxy the proxy used to communicate with kinesis
+        * @param maxNumberOfRecordsPerFetch the maximum number of records to 
retrieve per batch
+        * @param expiredIteratorBackoffMillis the duration to sleep in the 
event of an {@link ExpiredIteratorException}
+        */
+       public PollingRecordPublisher(

Review comment:
       constructor can be package-private

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import 
org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.function.Consumer;
+
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
+import static 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
+import static 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
+
+/**
+ * A {@link RecordPublisher} that will read records from Kinesis and forward 
them to the subscriber.
+ * Records are consumed by polling the GetRecords KDS API using a 
ShardIterator.
+ */
+@Internal
+public class PollingRecordPublisher implements RecordPublisher {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(PollingRecordPublisher.class);
+
+       private final PollingRecordPublisherMetricsReporter metricsReporter;
+
+       private final KinesisProxyInterface kinesisProxy;
+
+       private final StreamShardHandle subscribedShard;
+
+       private String nextShardItr;
+
+       private final int maxNumberOfRecordsPerFetch;
+
+       private final long expiredIteratorBackoffMillis;
+
+       /**
+        * A Polling implementation of {@link RecordPublisher} that polls 
kinesis for records.
+        * The following KDS services are used: GetRecords and GetShardIterator.
+        *
+        * @param subscribedShard the shard in which to consume from
+        * @param metricsReporter a metric reporter used to output metrics
+        * @param kinesisProxy the proxy used to communicate with kinesis
+        * @param maxNumberOfRecordsPerFetch the maximum number of records to 
retrieve per batch
+        * @param expiredIteratorBackoffMillis the duration to sleep in the 
event of an {@link ExpiredIteratorException}
+        */
+       public PollingRecordPublisher(
+                       final StreamShardHandle subscribedShard,
+                       final PollingRecordPublisherMetricsReporter 
metricsReporter,
+                       final KinesisProxyInterface kinesisProxy,
+                       final int maxNumberOfRecordsPerFetch,
+                       final long expiredIteratorBackoffMillis) {
+               this.subscribedShard = subscribedShard;
+               this.metricsReporter = metricsReporter;
+               this.kinesisProxy = kinesisProxy;
+               this.maxNumberOfRecordsPerFetch = maxNumberOfRecordsPerFetch;
+               this.expiredIteratorBackoffMillis = 
expiredIteratorBackoffMillis;
+       }
+
+       @Override
+       public RecordPublisherRunResult run(final StartingPosition 
startingPosition, final Consumer<RecordBatch> consumer) throws 
InterruptedException {
+               return run(startingPosition, consumer, 
maxNumberOfRecordsPerFetch);
+       }
+
+       public RecordPublisherRunResult run(final StartingPosition 
startingPosition, final Consumer<RecordBatch> consumer, int maxNumberOfRecords) 
throws InterruptedException {
+               if (nextShardItr == null) {
+                       nextShardItr = getShardIterator(startingPosition);
+               }
+
+               if (nextShardItr == null) {
+                       return COMPLETE;
+               }
+
+               
metricsReporter.setMaxNumberOfRecordsPerFetch(maxNumberOfRecords);
+
+               GetRecordsResult result = getRecords(nextShardItr, 
startingPosition, maxNumberOfRecords);
+
+               consumer.accept(new RecordBatch(result.getRecords(), 
subscribedShard, result.getMillisBehindLatest()));
+
+               nextShardItr = result.getNextShardIterator();
+               return nextShardItr == null ? COMPLETE : INCOMPLETE;
+       }
+
+       /**
+        * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while 
also handling unexpected
+        * AWS {@link ExpiredIteratorException}s to assure that we get results 
and don't just fail on
+        * such occasions. The returned shard iterator within the successful 
{@link GetRecordsResult} should
+        * be used for the next call to this method.
+        *
+        * <p>Note: it is important that this method is not called again before 
all the records from the last result have been
+        * fully collected with {@code 
ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, 
otherwise
+        * {@code ShardConsumer#lastSequenceNum} may refer to a sub-record in 
the middle of an aggregated record, leading to
+        * incorrect shard iteration if the iterator had to be refreshed.
+        *
+        * @param shardItr shard iterator to use
+        * @param startingPosition the position in the stream in which to 
consume from
+        * @param maxNumberOfRecords the maximum number of records to fetch for 
this getRecords attempt
+        * @return get records result
+        */
+       private GetRecordsResult getRecords(String shardItr, final 
StartingPosition startingPosition, int maxNumberOfRecords) throws 
InterruptedException {
+               GetRecordsResult getRecordsResult = null;
+               while (getRecordsResult == null) {
+                       try {
+                               getRecordsResult = 
kinesisProxy.getRecords(shardItr, maxNumberOfRecords);
+                       } catch (ExpiredIteratorException eiEx) {
+                               LOG.warn("Encountered an unexpected expired 
iterator {} for shard {};" +
+                                       " refreshing the iterator ...", 
shardItr, subscribedShard);
+
+                               shardItr = getShardIterator(startingPosition);
+
+                               // sleep for the fetch interval before the next 
getRecords attempt with the refreshed iterator
+                               if (expiredIteratorBackoffMillis != 0) {
+                                       
Thread.sleep(expiredIteratorBackoffMillis);
+                               }
+                       }
+               }
+               return getRecordsResult;
+       }
+
+       /**
+        * Returns a shard iterator for the given {@link SequenceNumber}.
+        *
+        * @return shard iterator
+        */
+       @Nullable
+       private String getShardIterator(final StartingPosition 
startingPosition) throws InterruptedException {
+               if (startingPosition.getShardIteratorType() == LATEST && 
subscribedShard.isClosed()) {
+                       return null;
+               }
+
+               return kinesisProxy.getShardIterator(

Review comment:
       Double checking for my understanding:
   Does this return `null` for an out-of-bounds position?
   Say, the given timestamp is out of bounds or the sequence number is 
non-existent.

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import 
org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.function.Consumer;
+
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
+import static 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
+import static 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
+
+/**
+ * A {@link RecordPublisher} that will read records from Kinesis and forward 
them to the subscriber.
+ * Records are consumed by polling the GetRecords KDS API using a 
ShardIterator.
+ */
+@Internal
+public class PollingRecordPublisher implements RecordPublisher {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(PollingRecordPublisher.class);
+
+       private final PollingRecordPublisherMetricsReporter metricsReporter;
+
+       private final KinesisProxyInterface kinesisProxy;
+
+       private final StreamShardHandle subscribedShard;
+
+       private String nextShardItr;
+
+       private final int maxNumberOfRecordsPerFetch;
+
+       private final long expiredIteratorBackoffMillis;
+
+       /**
+        * A Polling implementation of {@link RecordPublisher} that polls 
kinesis for records.
+        * The following KDS services are used: GetRecords and GetShardIterator.
+        *
+        * @param subscribedShard the shard in which to consume from
+        * @param metricsReporter a metric reporter used to output metrics
+        * @param kinesisProxy the proxy used to communicate with kinesis
+        * @param maxNumberOfRecordsPerFetch the maximum number of records to 
retrieve per batch
+        * @param expiredIteratorBackoffMillis the duration to sleep in the 
event of an {@link ExpiredIteratorException}
+        */
+       public PollingRecordPublisher(
+                       final StreamShardHandle subscribedShard,
+                       final PollingRecordPublisherMetricsReporter 
metricsReporter,
+                       final KinesisProxyInterface kinesisProxy,
+                       final int maxNumberOfRecordsPerFetch,
+                       final long expiredIteratorBackoffMillis) {
+               this.subscribedShard = subscribedShard;

Review comment:
       Could you add simple sanity checks on these arguments, such as 
`Preconditions.checkNotNull`?

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -53,283 +49,111 @@
  */
 @Internal
 public class ShardConsumer<T> implements Runnable {

Review comment:
       With the record polling refactored out, could you rephrase the 
class-level Javadoc of this class?
   Essentially this is now a thread that just subscribes to a `RecordPublisher` 
which publishes records from a single Kinesis shard.

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -379,49 +206,18 @@ private void 
deserializeRecordForCollectionAndUpdateState(UserRecord record)
        }
 
        /**
-        * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while 
also handling unexpected
-        * AWS {@link ExpiredIteratorException}s to assure that we get results 
and don't just fail on
-        * such occasions. The returned shard iterator within the successful 
{@link GetRecordsResult} should
-        * be used for the next call to this method.
+        * Filters out aggregated records that have previously been processed.
+        * This method is to support restarting from a partially consumed 
aggregated sequence number.
         *
-        * <p>Note: it is important that this method is not called again before 
all the records from the last result have been
-        * fully collected with {@link 
ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, 
otherwise
-        * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in 
the middle of an aggregated record, leading to
-        * incorrect shard iteration if the iterator had to be refreshed.
-        *
-        * @param shardItr shard iterator to use
-        * @param maxNumberOfRecords the maximum number of records to fetch for 
this getRecords attempt
-        * @return get records result
-        * @throws InterruptedException
+        * @param record the record to filter
+        * @return {@code true} if the record should be retained
         */
-       private GetRecordsResult getRecords(String shardItr, int 
maxNumberOfRecords) throws Exception {
-               GetRecordsResult getRecordsResult = null;
-               while (getRecordsResult == null) {
-                       try {
-                               getRecordsResult = kinesis.getRecords(shardItr, 
maxNumberOfRecords);
-
-                               // Update millis behind latest so it gets 
reported by the millisBehindLatest gauge
-                               Long millisBehindLatest = 
getRecordsResult.getMillisBehindLatest();
-                               if (millisBehindLatest != null) {
-                                       
shardMetricsReporter.setMillisBehindLatest(millisBehindLatest);
-                               }
-                       } catch (ExpiredIteratorException eiEx) {
-                               LOG.warn("Encountered an unexpected expired 
iterator {} for shard {};" +
-                                       " refreshing the iterator ...", 
shardItr, subscribedShard);
-
-                               shardItr = getShardIterator(lastSequenceNum);
-
-                               // sleep for the fetch interval before the next 
getRecords attempt with the refreshed iterator
-                               if (fetchIntervalMillis != 0) {
-                                       Thread.sleep(fetchIntervalMillis);
-                               }
-                       }
+       private boolean filterDeaggregatedRecord(final UserRecord record) {
+               if (lastSequenceNum.isAggregated()) {
+                       return 
!record.getSequenceNumber().equals(lastSequenceNum.getSequenceNumber()) ||
+                               record.getSubSequenceNumber() > 
lastSequenceNum.getSubSequenceNumber();
                }
-               return getRecordsResult;
-       }
 
-       @SuppressWarnings("unchecked")
-       protected static List<UserRecord> deaggregateRecords(List<Record> 
records, String startingHashKey, String endingHashKey) {
-               return UserRecord.deaggregate(records, new 
BigInteger(startingHashKey), new BigInteger(endingHashKey));
+               return true;

Review comment:
       Just for easier code readability, can we rewrite this as:
   
   ```
   if (!lastSequenceNum.isAggregated()) {
       return true;
   }
   
   return 
!record.getSequenceNumber().equals(lastSequenceNum.getSequenceNumber()) ||
                                record.getSubSequenceNumber() > 
lastSequenceNum.getSubSequenceNumber();
   ```
   
   I've found that early returns always help with understanding these :)

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordBatch.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher;
+
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import com.amazonaws.services.kinesis.model.Record;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.List;
+
+/**
+ * A batch of UserRecords received from Kinesis.
+ * Input records are de-aggregated using KCL 1.x library.
+ * It is expected that AWS SDK v2.x messages are converted to KCL 1.x {@link 
UserRecord}.
+ */
+public class RecordBatch {
+
+       private final int aggregatedRecordSize;
+
+       private final List<UserRecord> deaggregatedRecords;
+
+       private final long totalSizeInBytes;
+
+       private final Long millisBehindLatest;
+
+       public RecordBatch(final List<Record> records, final StreamShardHandle 
subscribedShard, final Long millisBehindLatest) {
+               this.aggregatedRecordSize = records.size();

Review comment:
       Please add argument sanity / precondition checks here (e.g. a null check 
on `records`, which is dereferenced on this line).

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/AdaptivePollingRecordPublisher.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import 
org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+
+import java.util.function.Consumer;
+
+/**
+ * An adaptive record publisher to add a dynamic loop delay and batch read 
size for {@link PollingRecordPublisher}.
+ * Kinesis Streams have quotas on the transactions per second, and throughout. 
This class attempts to balance
+ * quotas and mitigate back off errors.
+ */
+@Internal
+public class AdaptivePollingRecordPublisher extends PollingRecordPublisher {
+       // AWS Kinesis has a read limit of 2 Mb/sec
+       // 
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
+       private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2 * 
1024L * 1024L;
+
+       private int lastRecordBatchSize = 0;
+
+       private long lastRecordBatchSizeInBytes = 0;
+
+       private long processingStartTimeNanos = System.nanoTime();
+
+       private int maxNumberOfRecordsPerFetch;
+
+       private final long fetchIntervalMillis;
+
+       private final PollingRecordPublisherMetricsReporter metricsReporter;
+
+       public AdaptivePollingRecordPublisher(

Review comment:
       constructor can be package-private

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisher.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+
+import java.util.function.Consumer;
+
+/**
+ * A {@code RecordPublisher} will consume records from an external stream and 
deliver them to the registered subscriber.
+ */
+@Internal
+public interface RecordPublisher {
+
+       /**
+        * Run the record publisher. Records will be consumed from the stream 
and published to the consumer.
+        * The number of batches retrieved by a single invocation will vary 
based on implementation.
+        *
+        * @param startingPosition the position in the stream from which to 
consume
+        * @param recordConsumer the record consumer in which to output records
+        * @return a status enum to represent whether a shard has been fully 
consumed
+        * @throws InterruptedException
+        */
+       RecordPublisherRunResult run(StartingPosition startingPosition, 
Consumer<RecordBatch> recordConsumer) throws InterruptedException;

Review comment:
       I found it slightly confusing that the `StartingPosition` argument is 
passed in here, since from the implementations it seems like this 
`startingPosition` is only ever used on the first getRecords call, but with 
this interface we would be passing in the starting position on every invocation.
   
   It's also slightly more error-prone for call sites of `RecordPublisher`s: 
imagine if a call site continued to invoke `run` with a starting position 
after the previous attempt had already returned `COMPLETE` as a result.
   
   As food for thought: having the `StartingPosition` passed in via the 
`RecordPublisherFactory` could potentially be a better option?

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StartingPosition.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+
+import javax.annotation.Nullable;
+
+import java.util.Date;
+
+import static 
com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
+import static 
com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
+import static 
com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
+import static 
com.amazonaws.services.kinesis.model.ShardIteratorType.TRIM_HORIZON;
+import static 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.isSentinelSequenceNumber;
+
+/**
+ * The position in which to start consuming from a stream.
+ */
+public class StartingPosition {

Review comment:
       Mark with `@Internal`

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -53,283 +49,111 @@
  */
 @Internal
 public class ShardConsumer<T> implements Runnable {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(ShardConsumer.class);
-
-       // AWS Kinesis has a read limit of 2 Mb/sec
-       // 
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
-       private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2 * 
1024L * 1024L;
-
        private final KinesisDeserializationSchema<T> deserializer;
 
-       private final KinesisProxyInterface kinesis;
-
        private final int subscribedShardStateIndex;
 
        private final KinesisDataFetcher<T> fetcherRef;
 
        private final StreamShardHandle subscribedShard;
 
-       private int maxNumberOfRecordsPerFetch;
-       private final long fetchIntervalMillis;
-       private final boolean useAdaptiveReads;
+       private final ShardConsumerMetricsReporter shardConsumerMetricsReporter;
 
-       private final ShardMetricsReporter shardMetricsReporter;
+       private StartingPosition startingPosition;
 
        private SequenceNumber lastSequenceNum;
 
-       private Date initTimestamp;
+       private final RecordPublisher recordPublisher;
 
        /**
         * Creates a shard consumer.
         *
         * @param fetcherRef reference to the owning fetcher
+        * @param recordPublisher the record publisher used to read records 
from kinesis
         * @param subscribedShardStateIndex the state index of the shard this 
consumer is subscribed to
         * @param subscribedShard the shard this consumer is subscribed to
         * @param lastSequenceNum the sequence number in the shard to start 
consuming
-        * @param kinesis the proxy instance to interact with Kinesis
-        * @param shardMetricsReporter the reporter to report metrics to
+        * @param shardConsumerMetricsReporter the reporter to report metrics to
+        * @param shardDeserializer used to deserialize incoming records
         */
        public ShardConsumer(KinesisDataFetcher<T> fetcherRef,
+                                               RecordPublisher recordPublisher,
                                                Integer 
subscribedShardStateIndex,
                                                StreamShardHandle 
subscribedShard,
                                                SequenceNumber lastSequenceNum,
-                                               KinesisProxyInterface kinesis,
-                                               ShardMetricsReporter 
shardMetricsReporter,
+                                               ShardConsumerMetricsReporter 
shardConsumerMetricsReporter,
                                                KinesisDeserializationSchema<T> 
shardDeserializer) {
                this.fetcherRef = checkNotNull(fetcherRef);
+               this.recordPublisher = checkNotNull(recordPublisher);
                this.subscribedShardStateIndex = 
checkNotNull(subscribedShardStateIndex);
                this.subscribedShard = checkNotNull(subscribedShard);
+               this.shardConsumerMetricsReporter = 
checkNotNull(shardConsumerMetricsReporter);
                this.lastSequenceNum = checkNotNull(lastSequenceNum);
 
-               this.shardMetricsReporter = checkNotNull(shardMetricsReporter);
-
                checkArgument(
                        
!lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()),
                        "Should not start a ShardConsumer if the shard has 
already been completely read.");
 
                this.deserializer = shardDeserializer;
 
                Properties consumerConfig = 
fetcherRef.getConsumerConfiguration();
-               this.kinesis = kinesis;
-               this.maxNumberOfRecordsPerFetch = 
Integer.valueOf(consumerConfig.getProperty(
-                       ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
-                       
Integer.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX)));
-               this.fetchIntervalMillis = 
Long.valueOf(consumerConfig.getProperty(
-                       
ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
-                       
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
-               this.useAdaptiveReads = 
Boolean.valueOf(consumerConfig.getProperty(
-                       ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS,
-                       
Boolean.toString(ConsumerConfigConstants.DEFAULT_SHARD_USE_ADAPTIVE_READS)));
-
-               if 
(lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get()))
 {
+
+               if 
(lastSequenceNum.equals(SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
+                       Date initTimestamp;
                        String timestamp = 
consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP);
 
                        try {
                                String format = 
consumerConfig.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT,
                                        
ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);
                                SimpleDateFormat customDateFormat = new 
SimpleDateFormat(format);
-                               this.initTimestamp = 
customDateFormat.parse(timestamp);
+                               initTimestamp = 
customDateFormat.parse(timestamp);
                        } catch (IllegalArgumentException | 
NullPointerException exception) {
                                throw new IllegalArgumentException(exception);
                        } catch (ParseException exception) {
-                               this.initTimestamp = new Date((long) 
(Double.parseDouble(timestamp) * 1000));
+                               initTimestamp = new Date((long) 
(Double.parseDouble(timestamp) * 1000));
                        }
-               } else {
-                       this.initTimestamp = null;
-               }
-       }
-
-       /**
-        * Returns a shard iterator for the given {@link SequenceNumber}.
-        *
-        * @return shard iterator
-        * @throws Exception
-        */
-       protected String getShardIterator(SequenceNumber sequenceNumber) throws 
Exception {
 
-               if (isSentinelSequenceNumber(sequenceNumber)) {
-                       return getShardIteratorForSentinel(sequenceNumber);
+                       startingPosition = 
StartingPosition.fromTimestamp(initTimestamp);
                } else {
-                       // we will be starting from an actual sequence number 
(due to restore from failure).
-                       return 
getShardIteratorForRealSequenceNumber(sequenceNumber);
+                       startingPosition = 
StartingPosition.restartFromSequenceNumber(checkNotNull(lastSequenceNum));
                }
        }
 
-       protected String getShardIteratorForSentinel(SequenceNumber 
sentinelSequenceNumber) throws InterruptedException {
-               String nextShardItr;
-
-               if 
(sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get()))
 {
-                       // if the shard is already closed, there will be no 
latest next record to get for this shard
-                       if (subscribedShard.isClosed()) {
-                               nextShardItr = null;
-                       } else {
-                               nextShardItr = 
kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), 
null);
-                       }
-               } else if 
(sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()))
 {
-                       nextShardItr = 
kinesis.getShardIterator(subscribedShard, 
ShardIteratorType.TRIM_HORIZON.toString(), null);
-               } else if 
(sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()))
 {
-                       nextShardItr = null;
-               } else if 
(sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get()))
 {
-                       nextShardItr = 
kinesis.getShardIterator(subscribedShard, 
ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp);
-               } else {
-                       throw new RuntimeException("Unknown sentinel type: " + 
sentinelSequenceNumber);
-               }
-
-               return nextShardItr;
-       }
-
-       protected String getShardIteratorForRealSequenceNumber(SequenceNumber 
sequenceNumber)
-                       throws Exception {
-
-               // if the last sequence number refers to an aggregated record, 
we need to clean up any dangling sub-records
-               // from the last aggregated record; otherwise, we can simply 
start iterating from the record right after.
-
-               if (sequenceNumber.isAggregated()) {
-                       return 
getShardIteratorForAggregatedSequenceNumber(sequenceNumber);
-               } else {
-                       // the last record was non-aggregated, so we can simply 
start from the next record
-                       return kinesis.getShardIterator(
-                                       subscribedShard,
-                                       
ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(),
-                                       sequenceNumber.getSequenceNumber());
-               }
-       }
-
-       protected String 
getShardIteratorForAggregatedSequenceNumber(SequenceNumber sequenceNumber)
-                       throws Exception {
-
-               String itrForLastAggregatedRecord =
-                               kinesis.getShardIterator(
-                                               subscribedShard,
-                                               
ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
-                                               
sequenceNumber.getSequenceNumber());
-
-               // get only the last aggregated record
-               GetRecordsResult getRecordsResult = 
getRecords(itrForLastAggregatedRecord, 1);
-
-               List<UserRecord> fetchedRecords = deaggregateRecords(
-                               getRecordsResult.getRecords(),
-                               
subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
-                               
subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
-
-               long lastSubSequenceNum = sequenceNumber.getSubSequenceNumber();
-               for (UserRecord record : fetchedRecords) {
-                       // we have found a dangling sub-record if it has a 
larger subsequence number
-                       // than our last sequence number; if so, collect the 
record and update state
-                       if (record.getSubSequenceNumber() > lastSubSequenceNum) 
{
-                               
deserializeRecordForCollectionAndUpdateState(record);
-                       }
-               }
-
-               return getRecordsResult.getNextShardIterator();
-       }
-
-       @SuppressWarnings("unchecked")
        @Override
        public void run() {
                try {
-                       String nextShardItr = getShardIterator(lastSequenceNum);
+                       while (isRunning()) {
+                               final RecordPublisherRunResult result = 
recordPublisher.run(startingPosition, batch -> {
+                                       for (UserRecord userRecord : 
batch.getDeaggregatedRecords()) {
+                                               if 
(filterDeaggregatedRecord(userRecord)) {
+                                                       
deserializeRecordForCollectionAndUpdateState(userRecord);
+                                               }
+                                       }
 
-                       long processingStartTimeNanos = System.nanoTime();
+                                       
shardConsumerMetricsReporter.setAverageRecordSizeBytes(batch.getAverageRecordSizeBytes());
+                                       
shardConsumerMetricsReporter.setNumberOfAggregatedRecords(batch.getAggregatedRecordSize());
+                                       
shardConsumerMetricsReporter.setNumberOfDeaggregatedRecords(batch.getDeaggregatedRecordSize());
+                                       
ofNullable(batch.getMillisBehindLatest()).ifPresent(shardConsumerMetricsReporter::setMillisBehindLatest);
+                               });
 
-                       while (isRunning()) {
-                               if (nextShardItr == null) {
+                               if (result == COMPLETE) {
                                        
fetcherRef.updateState(subscribedShardStateIndex, 
SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
 
                                        // we can close this consumer thread 
once we've reached the end of the subscribed shard
                                        break;
                                } else {
-                                       
shardMetricsReporter.setMaxNumberOfRecordsPerFetch(maxNumberOfRecordsPerFetch);
-                                       GetRecordsResult getRecordsResult = 
getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
-
-                                       List<Record> aggregatedRecords = 
getRecordsResult.getRecords();
-                                       int numberOfAggregatedRecords = 
aggregatedRecords.size();
-                                       
shardMetricsReporter.setNumberOfAggregatedRecords(numberOfAggregatedRecords);
-
-                                       // each of the Kinesis records may be 
aggregated, so we must deaggregate them before proceeding
-                                       List<UserRecord> fetchedRecords = 
deaggregateRecords(
-                                               aggregatedRecords,
-                                               
subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
-                                               
subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
-
-                                       long recordBatchSizeBytes = 0L;
-                                       for (UserRecord record : 
fetchedRecords) {
-                                               recordBatchSizeBytes += 
record.getData().remaining();
-                                               
deserializeRecordForCollectionAndUpdateState(record);
-                                       }
-
-                                       int numberOfDeaggregatedRecords = 
fetchedRecords.size();
-                                       
shardMetricsReporter.setNumberOfDeaggregatedRecords(numberOfDeaggregatedRecords);
-
-                                       nextShardItr = 
getRecordsResult.getNextShardIterator();
-
-                                       long adjustmentEndTimeNanos = 
adjustRunLoopFrequency(processingStartTimeNanos, System.nanoTime());
-                                       long runLoopTimeNanos = 
adjustmentEndTimeNanos - processingStartTimeNanos;
-                                       maxNumberOfRecordsPerFetch = 
adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), 
recordBatchSizeBytes, maxNumberOfRecordsPerFetch);
-                                       
shardMetricsReporter.setRunLoopTimeNanos(runLoopTimeNanos);
-                                       processingStartTimeNanos = 
adjustmentEndTimeNanos; // for next time through the loop
+                                       startingPosition = 
StartingPosition.continueFromSequenceNumber(lastSequenceNum);

Review comment:
       Why does the `ShardConsumer` need to keep track of and update the 
`startingPosition`?
   It feels like this should be internal state of the record publisher. Also, 
as I understand it, the record publisher only ever respects the 
`nextShardItr` that it maintains.

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -53,283 +49,111 @@
  */
 @Internal
 public class ShardConsumer<T> implements Runnable {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(ShardConsumer.class);
-
-       // AWS Kinesis has a read limit of 2 Mb/sec
-       // 
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
-       private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2 * 
1024L * 1024L;
-
        private final KinesisDeserializationSchema<T> deserializer;
 
-       private final KinesisProxyInterface kinesis;
-
        private final int subscribedShardStateIndex;
 
        private final KinesisDataFetcher<T> fetcherRef;
 
        private final StreamShardHandle subscribedShard;
 
-       private int maxNumberOfRecordsPerFetch;
-       private final long fetchIntervalMillis;
-       private final boolean useAdaptiveReads;
+       private final ShardConsumerMetricsReporter shardConsumerMetricsReporter;
 
-       private final ShardMetricsReporter shardMetricsReporter;
+       private StartingPosition startingPosition;
 
        private SequenceNumber lastSequenceNum;
 
-       private Date initTimestamp;
+       private final RecordPublisher recordPublisher;
 
        /**
         * Creates a shard consumer.
         *
         * @param fetcherRef reference to the owning fetcher
+        * @param recordPublisher the record publisher used to read records 
from kinesis
         * @param subscribedShardStateIndex the state index of the shard this 
consumer is subscribed to
         * @param subscribedShard the shard this consumer is subscribed to
         * @param lastSequenceNum the sequence number in the shard to start 
consuming
-        * @param kinesis the proxy instance to interact with Kinesis
-        * @param shardMetricsReporter the reporter to report metrics to
+        * @param shardConsumerMetricsReporter the reporter to report metrics to
+        * @param shardDeserializer used to deserialize incoming records
         */
        public ShardConsumer(KinesisDataFetcher<T> fetcherRef,
+                                               RecordPublisher recordPublisher,
                                                Integer 
subscribedShardStateIndex,
                                                StreamShardHandle 
subscribedShard,
                                                SequenceNumber lastSequenceNum,
-                                               KinesisProxyInterface kinesis,
-                                               ShardMetricsReporter 
shardMetricsReporter,
+                                               ShardConsumerMetricsReporter 
shardConsumerMetricsReporter,
                                                KinesisDeserializationSchema<T> 
shardDeserializer) {
                this.fetcherRef = checkNotNull(fetcherRef);
+               this.recordPublisher = checkNotNull(recordPublisher);
                this.subscribedShardStateIndex = 
checkNotNull(subscribedShardStateIndex);
                this.subscribedShard = checkNotNull(subscribedShard);
+               this.shardConsumerMetricsReporter = 
checkNotNull(shardConsumerMetricsReporter);
                this.lastSequenceNum = checkNotNull(lastSequenceNum);
 
-               this.shardMetricsReporter = checkNotNull(shardMetricsReporter);
-
                checkArgument(
                        
!lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()),
                        "Should not start a ShardConsumer if the shard has 
already been completely read.");
 
                this.deserializer = shardDeserializer;
 
                Properties consumerConfig = 
fetcherRef.getConsumerConfiguration();
-               this.kinesis = kinesis;
-               this.maxNumberOfRecordsPerFetch = 
Integer.valueOf(consumerConfig.getProperty(
-                       ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
-                       
Integer.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX)));
-               this.fetchIntervalMillis = 
Long.valueOf(consumerConfig.getProperty(
-                       
ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
-                       
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
-               this.useAdaptiveReads = 
Boolean.valueOf(consumerConfig.getProperty(
-                       ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS,
-                       
Boolean.toString(ConsumerConfigConstants.DEFAULT_SHARD_USE_ADAPTIVE_READS)));
-
-               if 
(lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get()))
 {
+
+               if 
(lastSequenceNum.equals(SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
+                       Date initTimestamp;
                        String timestamp = 
consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP);
 
                        try {
                                String format = 
consumerConfig.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT,
                                        
ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);
                                SimpleDateFormat customDateFormat = new 
SimpleDateFormat(format);
-                               this.initTimestamp = 
customDateFormat.parse(timestamp);
+                               initTimestamp = 
customDateFormat.parse(timestamp);
                        } catch (IllegalArgumentException | 
NullPointerException exception) {
                                throw new IllegalArgumentException(exception);
                        } catch (ParseException exception) {
-                               this.initTimestamp = new Date((long) 
(Double.parseDouble(timestamp) * 1000));
+                               initTimestamp = new Date((long) 
(Double.parseDouble(timestamp) * 1000));
                        }
-               } else {
-                       this.initTimestamp = null;
-               }
-       }
-
-       /**
-        * Returns a shard iterator for the given {@link SequenceNumber}.
-        *
-        * @return shard iterator
-        * @throws Exception
-        */
-       protected String getShardIterator(SequenceNumber sequenceNumber) throws 
Exception {
 
-               if (isSentinelSequenceNumber(sequenceNumber)) {
-                       return getShardIteratorForSentinel(sequenceNumber);
+                       startingPosition = 
StartingPosition.fromTimestamp(initTimestamp);
                } else {
-                       // we will be starting from an actual sequence number 
(due to restore from failure).
-                       return 
getShardIteratorForRealSequenceNumber(sequenceNumber);
+                       startingPosition = 
StartingPosition.restartFromSequenceNumber(checkNotNull(lastSequenceNum));
                }
        }
 
-       protected String getShardIteratorForSentinel(SequenceNumber 
sentinelSequenceNumber) throws InterruptedException {

Review comment:
       👍 really nice to see this go away :)

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/AdaptivePollingRecordPublisher.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import 
org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+
+import java.util.function.Consumer;
+
+/**
+ * An adaptive record publisher to add a dynamic loop delay and batch read 
size for {@link PollingRecordPublisher}.
+ * Kinesis Streams have quotas on the transactions per second, and throughput. 
This class attempts to balance
+ * quotas and mitigate back off errors.
+ */
+@Internal
+public class AdaptivePollingRecordPublisher extends PollingRecordPublisher {

Review comment:
       👍 really like the approach you took in refactoring this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to