hlteoh37 commented on code in PR #190:
URL:
https://github.com/apache/flink-connector-aws/pull/190#discussion_r1979552202
##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReaderTest.java:
##########
@@ -320,10 +336,103 @@ record ->
for (int i = 0; i < 10; i++) {
RecordsWithSplitIds<Record> records = splitReader.fetch();
fetchedRecords.addAll(readAllRecords(records));
+ Thread.sleep(NON_EMPTY_POLLING_DELAY_MILLIS.toMillis());
}
assertThat(fetchedRecords).containsExactly(recordsFromSplit3.toArray(new
Record[0]));
}
+ @Test
+ void testPollingDelayForEmptyRecords() throws Exception {
+ // Given assigned split with no records
+ testStreamProxy.addShards(TEST_SHARD_ID);
+ splitReader.handleSplitsChanges(
+ new
SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID))));
+
+ // First poll - should return empty records
+ RecordsWithSplitIds<Record> firstPoll = splitReader.fetch();
+ assertThat(firstPoll.nextRecordFromSplit()).isNull();
+ assertThat(firstPoll.nextSplit()).isNull();
+ assertThat(firstPoll.finishedSplits()).isEmpty();
+
+ // Immediate second poll - should return empty due to polling delay
Review Comment:
Can we make sure this test is not flaky by making the split delay something
like 1 minute?
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java:
##########
@@ -57,48 +60,89 @@ public class PollingDynamoDbStreamsShardSplitReader
new DynamoDbStreamRecordsWithSplitIds(Collections.emptyIterator(),
null, false);
private final StreamProxy dynamodbStreams;
+ private final Duration getRecordsIdlePollingTimeBetweenNonEmptyPolls;
+ private final Duration getRecordsIdlePollingTimeBetweenEmptyPolls;
- private final Deque<DynamoDbStreamsShardSplitState> assignedSplits = new
ArrayDeque<>();
+ private final Deque<DynamoDbStreamsShardSplitWithContext> assignedSplits;
private final Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap;
- private final Set<String> pausedSplitIds = new HashSet<>();
+ private final Set<String> pausedSplitIds;
+ private static final Logger LOG =
+
LoggerFactory.getLogger(PollingDynamoDbStreamsShardSplitReader.class);
public PollingDynamoDbStreamsShardSplitReader(
StreamProxy dynamodbStreamsProxy,
+ Duration getRecordsIdlePollingTimeBetweenNonEmptyPolls,
+ Duration getRecordsIdlePollingTimeBetweenEmptyPolls,
Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap) {
this.dynamodbStreams = dynamodbStreamsProxy;
+ this.getRecordsIdlePollingTimeBetweenNonEmptyPolls =
+ getRecordsIdlePollingTimeBetweenNonEmptyPolls;
+ this.getRecordsIdlePollingTimeBetweenEmptyPolls =
+ getRecordsIdlePollingTimeBetweenEmptyPolls;
this.shardMetricGroupMap = shardMetricGroupMap;
+ this.assignedSplits = new ArrayDeque<>();
+ this.pausedSplitIds = new HashSet<>();
+ }
+
+ private long getNextEligibleTime(DynamoDbStreamsShardSplitWithContext
splitContext) {
+ long requiredDelay =
+ splitContext.wasLastPollEmpty
+ ? getRecordsIdlePollingTimeBetweenEmptyPolls.toMillis()
+ :
getRecordsIdlePollingTimeBetweenNonEmptyPolls.toMillis();
+
+ return splitContext.lastPollTimeMillis + requiredDelay;
}
@Override
public RecordsWithSplitIds<Record> fetch() throws IOException {
- DynamoDbStreamsShardSplitState splitState = assignedSplits.poll();
- if (splitState == null) {
+ if (assignedSplits.isEmpty()) {
+ return INCOMPLETE_SHARD_EMPTY_RECORDS;
+ }
+ DynamoDbStreamsShardSplitWithContext splitContext =
assignedSplits.poll();
+
+ if (pausedSplitIds.contains(splitContext.splitState.getSplitId())) {
+ assignedSplits.add(splitContext);
return INCOMPLETE_SHARD_EMPTY_RECORDS;
}
- if (pausedSplitIds.contains(splitState.getSplitId())) {
- assignedSplits.add(splitState);
+ // Check if split is paused or not ready due to empty poll delay
Review Comment:
This comment doesn't seem appropriate, since the pause is already checked above.
##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReaderTest.java:
##########
@@ -320,10 +336,103 @@ record ->
for (int i = 0; i < 10; i++) {
RecordsWithSplitIds<Record> records = splitReader.fetch();
fetchedRecords.addAll(readAllRecords(records));
+ Thread.sleep(NON_EMPTY_POLLING_DELAY_MILLIS.toMillis());
}
assertThat(fetchedRecords).containsExactly(recordsFromSplit3.toArray(new
Record[0]));
}
+ @Test
+ void testPollingDelayForEmptyRecords() throws Exception {
+ // Given assigned split with no records
+ testStreamProxy.addShards(TEST_SHARD_ID);
+ splitReader.handleSplitsChanges(
+ new
SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID))));
+
+ // First poll - should return empty records
+ RecordsWithSplitIds<Record> firstPoll = splitReader.fetch();
+ assertThat(firstPoll.nextRecordFromSplit()).isNull();
+ assertThat(firstPoll.nextSplit()).isNull();
+ assertThat(firstPoll.finishedSplits()).isEmpty();
+
+ // Immediate second poll - should return empty due to polling delay
+ RecordsWithSplitIds<Record> secondPoll = splitReader.fetch();
+ assertThat(secondPoll.nextRecordFromSplit()).isNull();
+ assertThat(secondPoll.nextSplit()).isNull();
+ assertThat(secondPoll.finishedSplits()).isEmpty();
+ }
+
+ @Test
+ void testLessPollingDelayForNonEmptyRecords() throws Exception {
+ // Given assigned split with records
+ testStreamProxy.addShards(TEST_SHARD_ID);
+ Record record1 = getTestRecord("data-1");
+ Record record2 = getTestRecord("data-2");
+
+ testStreamProxy.addRecords(
+ TestUtil.STREAM_ARN, TEST_SHARD_ID,
Collections.singletonList(record1));
+
+ splitReader.handleSplitsChanges(
+ new
SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID))));
+
+ Thread.sleep(NON_EMPTY_POLLING_DELAY_MILLIS.toMillis());
+ // First poll - should return record1
+ RecordsWithSplitIds<Record> firstPoll = splitReader.fetch();
+ assertThat(readAllRecords(firstPoll)).containsExactly(record1);
+
+ // Add second record
+ testStreamProxy.addRecords(
+ TestUtil.STREAM_ARN, TEST_SHARD_ID,
Collections.singletonList(record2));
+ Thread.sleep(NON_EMPTY_POLLING_DELAY_MILLIS.toMillis());
Review Comment:
Can we use Awaitility instead of Thread.sleep?
https://github.com/awaitility/awaitility
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java:
##########
@@ -57,48 +60,89 @@ public class PollingDynamoDbStreamsShardSplitReader
new DynamoDbStreamRecordsWithSplitIds(Collections.emptyIterator(),
null, false);
private final StreamProxy dynamodbStreams;
+ private final Duration getRecordsIdlePollingTimeBetweenNonEmptyPolls;
+ private final Duration getRecordsIdlePollingTimeBetweenEmptyPolls;
- private final Deque<DynamoDbStreamsShardSplitState> assignedSplits = new
ArrayDeque<>();
+ private final Deque<DynamoDbStreamsShardSplitWithContext> assignedSplits;
private final Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap;
- private final Set<String> pausedSplitIds = new HashSet<>();
+ private final Set<String> pausedSplitIds;
+ private static final Logger LOG =
+
LoggerFactory.getLogger(PollingDynamoDbStreamsShardSplitReader.class);
public PollingDynamoDbStreamsShardSplitReader(
StreamProxy dynamodbStreamsProxy,
+ Duration getRecordsIdlePollingTimeBetweenNonEmptyPolls,
+ Duration getRecordsIdlePollingTimeBetweenEmptyPolls,
Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap) {
this.dynamodbStreams = dynamodbStreamsProxy;
+ this.getRecordsIdlePollingTimeBetweenNonEmptyPolls =
+ getRecordsIdlePollingTimeBetweenNonEmptyPolls;
+ this.getRecordsIdlePollingTimeBetweenEmptyPolls =
+ getRecordsIdlePollingTimeBetweenEmptyPolls;
this.shardMetricGroupMap = shardMetricGroupMap;
+ this.assignedSplits = new ArrayDeque<>();
+ this.pausedSplitIds = new HashSet<>();
+ }
+
+ private long getNextEligibleTime(DynamoDbStreamsShardSplitWithContext
splitContext) {
+ long requiredDelay =
+ splitContext.wasLastPollEmpty
+ ? getRecordsIdlePollingTimeBetweenEmptyPolls.toMillis()
+ :
getRecordsIdlePollingTimeBetweenNonEmptyPolls.toMillis();
+
+ return splitContext.lastPollTimeMillis + requiredDelay;
}
@Override
public RecordsWithSplitIds<Record> fetch() throws IOException {
- DynamoDbStreamsShardSplitState splitState = assignedSplits.poll();
- if (splitState == null) {
+ if (assignedSplits.isEmpty()) {
+ return INCOMPLETE_SHARD_EMPTY_RECORDS;
+ }
+ DynamoDbStreamsShardSplitWithContext splitContext =
assignedSplits.poll();
+
+ if (pausedSplitIds.contains(splitContext.splitState.getSplitId())) {
+ assignedSplits.add(splitContext);
return INCOMPLETE_SHARD_EMPTY_RECORDS;
}
- if (pausedSplitIds.contains(splitState.getSplitId())) {
- assignedSplits.add(splitState);
+ // Check if split is paused or not ready due to empty poll delay
+ long currentTime = System.currentTimeMillis();
+ long nextEligibleTime = getNextEligibleTime(splitContext);
+
+ LOG.debug(
+ "Polling split: {}, currentTime: {}, eligibleTime: {},
wasEmptyPoll: {}",
+ splitContext.splitState.getSplitId(),
+ currentTime,
+ nextEligibleTime,
+ splitContext.wasLastPollEmpty);
+ if (nextEligibleTime > currentTime) {
+ assignedSplits.add(splitContext);
+ sleep(1);
Review Comment:
Does this actually help with the 100% CPU usage?
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java:
##########
@@ -76,6 +76,21 @@ public enum InitialPosition {
public static final String BASE_DDB_STREAMS_USER_AGENT_PREFIX_FORMAT =
"Apache Flink %s (%s) DynamoDb Streams Connector";
+ public static final ConfigOption<Duration>
+ DYNAMODB_STREAMS_GET_RECORDS_IDLE_TIME_BETWEEN_EMPTY_POLLS =
+
ConfigOptions.key("flink.dynamodbstreams.getrecords.empty.mindelay")
+ .durationType()
+ .defaultValue(Duration.ofMillis(1000))
+ .withDescription(
+ "The idle time between empty polls for
DynamoDB Streams GetRecords API");
+ public static final ConfigOption<Duration>
+ DYNAMODB_STREAMS_GET_RECORDS_IDLE_TIME_BETWEEN_NON_EMPTY_POLLS =
+
ConfigOptions.key("flink.dynamodbstreams.getrecords.nonempty.mindelay")
+ .durationType()
+ .defaultValue(Duration.ofMillis(250))
Review Comment:
Why not make this default to 0?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]