ferenc-csaky commented on code in PR #219:
URL: https://github.com/apache/flink-connector-aws/pull/219#discussion_r2594619912
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java:
##########
@@ -76,6 +76,9 @@ public enum InitialPosition {
     public static final String BASE_DDB_STREAMS_USER_AGENT_PREFIX_FORMAT =
             "Apache Flink %s (%s) DynamoDb Streams Connector";
+    public static final String DYNAMODB_STREAMS_THROTTLING_EXCEPTION_ERROR_CODE =
Review Comment:
unused, delete
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java:
##########
@@ -176,7 +176,7 @@ private <AttributeT> TableSchema<AttributeT> createTableSchemaFromPojo(
                    tableSchemaBuilder,
                    propertyDescriptor.getName(),
                    BeanAttributeGetter.create(
-                            typeInfo.getTypeClass(), propertyDescriptor.getReadMethod()),
+                            typeInfo.getTypeClass(), propertyDescriptor.getReadMethod(), null),
Review Comment:
Any reason for not passing `MethodHandles.lookup()`, if it was introduced in the SDK?
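i.e., assuming the new SDK overload takes a `MethodHandles.Lookup` as the third parameter, something like:
```java
BeanAttributeGetter.create(
        typeInfo.getTypeClass(),
        propertyDescriptor.getReadMethod(),
        MethodHandles.lookup())
```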
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplit.java:
##########
@@ -50,12 +56,31 @@ public DynamoDbStreamsShardSplit(
        this(streamArn, shardId, startingPosition, parentShardId, false);
    }

+    public DynamoDbStreamsShardSplit(
+            String streamArn,
+            String shardId,
+            StartingPosition startingPosition,
+            String parentShardId,
+            List<Shard> childSplits) {
+        this(streamArn, shardId, startingPosition, parentShardId, false, childSplits);
+    }
+
    public DynamoDbStreamsShardSplit(
            String streamArn,
            String shardId,
            StartingPosition startingPosition,
            String parentShardId,
            boolean isFinished) {
+        this(streamArn, shardId, startingPosition, parentShardId, isFinished, new ArrayList<>());
+    }
+
+    public DynamoDbStreamsShardSplit(
+            String streamArn,
+            String shardId,
+            StartingPosition startingPosition,
+            String parentShardId,
+            boolean isFinished,
+            List<Shard> childSplits) {
Review Comment:
I'd invert the order of the `boolean` and `List` params, and the same at the class field level.
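i.e. (sketch):
```java
public DynamoDbStreamsShardSplit(
        String streamArn,
        String shardId,
        StartingPosition startingPosition,
        String parentShardId,
        List<Shard> childSplits,
        boolean isFinished) { ... }
```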
##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java:
##########
@@ -304,21 +305,29 @@ void testLatestAssignsChildShardsWithTrimHorizonDuringPeriodicDiscovery() throws
        };
        streamProxy.addShards(childShards);
        enumerator.handleSourceEvent(
-                subtaskId, new SplitsFinishedEvent(Collections.singleton(shards[2].shardId())));
-        // Given no resharding occurs (list of shards remains the same)
-        // When first periodic discovery runs
-        context.runPeriodicCallable(0);
-        // Then no additional splits are assigned
-        SplitsAssignment<DynamoDbStreamsShardSplit> periodicDiscoverySplitAssignment =
-                context.getSplitsAssignmentSequence().get(2);
+                subtaskId,
+                new SplitsFinishedEvent(
+                        Collections.singleton(
Review Comment:
`Collections.singleton` -> `Set.of`
##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java:
##########
@@ -765,7 +774,12 @@ void testHandleSplitFinishedEventTest() throws Throwable {
        context.runNextOneTimeCallable();

        enumerator.handleSourceEvent(
-                1, new SplitsFinishedEvent(Collections.singleton(completedShard.shardId())));
+                1,
+                new SplitsFinishedEvent(
+                        Collections.singleton(
+                                new SplitsFinishedEventContext(
+                                        completedShard.shardId(),
+                                        Collections.singletonList(shards[1])))));
Review Comment:
`Collections.singletonList` -> `List.of`
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java:
##########
@@ -91,6 +94,10 @@ public enum InitialPosition {
                    .withDescription(
                            "The default idle time between non-empty polls for DynamoDB Streams GetRecords API");

+    public static final int MAX_RETRY_ATTEMPTS_FOR_CHILD_SHARDS = 5;
+    public static final Duration CHILD_SHARD_DISCOVERY_MIN_DELAY = Duration.ofMillis(100);
+    public static final Duration CHILD_SHARD_DISCOVERY_MAX_DELAY = Duration.ofMillis(1000);
Review Comment:
You convert both of these to `long`, so I recommend adding an `_MS` suffix to the names and defining them as `long` in the first place, sparing the unnecessary conversion.
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java:
##########
@@ -88,6 +89,24 @@ public ListShardsResult listShards(String streamArn, @Nullable String lastSeenSh
        return listShardsResult;
    }

+    @Override
+    public ListShardsResult listShardsWithFilter(String streamArn, ShardFilter shardFilter) {
+        LOG.info("Child shards with filter called, for shardId: {}", shardFilter.shardId());
+        ListShardsResult listShardsResult = new ListShardsResult();
+
+        try {
+            DescribeStreamResponse describeStreamResponse =
+                    this.describeStream(streamArn, shardFilter);
+            listShardsResult.addShards(describeStreamResponse.streamDescription().shards());
+            listShardsResult.setStreamStatus(
+                    describeStreamResponse.streamDescription().streamStatus());
+        } catch (Exception e) {
+            LOG.error("DescribeStream with Filter API threw an exception", e);
Review Comment:
We log this as an error, but the show goes on. If this is not an
unrecoverable problem, this should be a WARN.
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -138,7 +139,20 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {

    /** When we mark a split as finished, we will only assign its child splits to the subtasks. */
    private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
-        splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+        Set<String> finishedSplitIds =
+                splitsFinishedEvent.getFinishedSplits().stream()
+                        .map(SplitsFinishedEventContext::getSplitId)
+                        .collect(Collectors.toSet());
+        splitTracker.markAsFinished(finishedSplitIds);
+
+        List<Shard> childrenOfFinishedSplits = new ArrayList<>();
+        splitsFinishedEvent
+                .getFinishedSplits()
+                .forEach(
+                        finishedSplitEvent ->
+                                childrenOfFinishedSplits.addAll(
+                                        finishedSplitEvent.getChildSplits()));
+        LOG.info("Adding Children of finishedSplits to splitTracker: {}", childrenOfFinishedSplits);
Review Comment:
I'd consider logging every `Shard` as a full string too chatty at INFO level. Maybe introduce a debug-level check: if DEBUG is enabled, log the current message at DEBUG, otherwise log only the size at INFO.
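Something like:
```java
if (LOG.isDebugEnabled()) {
    // Full shard dump only when DEBUG is on.
    LOG.debug("Adding children of finished splits to splitTracker: {}", childrenOfFinishedSplits);
} else {
    LOG.info("Adding {} children of finished splits to splitTracker", childrenOfFinishedSplits.size());
}
```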
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java:
##########
@@ -88,6 +89,24 @@ public ListShardsResult listShards(String streamArn, @Nullable String lastSeenSh
        return listShardsResult;
    }

+    @Override
+    public ListShardsResult listShardsWithFilter(String streamArn, ShardFilter shardFilter) {
+        LOG.info("Child shards with filter called, for shardId: {}", shardFilter.shardId());
Review Comment:
I'd say DEBUG level, and for the other INFO one in this method too.
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEventContext.java:
##########
@@ -0,0 +1,28 @@
+package org.apache.flink.connector.dynamodb.source.enumerator.event;
+
+import org.apache.flink.annotation.Internal;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.io.Serializable;
+import java.util.List;
+
+/** Context which contains the split id and the finished splits for a finished split event. */
+@Internal
+public class SplitsFinishedEventContext implements Serializable {
+    String splitId;
+    List<Shard> childSplits;
Review Comment:
Why not `private final` for both?
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplit.java:
##########
@@ -50,12 +56,31 @@ public DynamoDbStreamsShardSplit(
        this(streamArn, shardId, startingPosition, parentShardId, false);
    }

+    public DynamoDbStreamsShardSplit(
+            String streamArn,
+            String shardId,
+            StartingPosition startingPosition,
+            String parentShardId,
+            List<Shard> childSplits) {
+        this(streamArn, shardId, startingPosition, parentShardId, false, childSplits);
+    }
+
    public DynamoDbStreamsShardSplit(
            String streamArn,
            String shardId,
            StartingPosition startingPosition,
            String parentShardId,
            boolean isFinished) {
+        this(streamArn, shardId, startingPosition, parentShardId, isFinished, new ArrayList<>());
Review Comment:
`List.of()` instead of `new ArrayList<>()`.
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/StreamProxy.java:
##########
@@ -41,6 +42,8 @@ public interface StreamProxy extends Closeable {
     */
    ListShardsResult listShards(String streamArn, @Nullable String lastSeenShardId);

+    ListShardsResult listShardsWithFilter(String streamArn, ShardFilter shardFilter);
Review Comment:
Pls. add some javadoc to the new method.
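Something along these lines would do (sketch, wording is just a suggestion):
```java
/**
 * Lists the shards of the given stream that match the provided filter,
 * e.g. the child shards of a finished shard.
 *
 * @param streamArn the ARN of the stream
 * @param shardFilter the filter to apply, e.g. type CHILD_SHARDS with the parent shard id
 * @return the shards matching the filter, together with the stream status
 */
ListShardsResult listShardsWithFilter(String streamArn, ShardFilter shardFilter);
```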
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializer.java:
##########
@@ -74,6 +78,18 @@ public byte[] serialize(DynamoDbStreamsShardSplit split) throws IOException {
                out.writeUTF(split.getParentShardId());
            }
            out.writeBoolean(split.isFinished());
+            out.writeInt(split.getChildSplits().size());
Review Comment:
To make this backward compatible, shouldn't we write even the size only when the list is not empty?
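i.e. roughly like this, so a split without children serializes to exactly the old byte layout (the deserializer would then need a matching guard, e.g. only reading the child shards when bytes remain):
```java
List<Shard> childSplits = split.getChildSplits();
if (!childSplits.isEmpty()) {
    out.writeInt(childSplits.size());
    // ... then serialize each child shard as the PR already does
}
```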
##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java:
##########
@@ -765,7 +774,12 @@ void testHandleSplitFinishedEventTest() throws Throwable {
        context.runNextOneTimeCallable();

        enumerator.handleSourceEvent(
-                1, new SplitsFinishedEvent(Collections.singleton(completedShard.shardId())));
+                1,
+                new SplitsFinishedEvent(
+                        Collections.singleton(
Review Comment:
`Collections.singleton` -> `Set.of`
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEventContext.java:
##########
@@ -0,0 +1,28 @@
+package org.apache.flink.connector.dynamodb.source.enumerator.event;
+
+import org.apache.flink.annotation.Internal;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.io.Serializable;
+import java.util.List;
+
+/** Context which contains the split id and the finished splits for a finished split event. */
+@Internal
+public class SplitsFinishedEventContext implements Serializable {
Review Comment:
This class is used as an element of a `Set`, but does not override `equals` + `hashCode`. This is a pre-existing bug, which I guess did not surface yet because we only create this set based on a `Map`, so there are actually no real duplicates. But let's fix this.
Let's add a `serialVersionUID` constant to it too.
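Something along these lines (sketch; the `serialVersionUID` value is up to you, and it needs a `java.util.Objects` import):
```java
private static final long serialVersionUID = 1L;

@Override
public boolean equals(Object o) {
    if (this == o) {
        return true;
    }
    if (!(o instanceof SplitsFinishedEventContext)) {
        return false;
    }
    SplitsFinishedEventContext that = (SplitsFinishedEventContext) o;
    return Objects.equals(splitId, that.splitId)
            && Objects.equals(childSplits, that.childSplits);
}

@Override
public int hashCode() {
    return Objects.hash(splitId, childSplits);
}
```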
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitState.java:
##########
@@ -29,10 +29,12 @@ public class DynamoDbStreamsShardSplitState {
    private final DynamoDbStreamsShardSplit dynamoDbStreamsShardSplit;
    private StartingPosition nextStartingPosition;
    private String nextShardIterator;
+    private boolean hasShardEndReached;
Review Comment:
`hasShardEndReached` -> `shardEndReached`, and the getter/setter should
follow.
##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxyTest.java:
##########
@@ -38,19 +38,22 @@
 import software.amazon.awssdk.services.dynamodb.model.Record;
 import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
 import software.amazon.awssdk.services.dynamodb.model.Shard;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;
 import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
 import software.amazon.awssdk.services.dynamodb.model.StreamDescription;
 import software.amazon.awssdk.services.dynamodb.model.StreamStatus;
 import software.amazon.awssdk.services.dynamodb.model.TrimmedDataAccessException;

 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;

 import static org.apache.flink.connector.dynamodb.source.util.TestUtil.generateShardId;
-import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThatNoException;
Review Comment:
Pls. change AssertionsForClassTypes to Assertions here too.
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java:
##########
@@ -83,19 +89,40 @@ protected void onSplitFinished(Map<String, DynamoDbStreamsShardSplitState> finis
        splitFinishedEvents.computeIfAbsent(currentCheckpointId, k -> new HashSet<>());
        finishedSplitIds.values().stream()
                .map(
-                        finishedSplit ->
-                                new DynamoDbStreamsShardSplit(
-                                        finishedSplit.getStreamArn(),
-                                        finishedSplit.getShardId(),
-                                        finishedSplit.getNextStartingPosition(),
-                                        finishedSplit
-                                                .getDynamoDbStreamsShardSplit()
-                                                .getParentShardId(),
-                                        true))
+                        finishedSplit -> {
+                            List<Shard> childSplits = new ArrayList<>();
+                            String finishedSplitId = finishedSplit.getSplitId();
+                            if (childShardIdMap.containsKey(finishedSplitId)) {
+                                List<Shard> childSplitIdsOfFinishedSplit =
+                                        childShardIdMap.get(finishedSplitId);
+                                childSplits.addAll(childSplitIdsOfFinishedSplit);
+                            }
+                            return new DynamoDbStreamsShardSplit(
+                                    finishedSplit.getStreamArn(),
+                                    finishedSplit.getShardId(),
+                                    finishedSplit.getNextStartingPosition(),
+                                    finishedSplit.getDynamoDbStreamsShardSplit().getParentShardId(),
+                                    true,
+                                    childSplits);
+                        })
                .forEach(split -> splitFinishedEvents.get(currentCheckpointId).add(split));
+
+        Set<SplitsFinishedEventContext> splitsFinishedEventContextMap =
+                finishedSplitIds.values().stream()
+                        .map(
+                                finishedSplit -> {
+                                    List<Shard> childSplits = new ArrayList<>();
+                                    String finishedSplitId = finishedSplit.getSplitId();
+                                    if (childShardIdMap.containsKey(finishedSplitId)) {
+                                        childSplits.addAll(childShardIdMap.remove(finishedSplitId));
+                                    }
+                                    return new SplitsFinishedEventContext(
+                                            finishedSplitId, childSplits);
+                                })
+                        .collect(Collectors.toSet());
Review Comment:
I believe this can be simplified a bit:
```java
Set<SplitsFinishedEventContext> splitsFinishedEventContextMap =
        finishedSplitIds.values().stream()
                .map(DynamoDbStreamsShardSplitState::getSplitId)
                .filter(childShardIdMap::containsKey)
                .map(
                        finishedSplitId ->
                                new SplitsFinishedEventContext(
                                        finishedSplitId,
                                        List.copyOf(childShardIdMap.remove(finishedSplitId))))
                .collect(Collectors.toSet());
```
Pretty much the same applies for the code at L92-106.
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java:
##########
@@ -106,6 +118,49 @@ public RecordsWithSplitIds<Record> fetch() throws IOException {
        }

        long currentTime = System.currentTimeMillis();
+
+        if (splitContext.splitState.isHasShardEndReached()) {
Review Comment:
I'd extract the body of this `if` into a new private method, e.g.:
```java
private RecordsWithSplitIds<Record> handleShardEnd(
        DynamoDbStreamsShardSplitWithContext splitContext, long currentTime) { ... }
```
And inside, I'd invert the `MAX_RETRY_ATTEMPTS_FOR_CHILD_SHARDS` check (return early, so no `else` branch is required in that case), to minimize the endlessly nested `if`s.
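For illustration only (`retryAttempts`, `giveUpOnChildShards`, and `scheduleChildShardRetry` are hypothetical stand-ins for whatever the current branches do):
```java
private RecordsWithSplitIds<Record> handleShardEnd(
        DynamoDbStreamsShardSplitWithContext splitContext, long currentTime) {
    // Early return: retries exhausted, no else branch needed below.
    if (splitContext.retryAttempts >= MAX_RETRY_ATTEMPTS_FOR_CHILD_SHARDS) {
        return giveUpOnChildShards(splitContext);
    }
    // Otherwise keep retrying child shard discovery with backoff.
    return scheduleChildShardRetry(splitContext, currentTime);
}
```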
##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxyTest.java:
##########
@@ -88,6 +91,86 @@ void testListShards(String lastSeenShardId) {
                .isEqualTo(expectedListShardsResult);
    }

+    @Test
+    void testListShardsWithFilterForChildShards() {
+        final String streamArn =
+                "arn:aws:dynamodb:us-east-1:1231231230:table/test/stream/2024-01-01T00:00:00.826";
+        final String parentShardId = "shardId-000000000001";
+
+        // Create child shards that we expect to be returned
+        final List<Shard> childShards =
+                Arrays.asList(
Review Comment:
`Arrays.asList` -> `List.of`, everywhere in the newly added code.
##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsProxyProvider.java:
##########
@@ -84,6 +86,25 @@ public ListShardsResult listShards(String streamArn, @Nullable String lastSeenSh
            return listShardsResult;
        }

+        @Override
+        public ListShardsResult listShardsWithFilter(String streamArn, ShardFilter shardFilter) {
+            if (ShardFilterType.CHILD_SHARDS.equals(shardFilter.type())) {
Review Comment:
Let's invert this check and throw the exception inside, so we can spare an
indentation level for the more complex code.
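i.e. roughly (the exception type and message here are placeholders for whatever the current `else` branch does):
```java
if (!ShardFilterType.CHILD_SHARDS.equals(shardFilter.type())) {
    throw new UnsupportedOperationException(
            "Unsupported shard filter type: " + shardFilter.type());
}
// ... the CHILD_SHARDS handling continues here, one indentation level less
```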
##########
flink-connector-aws/flink-connector-dynamodb/pom.xml:
##########
@@ -71,10 +71,12 @@ under the License.
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>dynamodb</artifactId>
+            <version>2.32.0</version>
        </dependency>

        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>dynamodb-enhanced</artifactId>
+            <version>2.32.0</version>
Review Comment:
I'd bump the AWS SDK version to the current latest. (If that's what you mean by updating this in the root POM, then nvm.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]