ferenc-csaky commented on code in PR #219:
URL: https://github.com/apache/flink-connector-aws/pull/219#discussion_r2594619912


##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java:
##########
@@ -76,6 +76,9 @@ public enum InitialPosition {
     public static final String BASE_DDB_STREAMS_USER_AGENT_PREFIX_FORMAT =
             "Apache Flink %s (%s) DynamoDb Streams Connector";
 
+    public static final String DYNAMODB_STREAMS_THROTTLING_EXCEPTION_ERROR_CODE =

Review Comment:
   unused, delete



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java:
##########
@@ -176,7 +176,7 @@ private <AttributeT> TableSchema<AttributeT> createTableSchemaFromPojo(
                     tableSchemaBuilder,
                     propertyDescriptor.getName(),
                     BeanAttributeGetter.create(
-                            typeInfo.getTypeClass(), propertyDescriptor.getReadMethod()),
+                            typeInfo.getTypeClass(), propertyDescriptor.getReadMethod(), null),

Review Comment:
   Any reason not to add `MethodHandles.lookup()`, if it was introduced in the SDK?
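   I.e. something like this, assuming the new overload's third parameter is indeed a `MethodHandles.Lookup` (needs `import java.lang.invoke.MethodHandles;`):
   ```java
   BeanAttributeGetter.create(
           typeInfo.getTypeClass(),
           propertyDescriptor.getReadMethod(),
           MethodHandles.lookup());
   ```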



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplit.java:
##########
@@ -50,12 +56,31 @@ public DynamoDbStreamsShardSplit(
         this(streamArn, shardId, startingPosition, parentShardId, false);
     }
 
+    public DynamoDbStreamsShardSplit(
+            String streamArn,
+            String shardId,
+            StartingPosition startingPosition,
+            String parentShardId,
+            List<Shard> childSplits) {
+        this(streamArn, shardId, startingPosition, parentShardId, false, childSplits);
+    }
+
     public DynamoDbStreamsShardSplit(
             String streamArn,
             String shardId,
             StartingPosition startingPosition,
             String parentShardId,
             boolean isFinished) {
+        this(streamArn, shardId, startingPosition, parentShardId, isFinished, new ArrayList<>());
+    }
+
+    public DynamoDbStreamsShardSplit(
+            String streamArn,
+            String shardId,
+            StartingPosition startingPosition,
+            String parentShardId,
+            boolean isFinished,
+            List<Shard> childSplits) {

Review Comment:
   I'd invert the order of the `boolean` and `List` params, and the same at the class field level.
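   I.e. the full constructor would become (body unchanged):
   ```java
   public DynamoDbStreamsShardSplit(
           String streamArn,
           String shardId,
           StartingPosition startingPosition,
           String parentShardId,
           List<Shard> childSplits,
           boolean isFinished) { ... }
   ```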



##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java:
##########
@@ -304,21 +305,29 @@ void testLatestAssignsChildShardsWithTrimHorizonDuringPeriodicDiscovery() throws
                     };
             streamProxy.addShards(childShards);
             enumerator.handleSourceEvent(
-                    subtaskId, new SplitsFinishedEvent(Collections.singleton(shards[2].shardId())));
-            // Given no resharding occurs (list of shards remains the same)
-            // When first periodic discovery runs
-            context.runPeriodicCallable(0);
-            // Then no additional splits are assigned
-            SplitsAssignment<DynamoDbStreamsShardSplit> periodicDiscoverySplitAssignment =
-                    context.getSplitsAssignmentSequence().get(2);
+                    subtaskId,
+                    new SplitsFinishedEvent(
+                            Collections.singleton(

Review Comment:
   `Collections.singleton` -> `Set.of`



##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java:
##########
@@ -765,7 +774,12 @@ void testHandleSplitFinishedEventTest() throws Throwable {
             context.runNextOneTimeCallable();
 
             enumerator.handleSourceEvent(
-                    1, new SplitsFinishedEvent(Collections.singleton(completedShard.shardId())));
+                    1,
+                    new SplitsFinishedEvent(
+                            Collections.singleton(
+                                    new SplitsFinishedEventContext(
+                                            completedShard.shardId(),
+                                            Collections.singletonList(shards[1])))));

Review Comment:
   `Collections.singletonList` -> `List.of`
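   Combined with the `Set.of` suggestion from the other comment on this hunk:
   ```java
   enumerator.handleSourceEvent(
           1,
           new SplitsFinishedEvent(
                   Set.of(
                           new SplitsFinishedEventContext(
                                   completedShard.shardId(), List.of(shards[1])))));
   ```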



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java:
##########
@@ -91,6 +94,10 @@ public enum InitialPosition {
                             .withDescription(
                                     "The default idle time between non-empty 
polls for DynamoDB Streams GetRecords API");
 
+    public static final int MAX_RETRY_ATTEMPTS_FOR_CHILD_SHARDS = 5;
+    public static final Duration CHILD_SHARD_DISCOVERY_MIN_DELAY = Duration.ofMillis(100);
+    public static final Duration CHILD_SHARD_DISCOVERY_MAX_DELAY = Duration.ofMillis(1000);

Review Comment:
   You convert both of these to long, so I recommend adding an `_MS` suffix to the names and defining them as `long` in the first place, so we spare the unnecessary conversion.
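   Something like:
   ```java
   public static final long CHILD_SHARD_DISCOVERY_MIN_DELAY_MS = 100L;
   public static final long CHILD_SHARD_DISCOVERY_MAX_DELAY_MS = 1_000L;
   ```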



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java:
##########
@@ -88,6 +89,24 @@ public ListShardsResult listShards(String streamArn, @Nullable String lastSeenSh
         return listShardsResult;
     }
 
+    @Override
+    public ListShardsResult listShardsWithFilter(String streamArn, ShardFilter shardFilter) {
+        LOG.info("Child shards with filter called, for shardId: {}", shardFilter.shardId());
+        ListShardsResult listShardsResult = new ListShardsResult();
+
+        try {
+            DescribeStreamResponse describeStreamResponse =
+                    this.describeStream(streamArn, shardFilter);
+            listShardsResult.addShards(describeStreamResponse.streamDescription().shards());
+            listShardsResult.setStreamStatus(
+                    describeStreamResponse.streamDescription().streamStatus());
+        } catch (Exception e) {
+            LOG.error("DescribeStream with Filter API threw an exception", e);

Review Comment:
   We log this as an error, but the show goes on. If this is not an unrecoverable problem, this should be a WARN.
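   I.e.:
   ```java
   LOG.warn("DescribeStream with Filter API threw an exception", e);
   ```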



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -138,7 +139,20 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
 
     /** When we mark a split as finished, we will only assign its child splits to the subtasks. */
     private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
-        splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+        Set<String> finishedSplitIds =
+                splitsFinishedEvent.getFinishedSplits().stream()
+                        .map(SplitsFinishedEventContext::getSplitId)
+                        .collect(Collectors.toSet());
+        splitTracker.markAsFinished(finishedSplitIds);
+        List<Shard> childrenOfFinishedSplits = new ArrayList<>();
+        splitsFinishedEvent
+                .getFinishedSplits()
+                .forEach(
+                        finishedSplitEvent ->
+                                childrenOfFinishedSplits.addAll(
+                                        finishedSplitEvent.getChildSplits()));
+        LOG.info("Adding Children of finishedSplits to splitTracker: {}", 
childrenOfFinishedSplits);

Review Comment:
   I'd consider logging every `Shard` as a string too chatty at INFO level. Maybe introduce a debug-level check: if debug logging is active, log the current message at DEBUG, otherwise log only the size at INFO.
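   Something like:
   ```java
   if (LOG.isDebugEnabled()) {
       LOG.debug(
               "Adding children of finished splits to splitTracker: {}",
               childrenOfFinishedSplits);
   } else {
       LOG.info(
               "Adding {} children of finished splits to splitTracker",
               childrenOfFinishedSplits.size());
   }
   ```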



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java:
##########
@@ -88,6 +89,24 @@ public ListShardsResult listShards(String streamArn, @Nullable String lastSeenSh
         return listShardsResult;
     }
 
+    @Override
+    public ListShardsResult listShardsWithFilter(String streamArn, ShardFilter shardFilter) {
+        LOG.info("Child shards with filter called, for shardId: {}", shardFilter.shardId());

Review Comment:
   I'd say DEBUG level, and for the other INFO one in this method too.



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEventContext.java:
##########
@@ -0,0 +1,28 @@
+package org.apache.flink.connector.dynamodb.source.enumerator.event;
+
+import org.apache.flink.annotation.Internal;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.io.Serializable;
+import java.util.List;
+
+/** Context which contains the split id and the finished splits for a finished split event. */
+@Internal
+public class SplitsFinishedEventContext implements Serializable {
+    String splitId;
+    List<Shard> childSplits;

Review Comment:
   Why not `private final` for both?



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplit.java:
##########
@@ -50,12 +56,31 @@ public DynamoDbStreamsShardSplit(
         this(streamArn, shardId, startingPosition, parentShardId, false);
     }
 
+    public DynamoDbStreamsShardSplit(
+            String streamArn,
+            String shardId,
+            StartingPosition startingPosition,
+            String parentShardId,
+            List<Shard> childSplits) {
+        this(streamArn, shardId, startingPosition, parentShardId, false, childSplits);
+    }
+
     public DynamoDbStreamsShardSplit(
             String streamArn,
             String shardId,
             StartingPosition startingPosition,
             String parentShardId,
             boolean isFinished) {
+        this(streamArn, shardId, startingPosition, parentShardId, isFinished, new ArrayList<>());

Review Comment:
   `List.of()` instead of `new ArrayList<>()`.



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/StreamProxy.java:
##########
@@ -41,6 +42,8 @@ public interface StreamProxy extends Closeable {
      */
     ListShardsResult listShards(String streamArn, @Nullable String lastSeenShardId);
 
+    ListShardsResult listShardsWithFilter(String streamArn, ShardFilter shardFilter);

Review Comment:
   Pls. add some javadoc to the new method.
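   For example, along these lines (adjust the wording as you see fit):
   ```java
   /**
    * Lists the shards of the given stream that match the provided filter, e.g. the
    * child shards of a given parent shard.
    *
    * @param streamArn the ARN of the DynamoDB stream
    * @param shardFilter the filter applied when listing shards
    * @return a {@link ListShardsResult} with the matching shards and the stream status
    */
   ListShardsResult listShardsWithFilter(String streamArn, ShardFilter shardFilter);
   ```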



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializer.java:
##########
@@ -74,6 +78,18 @@ public byte[] serialize(DynamoDbStreamsShardSplit split) throws IOException {
                 out.writeUTF(split.getParentShardId());
             }
             out.writeBoolean(split.isFinished());
+            out.writeInt(split.getChildSplits().size());

Review Comment:
   To make this backwards compatible, shouldn't we write even the size only when the list is not empty?
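   Rough sketch of what I mean (the per-shard serialization itself stays as the PR has it):
   ```java
   // Keep the pre-existing byte layout for splits that have no child shards.
   if (!split.getChildSplits().isEmpty()) {
       out.writeInt(split.getChildSplits().size());
       // ... write each child shard as the PR already does
   }
   ```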



##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java:
##########
@@ -765,7 +774,12 @@ void testHandleSplitFinishedEventTest() throws Throwable {
             context.runNextOneTimeCallable();
 
             enumerator.handleSourceEvent(
-                    1, new SplitsFinishedEvent(Collections.singleton(completedShard.shardId())));
+                    1,
+                    new SplitsFinishedEvent(
+                            Collections.singleton(

Review Comment:
   `Collections.singleton` -> `Set.of`



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEventContext.java:
##########
@@ -0,0 +1,28 @@
+package org.apache.flink.connector.dynamodb.source.enumerator.event;
+
+import org.apache.flink.annotation.Internal;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.io.Serializable;
+import java.util.List;
+
+/** Context which contains the split id and the finished splits for a finished split event. */
+@Internal
+public class SplitsFinishedEventContext implements Serializable {

Review Comment:
   This class is used as elements of a `Set`, but does not override `equals`+`hashCode`. This is an already existing bug, which I guess did not surface yet, because we only create this set based on a `Map`, so actually there are no real duplicates. But let's fix this.
   
   Let's add a `serialVersionUID` constant to it too.
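   E.g. (needs `java.util.Objects`; the `serialVersionUID` value is just an initial pick):
   ```java
   private static final long serialVersionUID = 1L;

   @Override
   public boolean equals(Object o) {
       if (this == o) {
           return true;
       }
       if (!(o instanceof SplitsFinishedEventContext)) {
           return false;
       }
       SplitsFinishedEventContext that = (SplitsFinishedEventContext) o;
       return Objects.equals(splitId, that.splitId)
               && Objects.equals(childSplits, that.childSplits);
   }

   @Override
   public int hashCode() {
       return Objects.hash(splitId, childSplits);
   }
   ```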



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitState.java:
##########
@@ -29,10 +29,12 @@ public class DynamoDbStreamsShardSplitState {
     private final DynamoDbStreamsShardSplit dynamoDbStreamsShardSplit;
     private StartingPosition nextStartingPosition;
     private String nextShardIterator;
+    private boolean hasShardEndReached;

Review Comment:
   `hasShardEndReached` -> `shardEndReached`, and the getter/setter should follow.
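   I.e.:
   ```java
   private boolean shardEndReached;

   public boolean isShardEndReached() {
       return shardEndReached;
   }

   public void setShardEndReached(boolean shardEndReached) {
       this.shardEndReached = shardEndReached;
   }
   ```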



##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxyTest.java:
##########
@@ -38,19 +38,22 @@
 import software.amazon.awssdk.services.dynamodb.model.Record;
 import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
 import software.amazon.awssdk.services.dynamodb.model.Shard;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;
 import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
 import software.amazon.awssdk.services.dynamodb.model.StreamDescription;
 import software.amazon.awssdk.services.dynamodb.model.StreamStatus;
 import software.amazon.awssdk.services.dynamodb.model.TrimmedDataAccessException;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
 import static org.apache.flink.connector.dynamodb.source.util.TestUtil.generateShardId;
-import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThatNoException;

Review Comment:
   Pls. change AssertionsForClassTypes to Assertions here too.



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java:
##########
@@ -83,19 +89,40 @@ protected void onSplitFinished(Map<String, DynamoDbStreamsShardSplitState> finis
         splitFinishedEvents.computeIfAbsent(currentCheckpointId, k -> new HashSet<>());
         finishedSplitIds.values().stream()
                 .map(
-                        finishedSplit ->
-                                new DynamoDbStreamsShardSplit(
-                                        finishedSplit.getStreamArn(),
-                                        finishedSplit.getShardId(),
-                                        finishedSplit.getNextStartingPosition(),
-                                        finishedSplit
-                                                .getDynamoDbStreamsShardSplit()
-                                                .getParentShardId(),
-                                        true))
+                        finishedSplit -> {
+                            List<Shard> childSplits = new ArrayList<>();
+                            String finishedSplitId = finishedSplit.getSplitId();
+                            if (childShardIdMap.containsKey(finishedSplitId)) {
+                                List<Shard> childSplitIdsOfFinishedSplit =
+                                        childShardIdMap.get(finishedSplitId);
+                                childSplits.addAll(childSplitIdsOfFinishedSplit);
+                            }
+                            return new DynamoDbStreamsShardSplit(
+                                    finishedSplit.getStreamArn(),
+                                    finishedSplit.getShardId(),
+                                    finishedSplit.getNextStartingPosition(),
+                                    finishedSplit.getDynamoDbStreamsShardSplit().getParentShardId(),
+                                    true,
+                                    childSplits);
+                        })
                 .forEach(split -> splitFinishedEvents.get(currentCheckpointId).add(split));
 
+        Set<SplitsFinishedEventContext> splitsFinishedEventContextMap =
+                finishedSplitIds.values().stream()
+                        .map(
+                                finishedSplit -> {
+                                    List<Shard> childSplits = new ArrayList<>();
+                                    String finishedSplitId = finishedSplit.getSplitId();
+                                    if (childShardIdMap.containsKey(finishedSplitId)) {
+                                        childSplits.addAll(childShardIdMap.remove(finishedSplitId));
+                                    }
+                                    return new SplitsFinishedEventContext(
+                                            finishedSplitId, childSplits);
+                                })
+                        .collect(Collectors.toSet());

Review Comment:
   I believe this can be simplified a bit:
   ```java
   Set<SplitsFinishedEventContext> splitsFinishedEventContextMap =
           finishedSplitIds.values().stream()
                   .map(DynamoDbStreamsShardSplitState::getSplitId)
                   .filter(childShardIdMap::containsKey)
                   .map(
                           finishedSplitId ->
                                   new SplitsFinishedEventContext(
                                           finishedSplitId,
                                            List.copyOf(
                                                    childShardIdMap.remove(finishedSplitId))))
                   .collect(Collectors.toSet());
   ```
   
   Pretty much the same applies for the code at L92-106.



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java:
##########
@@ -106,6 +118,49 @@ public RecordsWithSplitIds<Record> fetch() throws IOException {
         }
 
         long currentTime = System.currentTimeMillis();
+
+        if (splitContext.splitState.isHasShardEndReached()) {

Review Comment:
   I'd separate the body of this `if` into a new private method, e.g:
   ```java
   private RecordsWithSplitIds<Record> handleShardEnd(
           DynamoDbStreamsShardSplitWithContext splitContext, long currentTime) { ... }
   ```
   
   And inside, I'd invert the `MAX_RETRY_ATTEMPTS_FOR_CHILD_SHARDS` check (return early, and no `else` branch is required in that case), so we can minimize the endlessly nested `if`s.



##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxyTest.java:
##########
@@ -88,6 +91,86 @@ void testListShards(String lastSeenShardId) {
                 .isEqualTo(expectedListShardsResult);
     }
 
+    @Test
+    void testListShardsWithFilterForChildShards() {
+        final String streamArn =
+                "arn:aws:dynamodb:us-east-1:1231231230:table/test/stream/2024-01-01T00:00:00.826";
+        final String parentShardId = "shardId-000000000001";
+
+        // Create child shards that we expect to be returned
+        final List<Shard> childShards =
+                Arrays.asList(

Review Comment:
   `Arrays.asList` -> `List.of`, everywhere in the newly added code.



##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsProxyProvider.java:
##########
@@ -84,6 +86,25 @@ public ListShardsResult listShards(String streamArn, @Nullable String lastSeenSh
             return listShardsResult;
         }
 
+        @Override
+        public ListShardsResult listShardsWithFilter(String streamArn, ShardFilter shardFilter) {
+            if (ShardFilterType.CHILD_SHARDS.equals(shardFilter.type())) {

Review Comment:
   Let's invert this check and throw the exception inside, so we can spare an indentation level for the more complex code.
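   I.e. something like this, keeping whatever exception the current `else` branch throws:
   ```java
   if (!ShardFilterType.CHILD_SHARDS.equals(shardFilter.type())) {
       throw new UnsupportedOperationException(
               "Unsupported shard filter type: " + shardFilter.type());
   }
   // CHILD_SHARDS handling continues here, one indentation level shallower
   ```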



##########
flink-connector-aws/flink-connector-dynamodb/pom.xml:
##########
@@ -71,10 +71,12 @@ under the License.
         <dependency>
             <groupId>software.amazon.awssdk</groupId>
             <artifactId>dynamodb</artifactId>
+            <version>2.32.0</version>
         </dependency>
         <dependency>
             <groupId>software.amazon.awssdk</groupId>
             <artifactId>dynamodb-enhanced</artifactId>
+            <version>2.32.0</version>

Review Comment:
   I'd bump the AWS SDK version to the current latest. (If that's what you mean by updating this in the root POM, then nvm.)


