gguptp commented on code in PR #190:
URL:
https://github.com/apache/flink-connector-aws/pull/190#discussion_r1979565026
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java:
##########
@@ -57,48 +60,89 @@ public class PollingDynamoDbStreamsShardSplitReader
new DynamoDbStreamRecordsWithSplitIds(Collections.emptyIterator(), null, false);
private final StreamProxy dynamodbStreams;
+ private final Duration getRecordsIdlePollingTimeBetweenNonEmptyPolls;
+ private final Duration getRecordsIdlePollingTimeBetweenEmptyPolls;
- private final Deque<DynamoDbStreamsShardSplitState> assignedSplits = new ArrayDeque<>();
+ private final Deque<DynamoDbStreamsShardSplitWithContext> assignedSplits;
private final Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap;
- private final Set<String> pausedSplitIds = new HashSet<>();
+ private final Set<String> pausedSplitIds;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(PollingDynamoDbStreamsShardSplitReader.class);
public PollingDynamoDbStreamsShardSplitReader(
StreamProxy dynamodbStreamsProxy,
+ Duration getRecordsIdlePollingTimeBetweenNonEmptyPolls,
+ Duration getRecordsIdlePollingTimeBetweenEmptyPolls,
Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap) {
this.dynamodbStreams = dynamodbStreamsProxy;
+ this.getRecordsIdlePollingTimeBetweenNonEmptyPolls =
+ getRecordsIdlePollingTimeBetweenNonEmptyPolls;
+ this.getRecordsIdlePollingTimeBetweenEmptyPolls =
+ getRecordsIdlePollingTimeBetweenEmptyPolls;
this.shardMetricGroupMap = shardMetricGroupMap;
+ this.assignedSplits = new ArrayDeque<>();
+ this.pausedSplitIds = new HashSet<>();
+ }
+
+ private long getNextEligibleTime(DynamoDbStreamsShardSplitWithContext splitContext) {
+ long requiredDelay =
+ splitContext.wasLastPollEmpty
+ ? getRecordsIdlePollingTimeBetweenEmptyPolls.toMillis()
+ : getRecordsIdlePollingTimeBetweenNonEmptyPolls.toMillis();
+
+ return splitContext.lastPollTimeMillis + requiredDelay;
}
@Override
public RecordsWithSplitIds<Record> fetch() throws IOException {
- DynamoDbStreamsShardSplitState splitState = assignedSplits.poll();
- if (splitState == null) {
+ if (assignedSplits.isEmpty()) {
+ return INCOMPLETE_SHARD_EMPTY_RECORDS;
+ }
+ DynamoDbStreamsShardSplitWithContext splitContext = assignedSplits.poll();
+
+ if (pausedSplitIds.contains(splitContext.splitState.getSplitId())) {
+ assignedSplits.add(splitContext);
return INCOMPLETE_SHARD_EMPTY_RECORDS;
}
- if (pausedSplitIds.contains(splitState.getSplitId())) {
- assignedSplits.add(splitState);
+ // Check if split is paused or not ready due to empty poll delay
Review Comment:
yes, will update this
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]