hlteoh37 commented on code in PR #151:
URL: https://github.com/apache/flink-connector-aws/pull/151#discussion_r1741650802
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -67,12 +74,10 @@ public class DynamoDbStreamsSourceEnumerator
private final StreamProxy streamProxy;
private final DynamoDbStreamsShardAssigner shardAssigner;
private final ShardAssignerContext shardAssignerContext;
+ private final SplitTracker splitTracker;
+ private boolean hasInitialState;
Review Comment:
Let's make this `final`
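A minimal sketch of the suggested change, assuming both fields are only assigned in the constructor:
```
// Sketch only: set once during construction, so both can be final.
private final SplitTracker splitTracker;
private final boolean hasInitialState;
```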
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplit.java:
##########
@@ -68,6 +77,14 @@ public StartingPosition getStartingPosition() {
return startingPosition;
}
+ public Set<String> getParentShardIds() {
+ return parentShardIds;
+ }
+
+ public Instant getShardCreationTime() {
+ return Instant.ofEpochMilli(Long.parseLong(shardId.split("-")[1]));
+ }
Review Comment:
This is not used. Let's remove it
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -116,24 +118,96 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
}
@Override
-    public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int subtaskId) {
-        if (!splitAssignment.containsKey(subtaskId)) {
-            LOG.warn(
-                    "Unable to add splits back for subtask {} since it is not assigned any splits. Splits: {}",
-                    subtaskId,
-                    splits);
+    public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) {
+        throw new UnsupportedOperationException("Partial recovery is not supported");
+ }
+
+ @Override
+ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+ if (sourceEvent instanceof SplitsFinishedEvent) {
+ handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent);
+ }
+ }
+
+    private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
+ splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+ splitAssignment
+ .get(subtaskId)
+ .removeIf(
+ split ->
+ splitsFinishedEvent
+ .getFinishedSplitIds()
+ .contains(split.splitId()));
+ assignSplits();
+ }
+
+    private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwable throwable) {
+ if (throwable != null) {
+            throw new DynamoDbStreamsSourceException("Failed to list shards.", throwable);
+ }
+
+        SplitGraphInconsistencyTracker splitGraphInconsistencyTracker =
+                trackSplitsAndResolveInconsistencies(discoveredSplits);
+
+ if (splitGraphInconsistencyTracker.inconsistencyDetected()) {
+ LOG.error(
+ "There are inconsistencies in DescribeStream which we were
not able to resolve. First leaf node on which inconsistency was detected:"
+ +
splitGraphInconsistencyTracker.getEarliestClosedLeafNode());
+ return;
+ }
+
+ splitTracker.addSplits(splitGraphInconsistencyTracker.getNodes());
+ splitTracker.removeSplits(
+ splitGraphInconsistencyTracker.getNodes().stream()
+ .map(Shard::shardId)
+ .collect(Collectors.toSet()));
Review Comment:
Makes sense - maybe we can rename it to something like `cleanupFinishedSplits` for clarity!
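One possible shape of that rename, as a sketch (the name is just a suggestion; behavior would be unchanged from `removeSplits(...)`):
```
// Hypothetical rename to make the intent of the cleanup explicit.
splitTracker.cleanupFinishedSplits(
        splitGraphInconsistencyTracker.getNodes().stream()
                .map(Shard::shardId)
                .collect(Collectors.toSet()));
```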
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.enumerator.tracker;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants;
+import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition;
+import org.apache.flink.connector.dynamodb.source.enumerator.DynamoDBStreamsShardSplitWithAssignmentStatus;
+import org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus;
+import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
+import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
+import org.apache.flink.connector.dynamodb.source.util.ShardUtils;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON;
+import static org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.ASSIGNED;
+import static org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.FINISHED;
+
+/**
+ * This class is used to track splits and will be used to assign any unassigned splits. It also
+ * ensures that the parent-child shard ordering is maintained.
+ */
+@Internal
+public class SplitTracker {
+    private final Map<String, DynamoDbStreamsShardSplit> knownSplits = new ConcurrentHashMap<>();
+ private final Set<String> assignedSplits = new HashSet<>();
+ private final Set<String> finishedSplits = new HashSet<>();
+ private final String streamArn;
+ private final InitialPosition initialPosition;
+
+ public SplitTracker(String streamArn, InitialPosition initialPosition) {
+ this(Collections.emptyList(), streamArn, initialPosition);
+ }
+
+ public SplitTracker(
+ List<DynamoDBStreamsShardSplitWithAssignmentStatus> initialState,
+ String streamArn,
+            DynamodbStreamsSourceConfigConstants.InitialPosition initialPosition) {
+ this.streamArn = streamArn;
+ this.initialPosition = initialPosition;
+ initialState.forEach(
+ splitWithStatus -> {
+                    DynamoDbStreamsShardSplit currentSplit = splitWithStatus.split();
+ knownSplits.put(currentSplit.splitId(), currentSplit);
+
+ if (ASSIGNED.equals(splitWithStatus.assignmentStatus())) {
+ assignedSplits.add(splitWithStatus.split().splitId());
+ }
+ if (FINISHED.equals(splitWithStatus.assignmentStatus())) {
+ finishedSplits.add(splitWithStatus.split().splitId());
+ }
+ });
+ }
+
+ /**
+ * Add newly discovered splits to tracker.
+ *
+ * @param shardsToAdd collection of splits to add to tracking
+ */
+ public void addSplits(Collection<Shard> shardsToAdd) {
+ for (Shard shard : shardsToAdd) {
+ String shardId = shard.shardId();
+ if (!knownSplits.containsKey(shardId)) {
+                DynamoDbStreamsShardSplit newSplit = mapToSplit(shard, getStartingPosition(shard));
+ knownSplits.put(shardId, newSplit);
+ }
+ }
+ }
+
+ private InitialPosition getStartingPosition(Shard shard) {
+ if (shard.parentShardId() == null) {
+ return initialPosition;
+ }
+ if (!knownSplits.containsKey(shard.parentShardId())) {
+ return initialPosition;
+ }
+ return TRIM_HORIZON;
+ }
+
+ private DynamoDbStreamsShardSplit mapToSplit(
+            Shard shard, DynamodbStreamsSourceConfigConstants.InitialPosition initialPosition) {
+ StartingPosition startingPosition;
+ switch (initialPosition) {
+ case LATEST:
+ startingPosition = StartingPosition.latest();
+ break;
+ case TRIM_HORIZON:
+ default:
+ startingPosition = StartingPosition.fromStart();
+ }
+
+ Set<String> parentShardIds = new HashSet<>();
+ if (shard.parentShardId() != null) {
+ parentShardIds.add(shard.parentShardId());
+ }
+        return new DynamoDbStreamsShardSplit(
+                streamArn, shard.shardId(), startingPosition, parentShardIds);
Review Comment:
If we know that DDB shards only have 1 parent, maybe let's remove the `Set`
and instead just pass the parentShardId directly into the
`DynamoDbStreamsShardSplit`?
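As a sketch, the call site might then look like this (assuming the `DynamoDbStreamsShardSplit` constructor is changed to accept a single, possibly-null parent id):
```
// Hypothetical: pass the one parentShardId directly instead of wrapping it in a Set.
return new DynamoDbStreamsShardSplit(
        streamArn, shard.shardId(), startingPosition, shard.parentShardId());
```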
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.enumerator.tracker;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants;
+import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition;
+import org.apache.flink.connector.dynamodb.source.enumerator.DynamoDBStreamsShardSplitWithAssignmentStatus;
+import org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus;
+import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
+import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
+import org.apache.flink.connector.dynamodb.source.util.ShardUtils;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON;
+import static org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.ASSIGNED;
+import static org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.FINISHED;
+
+/**
+ * This class is used to track splits and will be used to assign any unassigned splits. It also
+ * ensures that the parent-child shard ordering is maintained.
+ */
+@Internal
+public class SplitTracker {
+    private final Map<String, DynamoDbStreamsShardSplit> knownSplits = new ConcurrentHashMap<>();
+ private final Set<String> assignedSplits = new HashSet<>();
+ private final Set<String> finishedSplits = new HashSet<>();
+ private final String streamArn;
+ private final InitialPosition initialPosition;
+
+ public SplitTracker(String streamArn, InitialPosition initialPosition) {
+ this(Collections.emptyList(), streamArn, initialPosition);
+ }
+
+ public SplitTracker(
+ List<DynamoDBStreamsShardSplitWithAssignmentStatus> initialState,
+ String streamArn,
+            DynamodbStreamsSourceConfigConstants.InitialPosition initialPosition) {
+ this.streamArn = streamArn;
+ this.initialPosition = initialPosition;
+ initialState.forEach(
+ splitWithStatus -> {
+                    DynamoDbStreamsShardSplit currentSplit = splitWithStatus.split();
+ knownSplits.put(currentSplit.splitId(), currentSplit);
+
+ if (ASSIGNED.equals(splitWithStatus.assignmentStatus())) {
+ assignedSplits.add(splitWithStatus.split().splitId());
+ }
+ if (FINISHED.equals(splitWithStatus.assignmentStatus())) {
+ finishedSplits.add(splitWithStatus.split().splitId());
+ }
+ });
+ }
+
+ /**
+ * Add newly discovered splits to tracker.
+ *
+ * @param shardsToAdd collection of splits to add to tracking
+ */
+ public void addSplits(Collection<Shard> shardsToAdd) {
+ for (Shard shard : shardsToAdd) {
+ String shardId = shard.shardId();
+ if (!knownSplits.containsKey(shardId)) {
+                DynamoDbStreamsShardSplit newSplit = mapToSplit(shard, getStartingPosition(shard));
+ knownSplits.put(shardId, newSplit);
+ }
+ }
+ }
+
+ private InitialPosition getStartingPosition(Shard shard) {
+ if (shard.parentShardId() == null) {
+ return initialPosition;
+ }
+ if (!knownSplits.containsKey(shard.parentShardId())) {
+ return initialPosition;
+ }
+ return TRIM_HORIZON;
+ }
+
+ private DynamoDbStreamsShardSplit mapToSplit(
+            Shard shard, DynamodbStreamsSourceConfigConstants.InitialPosition initialPosition) {
+ StartingPosition startingPosition;
+ switch (initialPosition) {
+ case LATEST:
+ startingPosition = StartingPosition.latest();
+ break;
+ case TRIM_HORIZON:
+ default:
+ startingPosition = StartingPosition.fromStart();
+ }
+
+ Set<String> parentShardIds = new HashSet<>();
+ if (shard.parentShardId() != null) {
+ parentShardIds.add(shard.parentShardId());
+ }
+        return new DynamoDbStreamsShardSplit(
+                streamArn, shard.shardId(), startingPosition, parentShardIds);
+ }
+
+ /**
+     * Mark splits as assigned. Assigned splits will no longer be returned as pending splits.
+ *
+ * @param splitsToAssign collection of splits to mark as assigned
+ */
+    public void markAsAssigned(Collection<DynamoDbStreamsShardSplit> splitsToAssign) {
+ splitsToAssign.forEach(split -> assignedSplits.add(split.splitId()));
+ }
+
+ /**
+     * Mark splits as finished. Assigned splits will no longer be returned as pending splits.
Review Comment:
nit: this should be `Finished`
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java:
##########
@@ -40,9 +42,41 @@ public enum InitialPosition {
public static final ConfigOption<Long> SHARD_DISCOVERY_INTERVAL_MILLIS =
ConfigOptions.key("flink.shard.discovery.intervalmillis")
.longType()
- .defaultValue(10000L)
+ .defaultValue(60000L)
                        .withDescription("The interval between each attempt to discover new shards.");
Review Comment:
Duration is better here! It allows users to specify the value in other units (e.g. `1 min` or `10 s`) and it will be converted to millis.
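For illustration, a Duration-based option could look like the sketch below (Flink's `ConfigOptions` supports `durationType()`; the key name and the `java.time.Duration` default here are only suggestions):
```
public static final ConfigOption<Duration> SHARD_DISCOVERY_INTERVAL =
        ConfigOptions.key("flink.shard.discovery.interval")
                .durationType()
                .defaultValue(Duration.ofMinutes(1))
                .withDescription("The interval between each attempt to discover new shards.");
```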
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -162,75 +216,14 @@ private List<DynamoDbStreamsShardSplit> initialDiscoverSplits() {
*
* @return list of discovered splits
*/
- private List<DynamoDbStreamsShardSplit> periodicallyDiscoverSplits() {
-        List<Shard> shards = streamProxy.listShards(streamArn, lastSeenShardId);
-
-        // Any shard discovered after the initial startup should be read from the start, since they
- // come from resharding
- return mapToSplits(shards, InitialPosition.TRIM_HORIZON);
- }
-
- private List<DynamoDbStreamsShardSplit> mapToSplits(
- List<Shard> shards, InitialPosition initialPosition) {
- StartingPosition startingPosition;
- switch (initialPosition) {
- case LATEST:
- startingPosition = StartingPosition.latest();
- break;
- case TRIM_HORIZON:
- default:
- startingPosition = StartingPosition.fromStart();
- }
-
- List<DynamoDbStreamsShardSplit> splits = new ArrayList<>();
- for (Shard shard : shards) {
-            splits.add(new DynamoDbStreamsShardSplit(streamArn, shard.shardId(), startingPosition));
- }
-
- return splits;
- }
-
- /**
-     * This method assigns a given set of DynamoDb Streams splits to the readers currently
-     * registered on the cluster. This assignment is done via a side-effect on the {@link
-     * SplitEnumeratorContext} object.
- *
- * @param discoveredSplits list of discovered splits
-     * @param throwable thrown when discovering splits. Will be null if no throwable thrown.
- */
- private void assignSplits(
-            List<DynamoDbStreamsShardSplit> discoveredSplits, Throwable throwable) {
- if (throwable != null) {
-            throw new DynamoDbStreamsSourceException("Failed to list shards.", throwable);
- }
-
-        if (context.registeredReaders().size() < context.currentParallelism()) {
- LOG.info(
- "Insufficient registered readers, skipping assignment of
discovered splits until all readers are registered. Required number of readers:
{}, Registered readers: {}",
- context.currentParallelism(),
- context.registeredReaders().size());
- unassignedSplits.addAll(discoveredSplits);
- return;
- }
-
-        Map<Integer, List<DynamoDbStreamsShardSplit>> newSplitAssignments = new HashMap<>();
- for (DynamoDbStreamsShardSplit split : unassignedSplits) {
- assignSplitToSubtask(split, newSplitAssignments);
- }
- unassignedSplits.clear();
- for (DynamoDbStreamsShardSplit split : discoveredSplits) {
- assignSplitToSubtask(split, newSplitAssignments);
- }
-
- updateLastSeenShardId(discoveredSplits);
- updateSplitAssignment(newSplitAssignments);
- context.assignSplits(new SplitsAssignment<>(newSplitAssignments));
+ private List<Shard> periodicallyDiscoverSplits() {
+ return streamProxy.listShards(streamArn, null);
}
private void assignSplitToSubtask(
DynamoDbStreamsShardSplit split,
            Map<Integer, List<DynamoDbStreamsShardSplit>> newSplitAssignments) {
- if (assignedSplitIds.contains(split.splitId())) {
+ if (splitTracker.isAssigned(split.splitId())) {
LOG.info(
"Skipping assignment of shard {} from stream {} because it
is already assigned.",
Review Comment:
I think this should be `WARN` actually - it indicates an issue with the user's `ShardAssigner` / Enumerator mechanism.
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.enumerator.tracker;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants;
+import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition;
+import org.apache.flink.connector.dynamodb.source.enumerator.DynamoDBStreamsShardSplitWithAssignmentStatus;
+import org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus;
+import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
+import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
+import org.apache.flink.connector.dynamodb.source.util.ShardUtils;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON;
+import static org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.ASSIGNED;
+import static org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.FINISHED;
+
+/**
+ * This class is used to track splits and will be used to assign any unassigned splits. It also
+ * ensures that the parent-child shard ordering is maintained.
+ */
+@Internal
+public class SplitTracker {
+    private final Map<String, DynamoDbStreamsShardSplit> knownSplits = new ConcurrentHashMap<>();
+ private final Set<String> assignedSplits = new HashSet<>();
+ private final Set<String> finishedSplits = new HashSet<>();
+ private final String streamArn;
+ private final InitialPosition initialPosition;
+
+ public SplitTracker(String streamArn, InitialPosition initialPosition) {
+ this(Collections.emptyList(), streamArn, initialPosition);
+ }
+
+ public SplitTracker(
+ List<DynamoDBStreamsShardSplitWithAssignmentStatus> initialState,
+ String streamArn,
+            DynamodbStreamsSourceConfigConstants.InitialPosition initialPosition) {
+ this.streamArn = streamArn;
+ this.initialPosition = initialPosition;
+ initialState.forEach(
+ splitWithStatus -> {
+                    DynamoDbStreamsShardSplit currentSplit = splitWithStatus.split();
+ knownSplits.put(currentSplit.splitId(), currentSplit);
+
+ if (ASSIGNED.equals(splitWithStatus.assignmentStatus())) {
+ assignedSplits.add(splitWithStatus.split().splitId());
+ }
+ if (FINISHED.equals(splitWithStatus.assignmentStatus())) {
+ finishedSplits.add(splitWithStatus.split().splitId());
+ }
+ });
+ }
+
+ /**
+ * Add newly discovered splits to tracker.
+ *
+ * @param shardsToAdd collection of splits to add to tracking
+ */
+ public void addSplits(Collection<Shard> shardsToAdd) {
+ for (Shard shard : shardsToAdd) {
+ String shardId = shard.shardId();
+ if (!knownSplits.containsKey(shardId)) {
+                DynamoDbStreamsShardSplit newSplit = mapToSplit(shard, getStartingPosition(shard));
+ knownSplits.put(shardId, newSplit);
+ }
+ }
+ }
+
+ private InitialPosition getStartingPosition(Shard shard) {
+ if (shard.parentShardId() == null) {
+ return initialPosition;
+ }
+ if (!knownSplits.containsKey(shard.parentShardId())) {
+ return initialPosition;
+ }
+ return TRIM_HORIZON;
Review Comment:
Sorry to harp on this again, but I don't really get this logic. Why do we return `initialPosition` (which can be `TRIM_HORIZON` or `LATEST`) if there is no `parentShard`? Wouldn't this mean that if a customer starts from `LATEST`, the shard would in that case also be read from `TRIM_HORIZON`?
Wonder if we can simplify the logic for `shardDiscovery` and `startingPosition`, depending on `initialPosition`:
```
if (initialPosition == TRIM_HORIZON) {
discoverAllShards()
startingPosition == TRIM_HORIZON
} else if (initialPosition == LATEST) {
# Initial discovery
discoverAllShards()
if shard is open,
consume from shard
startingPosition == LATEST
if shard is closed,
mark shard as FINISHED
# Periodic discovery
discoverAllShards()
if shard already known, skip
if shard is not known, startingPosition == LATEST
}
```
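A rough Java rendering of that flow, for illustration only (`track`, `markFinished`, and `isKnown` are hypothetical helpers, not names from the PR):
```
// Illustrative sketch of the proposed discovery / starting-position logic.
private void trackDiscoveredShards(List<Shard> shards, boolean isInitialDiscovery) {
    for (Shard shard : shards) {
        if (isKnown(shard.shardId())) {
            continue; // already tracked, skip
        }
        if (initialPosition == InitialPosition.TRIM_HORIZON) {
            track(shard, StartingPosition.fromStart());
        } else { // LATEST
            boolean shardIsClosed = shard.sequenceNumberRange().endingSequenceNumber() != null;
            if (isInitialDiscovery && shardIsClosed) {
                markFinished(shard.shardId()); // nothing to read when starting from LATEST
            } else {
                track(shard, StartingPosition.latest());
            }
        }
    }
}
```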
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -87,25 +92,29 @@ public DynamoDbStreamsSourceEnumerator(
this.streamProxy = streamProxy;
this.shardAssigner = shardAssigner;
        this.shardAssignerContext = new ShardAssignerContext(splitAssignment, context);
+        InitialPosition initialPosition = sourceConfig.get(STREAM_INITIAL_POSITION);
+ this.hasInitialState = state != null;
if (state == null) {
- this.lastSeenShardId = null;
- this.unassignedSplits = new HashSet<>();
+ this.splitTracker = new SplitTracker(streamArn, initialPosition);
} else {
- this.lastSeenShardId = state.getLastSeenShardId();
- this.unassignedSplits = state.getUnassignedSplits();
+ this.splitTracker =
+                    new SplitTracker(state.getKnownSplits(), streamArn, initialPosition);
}
}
@Override
public void start() {
- if (lastSeenShardId == null) {
- context.callAsync(this::initialDiscoverSplits, this::assignSplits);
+        // if there is already a saved state, we can just start with that state instead of waiting
+ // for shard discovery to complete
+ if (!hasInitialState) {
+            context.callAsync(this::discoverSplits, this::processDiscoveredSplits);
Review Comment:
nit: This is a nice thought - but do we actually expect there to be shards
in unassigned?
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -116,24 +118,96 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
}
@Override
-    public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int subtaskId) {
-        if (!splitAssignment.containsKey(subtaskId)) {
-            LOG.warn(
-                    "Unable to add splits back for subtask {} since it is not assigned any splits. Splits: {}",
-                    subtaskId,
-                    splits);
+    public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) {
+        throw new UnsupportedOperationException("Partial recovery is not supported");
+ }
+
+ @Override
+ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+ if (sourceEvent instanceof SplitsFinishedEvent) {
+ handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent);
+ }
+ }
+
+    private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
+ splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+ splitAssignment
+ .get(subtaskId)
+ .removeIf(
+ split ->
+ splitsFinishedEvent
+ .getFinishedSplitIds()
+ .contains(split.splitId()));
+ assignSplits();
+ }
+
+    private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwable throwable) {
+ if (throwable != null) {
+            throw new DynamoDbStreamsSourceException("Failed to list shards.", throwable);
+ }
+
+        SplitGraphInconsistencyTracker splitGraphInconsistencyTracker =
+                trackSplitsAndResolveInconsistencies(discoveredSplits);
+
+ if (splitGraphInconsistencyTracker.inconsistencyDetected()) {
+ LOG.error(
+ "There are inconsistencies in DescribeStream which we were
not able to resolve. First leaf node on which inconsistency was detected:"
+ +
splitGraphInconsistencyTracker.getEarliestClosedLeafNode());
+ return;
+ }
+
+ splitTracker.addSplits(splitGraphInconsistencyTracker.getNodes());
+ splitTracker.removeSplits(
+ splitGraphInconsistencyTracker.getNodes().stream()
+ .map(Shard::shardId)
+ .collect(Collectors.toSet()));
+        if (context.registeredReaders().size() < context.currentParallelism()) {
+ LOG.info(
+ "Insufficient registered readers, skipping assignment of
discovered splits until all readers are registered. Required number of readers:
{}, registered readers: {}",
+ context.currentParallelism(),
+ context.registeredReaders().size());
return;
}
+ assignSplits();
+ }
- for (DynamoDbStreamsShardSplit split : splits) {
- splitAssignment.get(subtaskId).remove(split);
- assignedSplitIds.remove(split.splitId());
- unassignedSplits.add(split);
+ /**
+     * This method tracks the discovered splits in a graph and if the graph has inconsistencies, it
+     * tries to resolve them using DescribeStream calls using the first inconsistent node found in
+     * the split graph.
+     *
+     * @param discoveredSplits splits discovered after calling DescribeStream at the start of the
+     *     application or periodically.
+ */
+    private SplitGraphInconsistencyTracker trackSplitsAndResolveInconsistencies(
+ ListShardsResult discoveredSplits) {
Review Comment:
Discussed offline - it would be good for us to move this into `discoverSplits()`, as otherwise it will block.
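A sketch of that move, assuming the retrying resolution then runs inside the async callable passed to `context.callAsync` (so the DescribeStream retries no longer run in the enumerator callback); `processDiscoveredSplits` would then accept the tracker instead of the raw `ListShardsResult`:
```
// Hypothetical shape after the move; method names follow the PR's existing ones.
private SplitGraphInconsistencyTracker discoverSplits() {
    ListShardsResult discoveredSplits = streamProxy.listShards(streamArn, null);
    return trackSplitsAndResolveInconsistencies(discoveredSplits);
}
```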
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -116,24 +118,96 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
}
@Override
-    public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int subtaskId) {
-        if (!splitAssignment.containsKey(subtaskId)) {
-            LOG.warn(
-                    "Unable to add splits back for subtask {} since it is not assigned any splits. Splits: {}",
-                    subtaskId,
-                    splits);
+    public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) {
+        throw new UnsupportedOperationException("Partial recovery is not supported");
+ }
+
+ @Override
+ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+ if (sourceEvent instanceof SplitsFinishedEvent) {
+ handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent);
+ }
+ }
+
+    private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
+ splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+ splitAssignment
+ .get(subtaskId)
+ .removeIf(
+ split ->
+ splitsFinishedEvent
+ .getFinishedSplitIds()
+ .contains(split.splitId()));
+ assignSplits();
+ }
+
+    private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwable throwable) {
+ if (throwable != null) {
+            throw new DynamoDbStreamsSourceException("Failed to list shards.", throwable);
+ }
+
+        SplitGraphInconsistencyTracker splitGraphInconsistencyTracker =
+                trackSplitsAndResolveInconsistencies(discoveredSplits);
+
+ if (splitGraphInconsistencyTracker.inconsistencyDetected()) {
+ LOG.error(
+ "There are inconsistencies in DescribeStream which we were
not able to resolve. First leaf node on which inconsistency was detected:"
+ +
splitGraphInconsistencyTracker.getEarliestClosedLeafNode());
+ return;
+ }
+
+ splitTracker.addSplits(splitGraphInconsistencyTracker.getNodes());
+ splitTracker.removeSplits(
+ splitGraphInconsistencyTracker.getNodes().stream()
+ .map(Shard::shardId)
+ .collect(Collectors.toSet()));
+        if (context.registeredReaders().size() < context.currentParallelism()) {
+ LOG.info(
+ "Insufficient registered readers, skipping assignment of
discovered splits until all readers are registered. Required number of readers:
{}, registered readers: {}",
+ context.currentParallelism(),
+ context.registeredReaders().size());
return;
}
+ assignSplits();
+ }
- for (DynamoDbStreamsShardSplit split : splits) {
- splitAssignment.get(subtaskId).remove(split);
- assignedSplitIds.remove(split.splitId());
- unassignedSplits.add(split);
+ /**
+     * This method tracks the discovered splits in a graph and if the graph has inconsistencies, it
+     * tries to resolve them using DescribeStream calls using the first inconsistent node found in
+     * the split graph.
+     *
+     * @param discoveredSplits splits discovered after calling DescribeStream at the start of the
+     *     application or periodically.
+ */
+    private SplitGraphInconsistencyTracker trackSplitsAndResolveInconsistencies(
+ ListShardsResult discoveredSplits) {
+ SplitGraphInconsistencyTracker splitGraphInconsistencyTracker =
+ new SplitGraphInconsistencyTracker();
+ splitGraphInconsistencyTracker.addNodes(discoveredSplits.getShards());
+
+        boolean streamDisabled = discoveredSplits.getStreamStatus().equals(StreamStatus.DISABLED);
+        int describeStreamInconsistencyResolutionCount =
+                sourceConfig.get(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT);
+ for (int i = 0;
+ i < describeStreamInconsistencyResolutionCount
+ && !streamDisabled
Review Comment:
Ok, that makes sense - can we add a comment for this please!
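For instance, something along these lines (the third loop condition and the body here are assumptions based on the surrounding context, not code from the PR):
```
// A DISABLED stream can no longer reshard, so its shard lineage is final and
// further DescribeStream retries cannot resolve any remaining inconsistency.
for (int i = 0;
        i < describeStreamInconsistencyResolutionCount
                && !streamDisabled
                && splitGraphInconsistencyTracker.inconsistencyDetected();
        i++) {
    // ... retry DescribeStream from the earliest closed leaf node ...
}
```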
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.enumerator.tracker;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants;
+import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition;
+import org.apache.flink.connector.dynamodb.source.enumerator.DynamoDBStreamsShardSplitWithAssignmentStatus;
+import org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus;
+import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
+import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON;
+import static org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.ASSIGNED;
+import static org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.FINISHED;
+
+/** This class is used to track splits and will be used to assign any unassigned splits. */
+@Internal
+public class SplitTracker {
+    private final Map<String, DynamoDbStreamsShardSplit> knownSplits = new ConcurrentHashMap<>();
+ private final Set<String> assignedSplits = new HashSet<>();
+ private final Set<String> finishedSplits = new HashSet<>();
+ private final String streamArn;
+ private final InitialPosition initialPosition;
+
+ public SplitTracker(String streamArn, InitialPosition initialPosition) {
+ this(Collections.emptyList(), streamArn, initialPosition);
+ }
+
+ public SplitTracker(
+ List<DynamoDBStreamsShardSplitWithAssignmentStatus> initialState,
+ String streamArn,
+            DynamodbStreamsSourceConfigConstants.InitialPosition initialPosition) {
+ this.streamArn = streamArn;
+ this.initialPosition = initialPosition;
+ initialState.forEach(
+ splitWithStatus -> {
+                    DynamoDbStreamsShardSplit currentSplit = splitWithStatus.split();
+ knownSplits.put(currentSplit.splitId(), currentSplit);
+
+ if (ASSIGNED.equals(splitWithStatus.assignmentStatus())) {
+ assignedSplits.add(splitWithStatus.split().splitId());
+ }
+ if (FINISHED.equals(splitWithStatus.assignmentStatus())) {
+ finishedSplits.add(splitWithStatus.split().splitId());
+ }
+ });
+ }
+
+ /**
+ * Add newly discovered splits to tracker.
+ *
+ * @param shardsToAdd collection of splits to add to tracking
+ */
+ public void addSplits(Collection<Shard> shardsToAdd) {
+        List<DynamoDbStreamsShardSplit> newSplitsToAdd =
+                determineNewShardsToBePutForAssignment(shardsToAdd);
+ newSplitsToAdd.forEach(
+ split -> {
+ knownSplits.put(split.splitId(), split);
+ });
+ }
+
+ private List<Shard> getOpenShards(Collection<Shard> shards) {
+ List<Shard> openShards = new ArrayList<>();
+ for (Shard shard : shards) {
+ if (shard.sequenceNumberRange().endingSequenceNumber() == null) {
+ openShards.add(shard);
+ }
+ }
+ return openShards;
+ }
+
+ private Map<String, Shard> getShardIdToShardMap(Collection<Shard> shards) {
+ Map<String, Shard> shardIdToShardMap = new HashMap<>();
+ for (Shard shard : shards) {
+ shardIdToShardMap.put(shard.shardId(), shard);
+ }
+ return shardIdToShardMap;
+ }
+
+ /**
+     * This function finds the open shards returned from describeStream operation and adds them
+     * along with their parents if parents are not already tracked to the tracked splits
+     *
+     * <p>This is needed because describestream has an inconsistency where for example if a shard s
+     * is split into s1 and s2, in one describestream operation, its possible that only one of s1
+     * and s2 is returned.
+     *
+     * <p>We will go up the shard lineage until we find a parent shard which is not yet tracked by
+     * SplitTracker If no ancestor is tracked, the first ancestor will be read from the initial
+     * position configured and all its descendants will be read from TRIM_HORIZON
+     *
+     * @param shards the shards returned from DescribeStream operation
+     * @return list of {@link DynamoDbStreamsShardSplit} which will be put to tracked splits
+ */
+    private List<DynamoDbStreamsShardSplit> determineNewShardsToBePutForAssignment(
+ Collection<Shard> shards) {
+ Map<String, Shard> shardIdToShardMap = getShardIdToShardMap(shards);
+ List<Shard> openShards = getOpenShards(shards);
+        List<DynamoDbStreamsShardSplit> newSplitsToBeTracked = new ArrayList<>();
+ Map<String, Boolean> memoizationContext = new HashMap<>();
+ for (Shard openShard : openShards) {
+ String shardId = openShard.shardId();
+ if (!knownSplits.containsKey(shardId)) {
+ boolean isDescendant =
+ checkIfShardIsDescendantAndAddAncestorsToBeTracked(
+ openShard.shardId(),
+ shardIdToShardMap,
+ newSplitsToBeTracked,
+ memoizationContext);
+ if (isDescendant) {
+                    newSplitsToBeTracked.add(mapToSplit(openShard, TRIM_HORIZON));
+ } else {
+                    newSplitsToBeTracked.add(mapToSplit(openShard, initialPosition));
+ }
+ }
+ }
+ return newSplitsToBeTracked;
+ }
+
+ /**
+     * Check if any ancestor shard of the current shard has not been tracked yet. Take this example:
+     * 0->3->8, 0->4->9, 1->5, 1->6, 2->7
+     *
+     * <p>At epoch 1, the lineage looked like this due to describestream inconsistency 0->3, 1->5,
+     * 1->6, 2->7 knownSplits = 0,1,2,3,5,6,7 After a few describestream calls, at epoch 2, after
+     * the whole lineage got discovered, since 4 was not tracked, we should start tracking 4 also.
+ * knownSplits = 0,1,2,3,4,5,6,7,8,9.
+ */
+ private boolean checkIfShardIsDescendantAndAddAncestorsToBeTracked(
+ String shardId,
+ Map<String, Shard> shardIdToShardMap,
+ List<DynamoDbStreamsShardSplit> newSplitsToBeTracked,
+ Map<String, Boolean> memoizationContext) {
+ Boolean previousValue = memoizationContext.get(shardId);
+ if (previousValue != null) {
+ return previousValue;
+ }
+
+ if (shardId != null && shardIdToShardMap.containsKey(shardId)) {
+ if (knownSplits.containsKey(shardId)) {
+ return true;
+ } else {
+ Shard shard = shardIdToShardMap.get(shardId);
+ String parentShardId = shard.parentShardId();
+ boolean isParentShardDescendant =
+ checkIfShardIsDescendantAndAddAncestorsToBeTracked(
+ parentShardId,
+ shardIdToShardMap,
+ newSplitsToBeTracked,
+ memoizationContext);
+ if (shardIdToShardMap.containsKey(parentShardId)) {
+ if (!knownSplits.containsKey(parentShardId)) {
+                    Shard parentShard = shardIdToShardMap.get(parentShardId);
+ if (isParentShardDescendant) {
+                        newSplitsToBeTracked.add(mapToSplit(parentShard, TRIM_HORIZON));
+ } else {
+                        newSplitsToBeTracked.add(mapToSplit(parentShard, initialPosition));
+ }
+ return true;
+ }
+ }
+ }
+ }
+
+ return false;
+ }
+
+ private DynamoDbStreamsShardSplit mapToSplit(
+            Shard shard, DynamodbStreamsSourceConfigConstants.InitialPosition initialPosition) {
+ StartingPosition startingPosition;
+ switch (initialPosition) {
+ case LATEST:
+ startingPosition = StartingPosition.latest();
+ break;
+ case TRIM_HORIZON:
+ default:
+ startingPosition = StartingPosition.fromStart();
+ }
+
+ Set<String> parentShardIds = new HashSet<>();
+ if (shard.parentShardId() != null) {
+ parentShardIds.add(shard.parentShardId());
+ }
+        return new DynamoDbStreamsShardSplit(
+                streamArn, shard.shardId(), startingPosition, parentShardIds);
+ }
+
+ /**
+     * Mark splits as assigned. Assigned splits will no longer be returned as pending splits.
+ *
+ * @param splitsToAssign collection of splits to mark as assigned
+ */
+    public void markAsAssigned(Collection<DynamoDbStreamsShardSplit> splitsToAssign) {
+ splitsToAssign.forEach(split -> assignedSplits.add(split.splitId()));
+ }
+
+ /**
+     * Mark splits as finished. Assigned splits will no longer be returned as pending splits.
+ *
+ * @param splitsToFinish collection of splits to mark as assigned
+ */
+ public void markAsFinished(Collection<String> splitsToFinish) {
+ splitsToFinish.forEach(
+ splitId -> {
+ finishedSplits.add(splitId);
+ assignedSplits.remove(splitId);
+ });
+ }
+
+ public boolean isAssigned(String splitId) {
+ return assignedSplits.contains(splitId);
+ }
+
+ /**
+     * Since we never put an inconsistent shard lineage to splitTracker, so if a shard's parent is
+     * not there, that means that that should already be cleaned up.
+ */
+ public List<DynamoDbStreamsShardSplit> splitsAvailableForAssignment() {
+ return knownSplits.values().stream()
+ .filter(
+ split -> {
+                            boolean splitIsNotAssigned = !isAssigned(split.splitId());
+                            return splitIsNotAssigned
+                                    && !isFinished(split.splitId())
+                                    && (verifyAllParentSplitsAreFinished(split)
+                                            || verifyAllParentSplitsAreCleanedUp(split));
Review Comment:
Unresolving as this has not been pushed down! Given we have only 1 parent, we can also simplify this logic.
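With a single parent per split, the check could collapse to something like this sketch (the accessor name is assumed, pending the `Set` removal discussed above):
```
// Hypothetical: a split is assignable once its (at most one) parent is finished,
// or the parent was already cleaned up from the tracker.
private boolean parentIsFinishedOrCleanedUp(DynamoDbStreamsShardSplit split) {
    String parentShardId = split.getParentShardId(); // assumed single-parent accessor
    return parentShardId == null
            || finishedSplits.contains(parentShardId)
            || !knownSplits.containsKey(parentShardId);
}
```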
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]