Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2432#discussion_r76617364
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -219,19 +228,52 @@ private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
 				subscribedShard.getStreamName(),
 				subscribedShard.getShard().getShardId());
-		if (record.isAggregated()) {
-			fetcherRef.emitRecordAndUpdateState(
-				value,
-				approxArrivalTimestamp,
-				subscribedShardStateIndex,
-				new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber()));
-		} else {
-			fetcherRef.emitRecordAndUpdateState(
-				value,
-				approxArrivalTimestamp,
-				subscribedShardStateIndex,
-				new SequenceNumber(record.getSequenceNumber()));
+		SequenceNumber collectedSequenceNumber = (record.isAggregated())
+			? new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber())
+			: new SequenceNumber(record.getSequenceNumber());
+
+		fetcherRef.emitRecordAndUpdateState(
+			value,
+			approxArrivalTimestamp,
+			subscribedShardStateIndex,
+			collectedSequenceNumber);
+
+		lastSequenceNum = collectedSequenceNumber;
+	}
+
+	/**
+	 * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
+	 * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
+	 * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
+	 * be used for the next call to this method.
+	 *
+	 * Note: it is important that this method is not called again before all the records from the last result have been
+	 * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
+	 * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
+	 * incorrect shard iteration if the iterator had to be refreshed.
+	 *
+	 * @param shardItr shard iterator to use
+	 * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
+	 * @return get records result
+	 * @throws InterruptedException
+	 */
+	private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
+		GetRecordsResult getRecordsResult = null;
+		while (getRecordsResult == null) {
+			try {
+				getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
+			} catch (ExpiredIteratorException eiEx) {
+				LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
+					" refreshing the iterator ...", shardItr, subscribedShard);
+				shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
+
+				// sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
+				if (fetchIntervalMillis != 0) {
+					Thread.sleep(fetchIntervalMillis);
+				}
--- End diff --
This fetchInterval implementation can lead to much larger effective fetch intervals: if the getRecords call itself takes `n` milliseconds, the waiting time between two `getRecords` calls is `n + fetchInterval`.
We don't need to fix this in this PR, but I think we should fix it in general (if you agree). We also need to see how to make such a fix efficient (`System.currentTimeMillis()` is a somewhat expensive call).
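A minimal sketch of what such a fix could look like (not part of this PR, just an illustration): measure how long the `getRecords` call itself took and sleep only for the remainder of the interval. The names `kinesis`, `shardItr`, `maxNumberOfRecords` and `fetchIntervalMillis` are the ones from the diff above; the surrounding run loop is assumed.

```java
// Sketch only: the effective interval between getRecords calls becomes
// max(n, fetchIntervalMillis) instead of n + fetchIntervalMillis.
long fetchStart = System.currentTimeMillis();
GetRecordsResult getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
long elapsedMillis = System.currentTimeMillis() - fetchStart;

long remainingSleep = fetchIntervalMillis - elapsedMillis;
if (remainingSleep > 0) {
	Thread.sleep(remainingSleep);
}
```

On the clock cost: this reads the clock twice per fetch. `System.nanoTime()` is the more appropriate API for measuring elapsed time anyway, and the end-of-fetch reading plus the sleep duration could be reused as the start timestamp of the next iteration, bringing it down to one clock call per loop.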