dannycranmer commented on a change in pull request #12881:
URL: https://github.com/apache/flink/pull/12881#discussion_r454853969



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -379,49 +205,18 @@ private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
        }
 
        /**
-        * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
-        * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
-        * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
-        * be used for the next call to this method.
-        *
-        * <p>Note: it is important that this method is not called again before all the records from the last result have been
-        * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
-        * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
-        * incorrect shard iteration if the iterator had to be refreshed.
+        * Filters out aggregated records that have previously been processed.
+        * This method is to support restarting from a partially consumed aggregated sequence number.
         *
-        * @param shardItr shard iterator to use
-        * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
-        * @return get records result
-        * @throws InterruptedException
+        * @param record the record to filter
+        * @return {@code true} if the record should be retained
         */
-       private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws Exception {
-               GetRecordsResult getRecordsResult = null;
-               while (getRecordsResult == null) {
-                       try {
-                               getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
-
-                               // Update millis behind latest so it gets reported by the millisBehindLatest gauge
-                               Long millisBehindLatest = getRecordsResult.getMillisBehindLatest();
-                               if (millisBehindLatest != null) {
-                                       shardMetricsReporter.setMillisBehindLatest(millisBehindLatest);
-                               }
-                       } catch (ExpiredIteratorException eiEx) {
-                               LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
-                                       " refreshing the iterator ...", shardItr, subscribedShard);
-
-                               shardItr = getShardIterator(lastSequenceNum);
-
-                               // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
-                               if (fetchIntervalMillis != 0) {
-                                       Thread.sleep(fetchIntervalMillis);
-                               }
-                       }
+       private boolean filterDeaggregatedRecord(final UserRecord record) {
+               if (lastSequenceNum.isAggregated()) {

Review comment:
   This replicates the existing behaviour from release-1.10.1: https://github.com/apache/flink/blob/release-1.10.1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L198
   
   Say you have an aggregated record X with 6 child records and you stop consuming at sub-sequence 3. The last consumed sequence number would be:
   - sequence: X
   - sub-sequence: 3
   
   Therefore, when you restart at X, you need to discard sub-sequences 1, 2 and 3 and process 4, 5 and 6.
   
   This method discards previously processed sub-sequence numbers on every record, rather than once upfront. This lets the logic move from the `RecordPublisher` into the `ShardConsumer`, which supports both EFO and polling without configuring multiple clients. As explained in the PR description, putting the check here makes the code simpler at the expense of a slight overhead.

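To make the restart example concrete, here is a minimal, self-contained sketch of the per-record check. It is not the code from this PR: `Position` and `ChildRecord` are hypothetical stand-ins for the connector's sequence number and `UserRecord` types, and sub-sequences are numbered from 1 only to match the example above.

```java
import java.util.ArrayList;
import java.util.List;

public class DeaggregationFilterSketch {

    /** Hypothetical stand-in for the last consumed position (not a Flink class). */
    static final class Position {
        final String sequenceNumber;
        final long subSequenceNumber; // a negative value means "not aggregated" in this sketch

        Position(String sequenceNumber, long subSequenceNumber) {
            this.sequenceNumber = sequenceNumber;
            this.subSequenceNumber = subSequenceNumber;
        }

        boolean isAggregated() {
            return subSequenceNumber >= 0;
        }
    }

    /** Hypothetical stand-in for one de-aggregated child record (not a Flink class). */
    static final class ChildRecord {
        final String sequenceNumber;
        final long subSequenceNumber;

        ChildRecord(String sequenceNumber, long subSequenceNumber) {
            this.sequenceNumber = sequenceNumber;
            this.subSequenceNumber = subSequenceNumber;
        }
    }

    /** Returns true if the record has not been processed yet and should be retained. */
    static boolean shouldRetain(Position last, ChildRecord record) {
        if (!last.isAggregated()) {
            // The last position was not inside an aggregated record, so nothing to skip.
            return true;
        }
        // Keep records from other aggregated records, or later children of the same one.
        return !record.sequenceNumber.equals(last.sequenceNumber)
            || record.subSequenceNumber > last.subSequenceNumber;
    }

    /** Applies the check to children 1..childCount of one aggregated record. */
    static List<Long> retainedSubSequences(Position last, String sequenceNumber, int childCount) {
        List<Long> retained = new ArrayList<>();
        for (long sub = 1; sub <= childCount; sub++) {
            if (shouldRetain(last, new ChildRecord(sequenceNumber, sub))) {
                retained.add(sub);
            }
        }
        return retained;
    }

    public static void main(String[] args) {
        // Stopped at sub-sequence 3 of aggregated record X, then restarted at X.
        Position last = new Position("X", 3);
        System.out.println(retainedSubSequences(last, "X", 6)); // prints [4, 5, 6]
    }
}
```

Because the check runs for each record rather than once at restart, it costs one extra comparison per record, which is the slight overhead mentioned above.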



