[ https://issues.apache.org/jira/browse/FLINK-4514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15446048#comment-15446048 ]
ASF GitHub Bot commented on FLINK-4514:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2432#discussion_r76617364
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -219,19 +228,52 @@ private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
 				subscribedShard.getStreamName(),
 				subscribedShard.getShard().getShardId());
-		if (record.isAggregated()) {
-			fetcherRef.emitRecordAndUpdateState(
-				value,
-				approxArrivalTimestamp,
-				subscribedShardStateIndex,
-				new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber()));
-		} else {
-			fetcherRef.emitRecordAndUpdateState(
-				value,
-				approxArrivalTimestamp,
-				subscribedShardStateIndex,
-				new SequenceNumber(record.getSequenceNumber()));
+		SequenceNumber collectedSequenceNumber = (record.isAggregated())
+			? new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber())
+			: new SequenceNumber(record.getSequenceNumber());
+
+		fetcherRef.emitRecordAndUpdateState(
+			value,
+			approxArrivalTimestamp,
+			subscribedShardStateIndex,
+			collectedSequenceNumber);
+
+		lastSequenceNum = collectedSequenceNumber;
+	}
+
+	/**
+	 * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
+	 * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
+	 * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
+	 * be used for the next call to this method.
+	 *
+	 * Note: it is important that this method is not called again before all the records from the last result have been
+	 * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
+	 * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
+	 * incorrect shard iteration if the iterator had to be refreshed.
+	 *
+	 * @param shardItr shard iterator to use
+	 * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
+	 * @return get records result
+	 * @throws InterruptedException
+	 */
+	private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
+		GetRecordsResult getRecordsResult = null;
+		while (getRecordsResult == null) {
+			try {
+				getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
+			} catch (ExpiredIteratorException eiEx) {
+				LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
+					" refreshing the iterator ...", shardItr, subscribedShard);
+				shardItr = kinesis.getShardIterator(subscribedShard,
+					ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
+
+				// sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
+				if (fetchIntervalMillis != 0) {
+					Thread.sleep(fetchIntervalMillis);
+				}
--- End diff --
This fetchInterval implementation can lead to much larger fetch intervals
than configured. If the getRecords call takes `n` milliseconds, the time
between the starts of consecutive `getRecords` calls is `n + fetchInterval`.
We don't need to fix this in this PR, but I think we should fix it in
general (if you agree). We also need to figure out how to make this efficient
(`System.currentTimeMillis()` is a somewhat expensive call).
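A minimal sketch of the kind of fix suggested above, assuming the goal is for consecutive fetches to *start* roughly `fetchIntervalMillis` apart: measure how long the fetch took and sleep only for the remainder of the interval. The class and method names (`FixedRateFetchSketch`, `remainingSleepMillis`, `runFetchLoop`) are hypothetical, and the `Runnable` stands in for the real `getRecords` call plus record collection. The sketch uses `System.nanoTime()` because that clock is monotonic; whether it is cheaper than `System.currentTimeMillis()` depends on the platform and is not claimed here.

```java
import java.util.concurrent.TimeUnit;

public class FixedRateFetchSketch {

    /**
     * Hypothetical helper: how long to sleep after a fetch that started at fetchStartNanos
     * so that consecutive fetches start about fetchIntervalMillis apart, rather than
     * (fetch duration + fetchIntervalMillis) apart.
     */
    static long remainingSleepMillis(long fetchStartNanos, long nowNanos, long fetchIntervalMillis) {
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(nowNanos - fetchStartNanos);
        // If the fetch alone took at least the whole interval, fetch again immediately.
        return Math.max(0L, fetchIntervalMillis - elapsedMillis);
    }

    /** Runs `iterations` fetches whose start times are paced by fetchIntervalMillis. */
    static void runFetchLoop(Runnable fetch, int iterations, long fetchIntervalMillis)
            throws InterruptedException {
        for (int i = 0; i < iterations; i++) {
            long fetchStart = System.nanoTime();
            fetch.run(); // stands in for getRecords(...) plus collecting the returned records
            long sleepMillis = remainingSleepMillis(fetchStart, System.nanoTime(), fetchIntervalMillis);
            if (sleepMillis > 0) {
                Thread.sleep(sleepMillis);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // A 30 ms fetch with a 100 ms interval leaves 70 ms to sleep.
        System.out.println(remainingSleepMillis(0L, TimeUnit.MILLISECONDS.toNanos(30), 100)); // 70
        // A fetch slower than the interval is followed immediately by the next one.
        System.out.println(remainingSleepMillis(0L, TimeUnit.MILLISECONDS.toNanos(150), 100)); // 0

        // Three simulated 30 ms fetches, paced so that they start about 100 ms apart.
        runFetchLoop(() -> {
            try {
                Thread.sleep(30);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 3, 100);
    }
}
```

With a 30 ms fetch and a 100 ms interval, the loop sleeps about 70 ms, so fetches start about 100 ms apart instead of 130 ms. A fetch that already exceeds the interval is not followed by any extra sleep.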
> ExpiredIteratorException in Kinesis Consumer on long catch-ups to head of stream
> --------------------------------------------------------------------------------
>
> Key: FLINK-4514
> URL: https://issues.apache.org/jira/browse/FLINK-4514
> Project: Flink
> Issue Type: Bug
> Components: Kinesis Connector
> Affects Versions: 1.1.0, 1.1.1
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0, 1.1.2
>
>
> Original mailing thread for the reported issue:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-connector-Iterator-expired-exception-td8711.html
> Normally, the exception is thrown when the consumer uses a shard iterator
> more than 5 minutes after it was retrieved. I have yet to clarify and
> reproduce the root cause of the {{ExpiredIteratorException}}, because from
> the code this seems to be impossible. I'm leaning towards suspecting a
> Kinesis-side issue (judging from the description on the mailing list, the
> behaviour also seems nondeterministic).
> Either way, the exception can be handled fairly easily so that the consumer
> doesn't just fail: when it is caught, we request a new shard iterator from
> Kinesis that starts after the last sequence number.
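The handling described in the issue can be sketched in a self-contained way as follows. This is not the connector's code: `ShardReader`, `getIteratorAfterSequenceNumber`, `ExpiringOnceReader`, and the local `ExpiredIteratorException` class are hypothetical stand-ins for the Kinesis proxy and the AWS SDK exception, so the example runs without AWS dependencies. Like the `getRecords` method in the diff, it retries until a call succeeds, requests a fresh iterator positioned after the last processed sequence number, and sleeps for the fetch interval before retrying. It does not model aggregated records; it assumes `lastSequenceNum` refers to a fully collected record, which is the precondition the Javadoc in the diff asks for.

```java
import java.util.ArrayList;
import java.util.List;

public class ExpiredIteratorRetrySketch {

    /** Local stand-in for the AWS SDK's ExpiredIteratorException (assumption: not the real class). */
    static class ExpiredIteratorException extends RuntimeException {
        ExpiredIteratorException(String message) {
            super(message);
        }
    }

    /** Hypothetical minimal view of the two Kinesis calls the consumer needs. */
    interface ShardReader {
        /** Returns records for the iterator, or throws ExpiredIteratorException if it has expired. */
        List<String> getRecords(String shardIterator, int maxRecords);

        /** Returns a fresh iterator positioned right after the given sequence number. */
        String getIteratorAfterSequenceNumber(String sequenceNumber);
    }

    /**
     * Retries getRecords until it succeeds. On an expired iterator, requests a new one that
     * starts after the last fully processed sequence number, then waits fetchIntervalMillis.
     */
    static List<String> getRecordsWithRefresh(ShardReader reader, String shardItr,
            String lastSequenceNum, int maxRecords, long fetchIntervalMillis)
            throws InterruptedException {
        while (true) {
            try {
                return reader.getRecords(shardItr, maxRecords);
            } catch (ExpiredIteratorException e) {
                shardItr = reader.getIteratorAfterSequenceNumber(lastSequenceNum);
                if (fetchIntervalMillis != 0) {
                    Thread.sleep(fetchIntervalMillis);
                }
            }
        }
    }

    /** Fake reader whose "stale-itr" iterator has expired; logs every call it receives. */
    static class ExpiringOnceReader implements ShardReader {
        final List<String> calls = new ArrayList<>();

        @Override
        public List<String> getRecords(String shardIterator, int maxRecords) {
            calls.add("getRecords(" + shardIterator + ")");
            if (shardIterator.equals("stale-itr")) {
                throw new ExpiredIteratorException("Iterator expired: " + shardIterator);
            }
            return List.of("record-43");
        }

        @Override
        public String getIteratorAfterSequenceNumber(String sequenceNumber) {
            calls.add("getIteratorAfter(" + sequenceNumber + ")");
            return "itr-after-" + sequenceNumber;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExpiringOnceReader reader = new ExpiringOnceReader();
        List<String> records = getRecordsWithRefresh(reader, "stale-itr", "42", 100, 0);
        System.out.println(records);      // [record-43]
        System.out.println(reader.calls); // [getRecords(stale-itr), getIteratorAfter(42), getRecords(itr-after-42)]
    }
}
```

Running `main` prints `[record-43]` followed by the call log: the call with the expired iterator, the refresh after sequence number `42`, and the successful retry. Like the loop in the diff, the sketch places no upper bound on retries.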
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)