leekeiabstraction commented on code in PR #195:
URL:
https://github.com/apache/flink-connector-aws/pull/195#discussion_r2031062113
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java:
##########
@@ -48,21 +64,65 @@ public PollingKinesisShardSplitReader(
         this.kinesis = kinesisProxy;
         this.configuration = configuration;
         this.maxRecordsToGet =
                 configuration.get(KinesisSourceConfigOptions.SHARD_GET_RECORDS_MAX);
+        this.getRecordsIntervalMillis = configuration.get(SHARD_GET_RECORDS_INTERVAL).toMillis();
+        this.idleSourceGetRecordsIntervalMillis =
+                configuration.get(SHARD_GET_RECORDS_IDLE_SOURCE_INTERVAL).toMillis();
     }
     @Override
-    protected RecordBatch fetchRecords(KinesisShardSplitState splitState) {
+    protected RecordBatch fetchRecords(KinesisShardSplitState splitState) throws IOException {
+        if (skipUntilScheduledGetRecordTime(splitState)) {
+            return null;
+        }
+
         GetRecordsResponse getRecordsResponse =
                 kinesis.getRecords(
                         splitState.getStreamArn(),
                         splitState.getShardId(),
                         splitState.getNextStartingPosition(),
                         this.maxRecordsToGet);
+
+        scheduleNextGetRecord(splitState, getRecordsResponse);
+
         boolean isCompleted = getRecordsResponse.nextShardIterator() == null;
         return new RecordBatch(
                 getRecordsResponse.records(), getRecordsResponse.millisBehindLatest(), isCompleted);
     }
+    private boolean skipUntilScheduledGetRecordTime(KinesisShardSplitState splitState)
+            throws IOException {
+        if (scheduledGetRecordTimes.containsKey(splitState)
+                && scheduledGetRecordTimes.get(splitState) > System.currentTimeMillis()) {
+            try {
+                Thread.sleep(1);
Review Comment:
To elaborate on Abhi's point: this pause prevents KinesisShardSplitReaderBase
from immediately looping through fetch() for each split when the source is
idle, which would cause very high CPU usage. Adding a 1ms pause lowers CPU
usage at the cost of at most 1ms of wait time per split. There is an edge
case: a source with a large number of idle assigned splits will not loop
through all of them in a timely manner, which adds latency to GetRecords for
the minority of non-idle shards. However, it would take upwards of 1000
assigned splits to incur a 1 second delay. Assigning upwards of 1000 active
splits to a single source subtask is an unlikely and impractical scenario
(the app should really have much higher parallelism).
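
To make the trade-off concrete, here is a rough, self-contained sketch of the
pause-and-skip idea and the worst-case arithmetic. It is not the PR's code:
`IdleSplitPauseSketch`, its String split IDs, the injected clock, and
`worstCasePassDelayMillis` are all hypothetical names used only for
illustration, and the delay is simply assumed to be (idle splits) x (pause per
split).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical, simplified stand-in for the skip logic discussed in this comment. */
class IdleSplitPauseSketch {
    // Earliest time (epoch millis) at which each split may call GetRecords again.
    private final Map<String, Long> scheduledGetRecordTimes = new HashMap<>();
    private final LongSupplier clock; // injected so the behaviour can be tested
    private final long pauseMillis;   // the PR uses a 1 ms pause

    IdleSplitPauseSketch(LongSupplier clock, long pauseMillis) {
        this.clock = clock;
        this.pauseMillis = pauseMillis;
    }

    void scheduleNext(String splitId, long notBeforeMillis) {
        scheduledGetRecordTimes.put(splitId, notBeforeMillis);
    }

    /** Returns true (after a short pause) if the split is not yet due for a fetch. */
    boolean skipUntilScheduled(String splitId) {
        Long scheduled = scheduledGetRecordTimes.get(splitId);
        if (scheduled != null && scheduled > clock.getAsLong()) {
            try {
                // Yield briefly so an all-idle source does not spin the CPU.
                Thread.sleep(pauseMillis);
            } catch (InterruptedException e) {
                // Simplification: restore the interrupt flag and still skip.
                Thread.currentThread().interrupt();
            }
            return true;
        }
        return false;
    }

    /** Upper bound on the time one pass over the splits spends pausing. */
    static long worstCasePassDelayMillis(int idleSplits, long pauseMillis) {
        return idleSplits * pauseMillis;
    }

    public static void main(String[] args) {
        System.out.println(
                "1000 idle splits, 1 ms pause: up to "
                        + worstCasePassDelayMillis(1000, 1)
                        + " ms per pass");
    }
}
```

Under these assumptions, a non-idle shard waits at most one pause per idle
split ahead of it in the loop, so the added latency grows linearly with the
number of idle splits assigned to the subtask.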