gguptp commented on code in PR #151:
URL:
https://github.com/apache/flink-connector-aws/pull/151#discussion_r1698925649
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -116,24 +117,76 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
}
@Override
- public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int subtaskId) {
- if (!splitAssignment.containsKey(subtaskId)) {
- LOG.warn(
- "Unable to add splits back for subtask {} since it is not assigned any splits. Splits: {}",
- subtaskId,
- splits);
- return;
+ public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) {
+ throw new UnsupportedOperationException("Partial recovery is not supported");
+ }
+
+ @Override
+ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+ if (sourceEvent instanceof SplitsFinishedEvent) {
+ handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent);
+ }
+ }
+
+ private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
+ splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+ splitAssignment
+ .get(subtaskId)
+ .removeIf(
+ split ->
+ splitsFinishedEvent
+ .getFinishedSplitIds()
+ .contains(split.splitId()));
+ assignSplits();
+ }
+
+ private void processDiscoveredSplits(List<Shard> discoveredSplits, Throwable throwable) {
+ if (throwable != null) {
+ throw new DynamoDbStreamsSourceException("Failed to list shards.", throwable);
+ }
+
+ ShardGraphTracker shardGraphTracker = new ShardGraphTracker();
+ shardGraphTracker.addNodes(discoveredSplits);
+
+ StreamDescription streamDescription = streamProxy.getStreamDescription(streamArn, null);
+ boolean streamDisabled = streamDescription.streamStatus().equals(StreamStatus.DISABLED);
+
+ for (int i = 0;
+ i < sourceConfig.getInteger(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT)
+ && !streamDisabled
Review Comment:
Fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]