dannycranmer commented on a change in pull request #12881:
URL: https://github.com/apache/flink/pull/12881#discussion_r454853969
##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -379,49 +205,18 @@ private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
 	}
 
 	/**
-	 * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
-	 * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
-	 * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
-	 * be used for the next call to this method.
-	 *
-	 * <p>Note: it is important that this method is not called again before all the records from the last result have been
-	 * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
-	 * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
-	 * incorrect shard iteration if the iterator had to be refreshed.
+	 * Filters out aggregated records that have previously been processed.
+	 * This method is to support restarting from a partially consumed aggregated sequence number.
 	 *
-	 * @param shardItr shard iterator to use
-	 * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
-	 * @return get records result
-	 * @throws InterruptedException
+	 * @param record the record to filter
+	 * @return {@code true} if the record should be retained
 	 */
-	private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws Exception {
-		GetRecordsResult getRecordsResult = null;
-		while (getRecordsResult == null) {
-			try {
-				getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
-
-				// Update millis behind latest so it gets reported by the millisBehindLatest gauge
-				Long millisBehindLatest = getRecordsResult.getMillisBehindLatest();
-				if (millisBehindLatest != null) {
-					shardMetricsReporter.setMillisBehindLatest(millisBehindLatest);
-				}
-			} catch (ExpiredIteratorException eiEx) {
-				LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
-					" refreshing the iterator ...", shardItr, subscribedShard);
-
-				shardItr = getShardIterator(lastSequenceNum);
-
-				// sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
-				if (fetchIntervalMillis != 0) {
-					Thread.sleep(fetchIntervalMillis);
-				}
-			}
+	private boolean filterDeaggregatedRecord(final UserRecord record) {
+		if (lastSequenceNum.isAggregated()) {
Review comment:
It is replicating this behaviour:
https://github.com/apache/flink/blob/release-1.10.1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L198

Say you have an aggregated message X with 6 child records and you stop consuming at sub-sequence 3; the last consumed sequence number would be:
- sequence: X
- sub-sequence: 3

Therefore, when you restart at X, you need to discard sub-sequences 1, 2 and 3 and process 4, 5 and 6.

This method discards previously processed sub-sequence numbers continuously, rather than upfront. That allows the logic to be pulled out of the `RecordPublisher` and into the `ShardConsumer`, supporting both EFO and polling without configuring multiple clients. As explained in the PR description, the code is simpler with this check here, at the expense of a slight overhead.
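
To make the restart behaviour concrete, here is a minimal, self-contained sketch of the filtering predicate described above. It is an illustration only, not the PR's exact code: `SeqNum`, `DeaggregationFilterSketch` and the 1-based sub-sequence numbering are simplified stand-ins for Flink's `SequenceNumber` state and the KCL `UserRecord`.

```java
// Hypothetical stand-in for Flink's SequenceNumber / the KCL UserRecord,
// reduced to the three fields the filter needs.
final class SeqNum {
    final String sequenceNumber;  // sequence number of the (aggregated) Kinesis record
    final long subSequenceNumber; // position of the child record within the aggregate
    final boolean aggregated;     // true if this refers to a sub-record of an aggregate

    SeqNum(String sequenceNumber, long subSequenceNumber, boolean aggregated) {
        this.sequenceNumber = sequenceNumber;
        this.subSequenceNumber = subSequenceNumber;
        this.aggregated = aggregated;
    }
}

public class DeaggregationFilterSketch {

    // State restored on restart: we stopped at sub-sequence 3 of aggregate "X".
    static final SeqNum lastSequenceNum = new SeqNum("X", 3, true);

    // Returns true if the record should be retained. A child record is dropped
    // only when it belongs to the same aggregate we restarted from and its
    // sub-sequence number has already been consumed.
    static boolean filterDeaggregatedRecord(SeqNum record) {
        if (!lastSequenceNum.aggregated) {
            return true; // not restarting mid-aggregate; keep everything
        }
        return !record.sequenceNumber.equals(lastSequenceNum.sequenceNumber)
            || record.subSequenceNumber > lastSequenceNum.subSequenceNumber;
    }

    public static void main(String[] args) {
        // Aggregate "X" with 6 child records: 1, 2, 3 are discarded; 4, 5, 6 retained.
        for (long sub = 1; sub <= 6; sub++) {
            boolean retained = filterDeaggregatedRecord(new SeqNum("X", sub, true));
            System.out.println("sub-sequence " + sub + " retained: " + retained);
        }
    }
}
```

Because the predicate is evaluated per record, the discard happens continuously as records flow through the `ShardConsumer`, which is the trade-off mentioned above: one extra comparison per record instead of a one-off skip in each `RecordPublisher`.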
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]