dannycranmer commented on a change in pull request #12881:
URL: https://github.com/apache/flink/pull/12881#discussion_r469125201



##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -53,283 +49,111 @@
  */
 @Internal
 public class ShardConsumer<T> implements Runnable {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(ShardConsumer.class);
-
-       // AWS Kinesis has a read limit of 2 Mb/sec
-       // 
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
-       private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2 * 
1024L * 1024L;
-
        private final KinesisDeserializationSchema<T> deserializer;
 
-       private final KinesisProxyInterface kinesis;
-
        private final int subscribedShardStateIndex;
 
        private final KinesisDataFetcher<T> fetcherRef;
 
        private final StreamShardHandle subscribedShard;
 
-       private int maxNumberOfRecordsPerFetch;
-       private final long fetchIntervalMillis;
-       private final boolean useAdaptiveReads;
+       private final ShardConsumerMetricsReporter shardConsumerMetricsReporter;
 
-       private final ShardMetricsReporter shardMetricsReporter;
+       private StartingPosition startingPosition;
 
        private SequenceNumber lastSequenceNum;
 
-       private Date initTimestamp;
+       private final RecordPublisher recordPublisher;
 
        /**
         * Creates a shard consumer.
         *
         * @param fetcherRef reference to the owning fetcher
+        * @param recordPublisher the record publisher used to read records 
from kinesis
         * @param subscribedShardStateIndex the state index of the shard this 
consumer is subscribed to
         * @param subscribedShard the shard this consumer is subscribed to
         * @param lastSequenceNum the sequence number in the shard to start 
consuming
-        * @param kinesis the proxy instance to interact with Kinesis
-        * @param shardMetricsReporter the reporter to report metrics to
+        * @param shardConsumerMetricsReporter the reporter to report metrics to
+        * @param shardDeserializer used to deserialize incoming records
         */
        public ShardConsumer(KinesisDataFetcher<T> fetcherRef,
+                                               RecordPublisher recordPublisher,
                                                Integer 
subscribedShardStateIndex,
                                                StreamShardHandle 
subscribedShard,
                                                SequenceNumber lastSequenceNum,
-                                               KinesisProxyInterface kinesis,
-                                               ShardMetricsReporter 
shardMetricsReporter,
+                                               ShardConsumerMetricsReporter 
shardConsumerMetricsReporter,
                                                KinesisDeserializationSchema<T> 
shardDeserializer) {
                this.fetcherRef = checkNotNull(fetcherRef);
+               this.recordPublisher = checkNotNull(recordPublisher);
                this.subscribedShardStateIndex = 
checkNotNull(subscribedShardStateIndex);
                this.subscribedShard = checkNotNull(subscribedShard);
+               this.shardConsumerMetricsReporter = 
checkNotNull(shardConsumerMetricsReporter);
                this.lastSequenceNum = checkNotNull(lastSequenceNum);
 
-               this.shardMetricsReporter = checkNotNull(shardMetricsReporter);
-
                checkArgument(
                        
!lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()),
                        "Should not start a ShardConsumer if the shard has 
already been completely read.");
 
                this.deserializer = shardDeserializer;
 
                Properties consumerConfig = 
fetcherRef.getConsumerConfiguration();
-               this.kinesis = kinesis;
-               this.maxNumberOfRecordsPerFetch = 
Integer.valueOf(consumerConfig.getProperty(
-                       ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
-                       
Integer.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX)));
-               this.fetchIntervalMillis = 
Long.valueOf(consumerConfig.getProperty(
-                       
ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
-                       
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
-               this.useAdaptiveReads = 
Boolean.valueOf(consumerConfig.getProperty(
-                       ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS,
-                       
Boolean.toString(ConsumerConfigConstants.DEFAULT_SHARD_USE_ADAPTIVE_READS)));
-
-               if 
(lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get()))
 {
+
+               if 
(lastSequenceNum.equals(SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
+                       Date initTimestamp;
                        String timestamp = 
consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP);
 
                        try {
                                String format = 
consumerConfig.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT,
                                        
ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);
                                SimpleDateFormat customDateFormat = new 
SimpleDateFormat(format);
-                               this.initTimestamp = 
customDateFormat.parse(timestamp);
+                               initTimestamp = 
customDateFormat.parse(timestamp);
                        } catch (IllegalArgumentException | 
NullPointerException exception) {
                                throw new IllegalArgumentException(exception);
                        } catch (ParseException exception) {
-                               this.initTimestamp = new Date((long) 
(Double.parseDouble(timestamp) * 1000));
+                               initTimestamp = new Date((long) 
(Double.parseDouble(timestamp) * 1000));
                        }
-               } else {
-                       this.initTimestamp = null;
-               }
-       }
-
-       /**
-        * Returns a shard iterator for the given {@link SequenceNumber}.
-        *
-        * @return shard iterator
-        * @throws Exception
-        */
-       protected String getShardIterator(SequenceNumber sequenceNumber) throws 
Exception {
 
-               if (isSentinelSequenceNumber(sequenceNumber)) {
-                       return getShardIteratorForSentinel(sequenceNumber);
+                       startingPosition = 
StartingPosition.fromTimestamp(initTimestamp);
                } else {
-                       // we will be starting from an actual sequence number 
(due to restore from failure).
-                       return 
getShardIteratorForRealSequenceNumber(sequenceNumber);
+                       startingPosition = 
StartingPosition.restartFromSequenceNumber(checkNotNull(lastSequenceNum));
                }
        }
 
-       protected String getShardIteratorForSentinel(SequenceNumber 
sentinelSequenceNumber) throws InterruptedException {
-               String nextShardItr;
-
-               if 
(sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get()))
 {
-                       // if the shard is already closed, there will be no 
latest next record to get for this shard
-                       if (subscribedShard.isClosed()) {
-                               nextShardItr = null;
-                       } else {
-                               nextShardItr = 
kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), 
null);
-                       }
-               } else if 
(sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()))
 {
-                       nextShardItr = 
kinesis.getShardIterator(subscribedShard, 
ShardIteratorType.TRIM_HORIZON.toString(), null);
-               } else if 
(sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()))
 {
-                       nextShardItr = null;
-               } else if 
(sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get()))
 {
-                       nextShardItr = 
kinesis.getShardIterator(subscribedShard, 
ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp);
-               } else {
-                       throw new RuntimeException("Unknown sentinel type: " + 
sentinelSequenceNumber);
-               }
-
-               return nextShardItr;
-       }
-
-       protected String getShardIteratorForRealSequenceNumber(SequenceNumber 
sequenceNumber)
-                       throws Exception {
-
-               // if the last sequence number refers to an aggregated record, 
we need to clean up any dangling sub-records
-               // from the last aggregated record; otherwise, we can simply 
start iterating from the record right after.
-
-               if (sequenceNumber.isAggregated()) {
-                       return 
getShardIteratorForAggregatedSequenceNumber(sequenceNumber);
-               } else {
-                       // the last record was non-aggregated, so we can simply 
start from the next record
-                       return kinesis.getShardIterator(
-                                       subscribedShard,
-                                       
ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(),
-                                       sequenceNumber.getSequenceNumber());
-               }
-       }
-
-       protected String 
getShardIteratorForAggregatedSequenceNumber(SequenceNumber sequenceNumber)
-                       throws Exception {
-
-               String itrForLastAggregatedRecord =
-                               kinesis.getShardIterator(
-                                               subscribedShard,
-                                               
ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
-                                               
sequenceNumber.getSequenceNumber());
-
-               // get only the last aggregated record
-               GetRecordsResult getRecordsResult = 
getRecords(itrForLastAggregatedRecord, 1);
-
-               List<UserRecord> fetchedRecords = deaggregateRecords(
-                               getRecordsResult.getRecords(),
-                               
subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
-                               
subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
-
-               long lastSubSequenceNum = sequenceNumber.getSubSequenceNumber();
-               for (UserRecord record : fetchedRecords) {
-                       // we have found a dangling sub-record if it has a 
larger subsequence number
-                       // than our last sequence number; if so, collect the 
record and update state
-                       if (record.getSubSequenceNumber() > lastSubSequenceNum) 
{
-                               
deserializeRecordForCollectionAndUpdateState(record);
-                       }
-               }
-
-               return getRecordsResult.getNextShardIterator();
-       }
-
-       @SuppressWarnings("unchecked")
        @Override
        public void run() {
                try {
-                       String nextShardItr = getShardIterator(lastSequenceNum);
+                       while (isRunning()) {
+                               final RecordPublisherRunResult result = 
recordPublisher.run(startingPosition, batch -> {
+                                       for (UserRecord userRecord : 
batch.getDeaggregatedRecords()) {
+                                               if 
(filterDeaggregatedRecord(userRecord)) {
+                                                       
deserializeRecordForCollectionAndUpdateState(userRecord);
+                                               }
+                                       }
 
-                       long processingStartTimeNanos = System.nanoTime();
+                                       
shardConsumerMetricsReporter.setAverageRecordSizeBytes(batch.getAverageRecordSizeBytes());
+                                       
shardConsumerMetricsReporter.setNumberOfAggregatedRecords(batch.getAggregatedRecordSize());
+                                       
shardConsumerMetricsReporter.setNumberOfDeaggregatedRecords(batch.getDeaggregatedRecordSize());
+                                       
ofNullable(batch.getMillisBehindLatest()).ifPresent(shardConsumerMetricsReporter::setMillisBehindLatest);
+                               });
 
-                       while (isRunning()) {
-                               if (nextShardItr == null) {
+                               if (result == COMPLETE) {
                                        
fetcherRef.updateState(subscribedShardStateIndex, 
SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
 
                                        // we can close this consumer thread 
once we've reached the end of the subscribed shard
                                        break;
                                } else {
-                                       
shardMetricsReporter.setMaxNumberOfRecordsPerFetch(maxNumberOfRecordsPerFetch);
-                                       GetRecordsResult getRecordsResult = 
getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
-
-                                       List<Record> aggregatedRecords = 
getRecordsResult.getRecords();
-                                       int numberOfAggregatedRecords = 
aggregatedRecords.size();
-                                       
shardMetricsReporter.setNumberOfAggregatedRecords(numberOfAggregatedRecords);
-
-                                       // each of the Kinesis records may be 
aggregated, so we must deaggregate them before proceeding
-                                       List<UserRecord> fetchedRecords = 
deaggregateRecords(
-                                               aggregatedRecords,
-                                               
subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
-                                               
subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
-
-                                       long recordBatchSizeBytes = 0L;
-                                       for (UserRecord record : 
fetchedRecords) {
-                                               recordBatchSizeBytes += 
record.getData().remaining();
-                                               
deserializeRecordForCollectionAndUpdateState(record);
-                                       }
-
-                                       int numberOfDeaggregatedRecords = 
fetchedRecords.size();
-                                       
shardMetricsReporter.setNumberOfDeaggregatedRecords(numberOfDeaggregatedRecords);
-
-                                       nextShardItr = 
getRecordsResult.getNextShardIterator();
-
-                                       long adjustmentEndTimeNanos = 
adjustRunLoopFrequency(processingStartTimeNanos, System.nanoTime());
-                                       long runLoopTimeNanos = 
adjustmentEndTimeNanos - processingStartTimeNanos;
-                                       maxNumberOfRecordsPerFetch = 
adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), 
recordBatchSizeBytes, maxNumberOfRecordsPerFetch);
-                                       
shardMetricsReporter.setRunLoopTimeNanos(runLoopTimeNanos);
-                                       processingStartTimeNanos = 
adjustmentEndTimeNanos; // for next time through the loop
+                                       startingPosition = 
StartingPosition.continueFromSequenceNumber(lastSequenceNum);

Review comment:
       It does not anymore.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to