gguptp commented on code in PR #151:
URL:
https://github.com/apache/flink-connector-aws/pull/151#discussion_r1741931311
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -116,24 +118,96 @@ public void handleSplitRequest(int subtaskId, @Nullable
String requesterHostname
}
@Override
- public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int
subtaskId) {
- if (!splitAssignment.containsKey(subtaskId)) {
- LOG.warn(
- "Unable to add splits back for subtask {} since it is not
assigned any splits. Splits: {}",
- subtaskId,
- splits);
+ public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) {
+ throw new UnsupportedOperationException("Partial recovery is not
supported");
+ }
+
+ @Override
+ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+ if (sourceEvent instanceof SplitsFinishedEvent) {
+ handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent);
+ }
+ }
+
+ private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent
splitsFinishedEvent) {
+ splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+ splitAssignment
+ .get(subtaskId)
+ .removeIf(
+ split ->
+ splitsFinishedEvent
+ .getFinishedSplitIds()
+ .contains(split.splitId()));
+ assignSplits();
+ }
+
+ private void processDiscoveredSplits(ListShardsResult discoveredSplits,
Throwable throwable) {
+ if (throwable != null) {
+ throw new DynamoDbStreamsSourceException("Failed to list shards.",
throwable);
+ }
+
+ SplitGraphInconsistencyTracker splitGraphInconsistencyTracker =
+ trackSplitsAndResolveInconsistencies(discoveredSplits);
+
+ if (splitGraphInconsistencyTracker.inconsistencyDetected()) {
+ LOG.error(
+ "There are inconsistencies in DescribeStream which we were
not able to resolve. First leaf node on which inconsistency was detected:"
+ +
splitGraphInconsistencyTracker.getEarliestClosedLeafNode());
+ return;
+ }
+
+ splitTracker.addSplits(splitGraphInconsistencyTracker.getNodes());
+ splitTracker.removeSplits(
+ splitGraphInconsistencyTracker.getNodes().stream()
+ .map(Shard::shardId)
+ .collect(Collectors.toSet()));
+ if (context.registeredReaders().size() < context.currentParallelism())
{
+ LOG.info(
+ "Insufficient registered readers, skipping assignment of
discovered splits until all readers are registered. Required number of readers:
{}, registered readers: {}",
+ context.currentParallelism(),
+ context.registeredReaders().size());
return;
}
+ assignSplits();
+ }
- for (DynamoDbStreamsShardSplit split : splits) {
- splitAssignment.get(subtaskId).remove(split);
- assignedSplitIds.remove(split.splitId());
- unassignedSplits.add(split);
+ /**
+ * This method tracks the discovered splits in a graph and if the graph
has inconsistencies, it
+ * tries to resolve them using DescribeStream calls using the first
inconsistent node found in
+ * the split graph.
+ *
+ * @param discoveredSplits splits discovered after calling DescribeStream
at the start of the
+ * application or periodically.
+ */
+ private SplitGraphInconsistencyTracker
trackSplitsAndResolveInconsistencies(
+ ListShardsResult discoveredSplits) {
Review Comment:
Yeah, agreed — moving this to `discoverSplits()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]