This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch v6.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git

commit e622064f4a1a33e996581e9bebfc44133eb74bc8
Author: Abhi Gupta <[email protected]>
AuthorDate: Thu Dec 11 11:54:30 2025 +0530

    [FLINK-36296] Add support for incremental shard discovery for DynamoDB Streams Source
    
    (cherry picked from commit 68acdbd69b9ba5beb00e26abf277b6ec5c660e92)
---
 .../connector/aws/util/AWSClientUtilTest.java      |  6 +-
 .../sink/DynamoDbTypeInformedElementConverter.java |  5 +-
 .../dynamodb/source/DynamoDbStreamsSource.java     | 45 ++++++-----
 .../DynamodbStreamsSourceConfigConstants.java      |  4 +
 .../DynamoDbStreamsSourceEnumerator.java           | 26 ++++--
 .../enumerator/event/SplitsFinishedEvent.java      | 15 ++--
 .../event/SplitsFinishedEventContext.java          | 52 ++++++++++++
 .../source/enumerator/tracker/SplitTracker.java    |  4 +
 .../source/proxy/DynamoDbStreamsProxy.java         | 43 ++++++++++
 .../dynamodb/source/proxy/StreamProxy.java         | 10 +++
 .../source/reader/DynamoDbStreamsSourceReader.java | 50 +++++++++---
 .../PollingDynamoDbStreamsShardSplitReader.java    | 92 +++++++++++++++++++---
 .../source/split/DynamoDbStreamsShardSplit.java    | 40 +++++++++-
 .../split/DynamoDbStreamsShardSplitSerializer.java | 49 +++++++++++-
 .../split/DynamoDbStreamsShardSplitState.java      | 10 +++
 .../DynamoDbStreamsSourceEnumeratorTest.java       | 36 ++++++---
 .../source/proxy/DynamoDbStreamsProxyTest.java     | 86 +++++++++++++++++++-
 .../reader/DynamoDbStreamsSourceReaderTest.java    | 16 +++-
 ...PollingDynamoDbStreamsShardSplitReaderTest.java | 16 +++-
 .../DynamoDbStreamsShardSplitSerializerTest.java   | 14 ++++
 .../source/util/DynamoDbStreamsClientProvider.java | 29 +++++++
 .../source/util/DynamoDbStreamsProxyProvider.java  | 23 ++++++
 .../connector/dynamodb/source/util/TestUtil.java   | 35 ++++++++
 pom.xml                                            |  2 +-
 24 files changed, 627 insertions(+), 81 deletions(-)
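
Note on the change as a whole: the finished-split path now carries discovered
child shards from the reader to the enumerator. A minimal sketch of the new
event payload, using only types introduced in this commit (shard ids and
sequence numbers below are placeholder values):

    import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEvent;
    import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEventContext;

    import software.amazon.awssdk.services.dynamodb.model.SequenceNumberRange;
    import software.amazon.awssdk.services.dynamodb.model.Shard;

    import java.util.Collections;
    import java.util.List;

    class SplitsFinishedEventSketch {
        static SplitsFinishedEvent sketch() {
            // A child shard as DescribeStream would report it (placeholder values).
            Shard child =
                    Shard.builder()
                            .shardId("shardId-00000001")
                            .parentShardId("shardId-00000000")
                            .sequenceNumberRange(
                                    SequenceNumberRange.builder()
                                            .startingSequenceNumber("100")
                                            .build())
                            .build();
            // Each finished split now reports the children discovered for it.
            SplitsFinishedEventContext context =
                    new SplitsFinishedEventContext("shardId-00000000", List.of(child));
            return new SplitsFinishedEvent(Collections.singleton(context));
        }
    }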

diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSClientUtilTest.java b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSClientUtilTest.java
index 67331e3..16e6aa6 100644
--- a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSClientUtilTest.java
+++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSClientUtilTest.java
@@ -194,8 +194,10 @@ class AWSClientUtilTest {
 
         ClientOverrideConfiguration resultOverrideConfiguration =
                 s3Client.serviceClientConfiguration().overrideConfiguration();
-        assertThat(resultOverrideConfiguration.retryStrategy())
-                .isEqualTo(Optional.of(overrideRetryStrategy));
+        assertThat(resultOverrideConfiguration.retryStrategy()).isPresent();
+        RetryStrategy resultStrategy = resultOverrideConfiguration.retryStrategy().get();
+        assertThat(resultStrategy.maxAttempts()).isEqualTo(10);
+
         assertThat(resultOverrideConfiguration.retryPolicy()).isEqualTo(Optional.empty());
         assertThat(resultOverrideConfiguration.retryMode()).isEqualTo(Optional.empty());
         assertThat(resultOverrideConfiguration.retryStrategyConfigurator())
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java
index 4978f46..c41d30f 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java
@@ -50,6 +50,7 @@ import java.beans.BeanInfo;
 import java.beans.IntrospectionException;
 import java.beans.Introspector;
 import java.beans.PropertyDescriptor;
+import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -176,7 +177,9 @@ public class DynamoDbTypeInformedElementConverter<T>
                     tableSchemaBuilder,
                     propertyDescriptor.getName(),
                     BeanAttributeGetter.create(
-                            typeInfo.getTypeClass(), propertyDescriptor.getReadMethod()),
+                            typeInfo.getTypeClass(),
+                            propertyDescriptor.getReadMethod(),
+                            MethodHandles.lookup()),
                     fieldInfo);
         }
 
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java
index 5b20bc1..051d127 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java
@@ -55,10 +55,13 @@ import software.amazon.awssdk.http.SdkHttpClient;
 import software.amazon.awssdk.http.apache.ApacheHttpClient;
 import software.amazon.awssdk.retries.AdaptiveRetryStrategy;
 import software.amazon.awssdk.retries.api.BackoffStrategy;
+import software.amazon.awssdk.retries.api.RetryStrategy;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
 import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
 import software.amazon.awssdk.utils.AttributeMap;
 
 import java.time.Duration;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
@@ -145,14 +148,18 @@ public class DynamoDbStreamsSource<T>
         final Duration getRecordsIdlePollingTimeBetweenEmptyPolls =
                 sourceConfig.get(DYNAMODB_STREAMS_GET_RECORDS_IDLE_TIME_BETWEEN_EMPTY_POLLS);
 
+        Map<String, List<Shard>> childShardMap = new ConcurrentHashMap<>();
         // We create a new stream proxy for each split reader since they have their own independent
         // lifecycle.
         Supplier<PollingDynamoDbStreamsShardSplitReader> splitReaderSupplier =
                 () ->
                         new PollingDynamoDbStreamsShardSplitReader(
-                                createDynamoDbStreamsProxy(sourceConfig),
+                                createDynamoDbStreamsProxy(
+                                        sourceConfig,
+                                        SdkDefaultRetryStrategy.defaultRetryStrategy()),
                                 getRecordsIdlePollingTimeBetweenNonEmptyPolls,
                                 getRecordsIdlePollingTimeBetweenEmptyPolls,
+                                childShardMap,
                                 shardMetricGroupMap);
         DynamoDbStreamsRecordEmitter<T> recordEmitter =
                 new DynamoDbStreamsRecordEmitter<>(deserializationSchema);
@@ -162,6 +169,7 @@ public class DynamoDbStreamsSource<T>
                 recordEmitter,
                 sourceConfig,
                 readerContext,
+                childShardMap,
                 shardMetricGroupMap);
     }
 
@@ -178,11 +186,25 @@ public class DynamoDbStreamsSource<T>
                    SplitEnumeratorContext<DynamoDbStreamsShardSplit> enumContext,
                     DynamoDbStreamsSourceEnumeratorState checkpoint)
                     throws Exception {
+        int maxApiCallAttempts = sourceConfig.get(DYNAMODB_STREAMS_RETRY_COUNT);
+        Duration minDelayBetweenRetries =
+                sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MIN_DELAY);
+        Duration maxDelayBetweenRetries =
+                sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MAX_DELAY);
+        BackoffStrategy backoffStrategy =
+                BackoffStrategy.exponentialDelay(minDelayBetweenRetries, maxDelayBetweenRetries);
+        AdaptiveRetryStrategy adaptiveRetryStrategy =
+                SdkDefaultRetryStrategy.adaptiveRetryStrategy()
+                        .toBuilder()
+                        .maxAttempts(maxApiCallAttempts)
+                        .backoffStrategy(backoffStrategy)
+                        .throttlingBackoffStrategy(backoffStrategy)
+                        .build();
         return new DynamoDbStreamsSourceEnumerator(
                 enumContext,
                 streamArn,
                 sourceConfig,
-                createDynamoDbStreamsProxy(sourceConfig),
+                createDynamoDbStreamsProxy(sourceConfig, adaptiveRetryStrategy),
                 dynamoDbStreamsShardAssigner,
                 checkpoint);
     }
@@ -199,7 +221,8 @@ public class DynamoDbStreamsSource<T>
                 new DynamoDbStreamsShardSplitSerializer());
     }
 
-    private DynamoDbStreamsProxy createDynamoDbStreamsProxy(Configuration consumerConfig) {
+    private DynamoDbStreamsProxy createDynamoDbStreamsProxy(
+            Configuration consumerConfig, RetryStrategy retryStrategy) {
         SdkHttpClient httpClient =
                 AWSGeneralUtil.createSyncHttpClient(
                         AttributeMap.builder().build(), ApacheHttpClient.builder());
@@ -215,26 +238,12 @@ public class DynamoDbStreamsSource<T>
         consumerConfig.addAllToProperties(dynamoDbStreamsClientProperties);
 
         AWSGeneralUtil.validateAwsCredentials(dynamoDbStreamsClientProperties);
-        int maxApiCallAttempts = sourceConfig.get(DYNAMODB_STREAMS_RETRY_COUNT);
-        Duration minDescribeStreamDelay =
-                sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MIN_DELAY);
-        Duration maxDescribeStreamDelay =
-                sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MAX_DELAY);
-        BackoffStrategy backoffStrategy =
-                BackoffStrategy.exponentialDelay(minDescribeStreamDelay, maxDescribeStreamDelay);
-        AdaptiveRetryStrategy adaptiveRetryStrategy =
-                SdkDefaultRetryStrategy.adaptiveRetryStrategy()
-                        .toBuilder()
-                        .maxAttempts(maxApiCallAttempts)
-                        .backoffStrategy(backoffStrategy)
-                        .throttlingBackoffStrategy(backoffStrategy)
-                        .build();
         DynamoDbStreamsClient dynamoDbStreamsClient =
                 AWSClientUtil.createAwsSyncClient(
                         dynamoDbStreamsClientProperties,
                         httpClient,
                         DynamoDbStreamsClient.builder(),
-                        ClientOverrideConfiguration.builder().retryStrategy(adaptiveRetryStrategy),
+                        ClientOverrideConfiguration.builder().retryStrategy(retryStrategy),
                         DynamodbStreamsSourceConfigConstants
                                 .BASE_DDB_STREAMS_USER_AGENT_PREFIX_FORMAT,
                         DynamodbStreamsSourceConfigConstants.DDB_STREAMS_CLIENT_USER_AGENT_PREFIX);
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java
index b16833e..8e21d09 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java
@@ -91,6 +91,10 @@ public class DynamodbStreamsSourceConfigConstants {
                             .withDescription(
                                     "The default idle time between non-empty 
polls for DynamoDB Streams GetRecords API");
 
+    public static final int MAX_RETRY_ATTEMPTS_FOR_CHILD_SHARDS = 5;
+    public static final long CHILD_SHARD_DISCOVERY_MIN_DELAY_MS = 100;
+    public static final long CHILD_SHARD_DISCOVERY_MAX_DELAY_MS = 1000;
+
     /** DynamoDb Streams identifier for user agent prefix. */
     public static final String DDB_STREAMS_CLIENT_USER_AGENT_PREFIX =
             "aws.dynamodbstreams.client.user-agent-prefix";
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java
index 0c2f00b..c178611 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.connector.source.SplitsAssignment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition;
 import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEvent;
+import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEventContext;
 import org.apache.flink.connector.dynamodb.source.enumerator.tracker.SplitGraphInconsistencyTracker;
 import org.apache.flink.connector.dynamodb.source.enumerator.tracker.SplitTracker;
 import org.apache.flink.connector.dynamodb.source.exception.DynamoDbStreamsSourceException;
@@ -138,7 +139,22 @@ public class DynamoDbStreamsSourceEnumerator
 
     /** When we mark a split as finished, we will only assign its child splits to the subtasks. */
     private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
-        splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+        Set<String> finishedSplitIds =
+                splitsFinishedEvent.getFinishedSplits().stream()
+                        .map(SplitsFinishedEventContext::getSplitId)
+                        .collect(Collectors.toSet());
+        splitTracker.markAsFinished(finishedSplitIds);
+        List<Shard> childrenOfFinishedSplits = new ArrayList<>();
+        splitsFinishedEvent
+                .getFinishedSplits()
+                .forEach(
+                        finishedSplitEvent ->
+                                childrenOfFinishedSplits.addAll(
+                                        finishedSplitEvent.getChildSplits()));
+        LOG.debug(
+                "Adding Children of finishedSplits to splitTracker: {}", childrenOfFinishedSplits);
+        LOG.info("Added {} child splits to splitTracker", childrenOfFinishedSplits.size());
+        splitTracker.addChildSplits(childrenOfFinishedSplits);
 
         Set<DynamoDbStreamsShardSplit> splitsAssignment = splitAssignment.get(subtaskId);
         // during recovery, splitAssignment may return null since there might be no split assigned
@@ -152,13 +168,12 @@ public class DynamoDbStreamsSourceEnumerator
                             + "Child shard discovery might be delayed until we 
have enough readers."
                             + "Finished split ids: {}",
                     subtaskId,
-                    splitsFinishedEvent.getFinishedSplitIds());
+                    finishedSplitIds);
             return;
         }
 
-        splitsAssignment.removeIf(
-                split -> splitsFinishedEvent.getFinishedSplitIds().contains(split.splitId()));
-        assignChildSplits(splitsFinishedEvent.getFinishedSplitIds());
+        splitsAssignment.removeIf(split -> finishedSplitIds.contains(split.splitId()));
+        assignChildSplits(finishedSplitIds);
     }
 
     private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwable throwable) {
@@ -230,6 +245,7 @@ public class DynamoDbStreamsSourceEnumerator
     private void assignChildSplits(Set<String> finishedSplitIds) {
         List<DynamoDbStreamsShardSplit> splitsAvailableForAssignment =
                 splitTracker.getUnassignedChildSplits(finishedSplitIds);
+        LOG.info("Unassigned child splits: {}", splitsAvailableForAssignment);
         assignSplits(splitsAvailableForAssignment);
     }
 
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEvent.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEvent.java
index 0da5f01..22659c4 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEvent.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEvent.java
@@ -22,26 +22,27 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.source.SourceEvent;
 
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /** Source event used by source reader to communicate that splits are finished to enumerator. */
 @Internal
 public class SplitsFinishedEvent implements SourceEvent {
     private static final long serialVersionUID = 1;
-    private final Set<String> finishedSplitIds;
+    private final Set<SplitsFinishedEventContext> finishedSplits;
 
-    public SplitsFinishedEvent(Set<String> finishedSplitIds) {
-        this.finishedSplitIds = finishedSplitIds;
+    public SplitsFinishedEvent(Set<SplitsFinishedEventContext> finishedSplits) {
+        this.finishedSplits = finishedSplits;
     }
 
-    public Set<String> getFinishedSplitIds() {
-        return finishedSplitIds;
+    public Set<SplitsFinishedEventContext> getFinishedSplits() {
+        return finishedSplits;
     }
 
     @Override
     public String toString() {
         return "SplitsFinishedEvent{"
-                + "finishedSplitIds=["
-                + String.join(",", finishedSplitIds)
+                + "finishedSplit=["
+                + finishedSplits.stream().map(Object::toString).collect(Collectors.joining(","))
                 + "]}";
     }
 }
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEventContext.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEventContext.java
new file mode 100644
index 0000000..67bf76c
--- /dev/null
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEventContext.java
@@ -0,0 +1,52 @@
+package org.apache.flink.connector.dynamodb.source.enumerator.event;
+
+import org.apache.flink.annotation.Internal;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+
+/** Context which contains the split id and the finished splits for a finished split event. */
+@Internal
+public class SplitsFinishedEventContext implements Serializable {
+    private static final long serialVersionUID = 2L;
+    private final String splitId;
+    private final List<Shard> childSplits;
+
+    public SplitsFinishedEventContext(String splitId, List<Shard> childSplits) {
+        this.splitId = splitId;
+        this.childSplits = childSplits;
+    }
+
+    public String getSplitId() {
+        return splitId;
+    }
+
+    public List<Shard> getChildSplits() {
+        return childSplits;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        SplitsFinishedEventContext that = (SplitsFinishedEventContext) o;
+
+        if (!splitId.equals(that.splitId)) {
+            return false;
+        }
+        return childSplits.equals(that.childSplits);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(splitId, childSplits);
+    }
+}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java
index e655d4c..042a216 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java
@@ -102,6 +102,10 @@ public class SplitTracker {
         addSplitsForLatest(shardsToAdd);
     }
 
+    public void addChildSplits(Collection<Shard> childShardsToAdd) {
+        addSplitsForTrimHorizon(childShardsToAdd);
+    }
+
     private void addSplitsForLatest(Collection<Shard> shardsToAdd) {
         List<Shard> openShards =
                 shardsToAdd.stream()
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java
index 537b1bf..133da8d 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java
@@ -32,6 +32,7 @@ import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
 import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
 import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
 import software.amazon.awssdk.services.dynamodb.model.StreamStatus;
 import software.amazon.awssdk.services.dynamodb.model.TrimmedDataAccessException;
 import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
@@ -88,6 +89,24 @@ public class DynamoDbStreamsProxy implements StreamProxy {
         return listShardsResult;
     }
 
+    @Override
+    public ListShardsResult listShardsWithFilter(String streamArn, ShardFilter shardFilter) {
+        LOG.debug("Child shards with filter called, for shardId: {}", shardFilter.shardId());
+        ListShardsResult listShardsResult = new ListShardsResult();
+
+        try {
+            DescribeStreamResponse describeStreamResponse =
+                    this.describeStream(streamArn, shardFilter);
+            listShardsResult.addShards(describeStreamResponse.streamDescription().shards());
+            listShardsResult.setStreamStatus(
+                    describeStreamResponse.streamDescription().streamStatus());
+        } catch (Exception e) {
+            LOG.warn("DescribeStream with Filter API threw an exception", e);
+        }
+        LOG.info("Child shards returned for shardId: {}", listShardsResult);
+        return listShardsResult;
+    }
+
     @Override
     public GetRecordsResponse getRecords(
             String streamArn, String shardId, StartingPosition startingPosition) {
@@ -170,6 +189,30 @@ public class DynamoDbStreamsProxy implements StreamProxy {
         return describeStreamResponse;
     }
 
+    private DescribeStreamResponse describeStream(String streamArn, ShardFilter shardFilter) {
+        final DescribeStreamRequest describeStreamRequest =
+                DescribeStreamRequest.builder()
+                        .streamArn(streamArn)
+                        .shardFilter(shardFilter)
+                        .build();
+
+        DescribeStreamResponse describeStreamResponse =
+                dynamoDbStreamsClient.describeStream(describeStreamRequest);
+
+        StreamStatus streamStatus = describeStreamResponse.streamDescription().streamStatus();
+        if (streamStatus.equals(StreamStatus.ENABLING)) {
+            if (LOG.isWarnEnabled()) {
+                LOG.warn(
+                        String.format(
+                                "The status of stream %s is %s ; result of the current "
+                                        + "describeStream operation will not contain any shard information.",
+                                streamArn, streamStatus));
+            }
+        }
+
+        return describeStreamResponse;
+    }
+
     private String getShardIterator(
             String streamArn, String shardId, StartingPosition startingPosition) {
         GetShardIteratorRequest.Builder requestBuilder =
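
The new listShardsWithFilter above maps directly onto DescribeStream with a
ShardFilter. A minimal sketch of how a caller asks for the children of a
finished shard (the shard id below is a placeholder):

    import org.apache.flink.connector.dynamodb.source.proxy.StreamProxy;
    import org.apache.flink.connector.dynamodb.source.util.ListShardsResult;

    import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
    import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;

    class ChildShardLookupSketch {
        static ListShardsResult lookupChildren(StreamProxy proxy, String streamArn) {
            // CHILD_SHARDS limits DescribeStream to the children of the given shard.
            ShardFilter filter =
                    ShardFilter.builder()
                            .shardId("shardId-00000000") // placeholder parent shard id
                            .type(ShardFilterType.CHILD_SHARDS)
                            .build();
            return proxy.listShardsWithFilter(streamArn, filter);
        }
    }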
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/StreamProxy.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/StreamProxy.java
index fa9bf7c..10c941e 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/StreamProxy.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/StreamProxy.java
@@ -23,6 +23,7 @@ import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
 import org.apache.flink.connector.dynamodb.source.util.ListShardsResult;
 
 import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
 
 import javax.annotation.Nullable;
 
@@ -41,6 +42,15 @@ public interface StreamProxy extends Closeable {
      */
     ListShardsResult listShards(String streamArn, @Nullable String lastSeenShardId);
 
+    /**
+     * Obtains the child shards of a given shard, filtered by the {@link ShardFilter}.
+     *
+     * @param streamArn the ARN of the stream
+     * @param shardFilter the filter to apply
+     * @return shard list
+     */
+    ListShardsResult listShardsWithFilter(String streamArn, ShardFilter shardFilter);
+
     /**
      * Retrieves records from the stream.
      *
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java
index 90a105d..9988dfb 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java
@@ -25,6 +25,7 @@ import org.apache.flink.connector.base.source.reader.RecordEmitter;
 import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
 import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
 import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEvent;
+import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEventContext;
 import org.apache.flink.connector.dynamodb.source.metrics.DynamoDbStreamsShardMetrics;
 import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
 import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplitState;
@@ -32,6 +33,7 @@ import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSpli
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.dynamodb.model.Record;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -41,6 +43,7 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.stream.Collectors;
 
 /**
  * Coordinates the reading from assigned splits. Runs on the TaskManager.
@@ -55,6 +58,7 @@ public class DynamoDbStreamsSourceReader<T>
     private static final Logger LOG = LoggerFactory.getLogger(DynamoDbStreamsSourceReader.class);
     private final Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap;
     private final NavigableMap<Long, Set<DynamoDbStreamsShardSplit>> splitFinishedEvents;
+    private final Map<String, List<Shard>> childShardIdMap;
     private long currentCheckpointId;
 
     public DynamoDbStreamsSourceReader(
@@ -62,10 +66,12 @@ public class DynamoDbStreamsSourceReader<T>
             RecordEmitter<Record, T, DynamoDbStreamsShardSplitState> recordEmitter,
             Configuration config,
             SourceReaderContext context,
+            Map<String, List<Shard>> childShardIdMap,
             Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap) {
         super(splitFetcherManager, recordEmitter, config, context);
         this.shardMetricGroupMap = shardMetricGroupMap;
         this.splitFinishedEvents = new TreeMap<>();
+        this.childShardIdMap = childShardIdMap;
         this.currentCheckpointId = Long.MIN_VALUE;
     }
 
@@ -83,19 +89,39 @@ public class DynamoDbStreamsSourceReader<T>
         splitFinishedEvents.computeIfAbsent(currentCheckpointId, k -> new HashSet<>());
         finishedSplitIds.values().stream()
                 .map(
-                        finishedSplit ->
-                                new DynamoDbStreamsShardSplit(
-                                        finishedSplit.getStreamArn(),
-                                        finishedSplit.getShardId(),
-                                        finishedSplit.getNextStartingPosition(),
-                                        finishedSplit
-                                                .getDynamoDbStreamsShardSplit()
-                                                .getParentShardId(),
-                                        true))
+                        finishedSplit -> {
+                            List<Shard> childSplits = new ArrayList<>();
+                            String finishedSplitId = finishedSplit.getSplitId();
+                            if (childShardIdMap.containsKey(finishedSplitId)) {
+                                List<Shard> childSplitIdsOfFinishedSplit =
+                                        childShardIdMap.get(finishedSplitId);
+                                childSplits.addAll(childSplitIdsOfFinishedSplit);
+                            }
+                            return new DynamoDbStreamsShardSplit(
+                                    finishedSplit.getStreamArn(),
+                                    finishedSplit.getShardId(),
+                                    finishedSplit.getNextStartingPosition(),
+                                    finishedSplit.getDynamoDbStreamsShardSplit().getParentShardId(),
+                                    childSplits,
+                                    true);
+                        })
                 .forEach(split -> splitFinishedEvents.get(currentCheckpointId).add(split));
 
+        Set<SplitsFinishedEventContext> splitsFinishedEventContextMap =
+                finishedSplitIds.values().stream()
+                        .map(DynamoDbStreamsShardSplitState::getSplitId)
+                        .map(
+                                finishedSplitId ->
+                                        new SplitsFinishedEventContext(
+                                                finishedSplitId,
+                                                childShardIdMap.getOrDefault(
+                                                        finishedSplitId, Collections.emptyList())))
+                        .peek(context -> childShardIdMap.remove(context.getSplitId()))
+                        .collect(Collectors.toSet());
+
+        LOG.info("Sending splits finished event to coordinator: {}", 
splitsFinishedEventContextMap);
         context.sendSourceEventToCoordinator(
-                new SplitsFinishedEvent(new HashSet<>(finishedSplitIds.keySet())));
+                new SplitsFinishedEvent(splitsFinishedEventContextMap));
         finishedSplitIds.keySet().forEach(this::unregisterShardMetricGroup);
     }
 
@@ -121,8 +147,10 @@ public class DynamoDbStreamsSourceReader<T>
                 // buffer. If the next checkpoint doesn't complete,
                 // we would go back to the previous checkpointed
                 // state which will again replay these split finished events.
+                SplitsFinishedEventContext splitsFinishedEventContext =
+                        new SplitsFinishedEventContext(split.splitId(), split.getChildSplits());
                 context.sendSourceEventToCoordinator(
-                        new SplitsFinishedEvent(Collections.singleton(split.splitId())));
+                        new SplitsFinishedEvent(Collections.singleton(splitsFinishedEventContext)));
             } else {
                 dynamoDbStreamsShardSplits.add(split);
             }
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java
index 8a516be..6b89ee9 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java
@@ -27,11 +27,16 @@ import org.apache.flink.connector.dynamodb.source.proxy.StreamProxy;
 import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
 import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplitState;
 import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
+import org.apache.flink.connector.dynamodb.source.util.ListShardsResult;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
 import software.amazon.awssdk.services.dynamodb.model.Record;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;
+import software.amazon.awssdk.services.dynamodb.model.StreamStatus;
 
 import javax.annotation.Nullable;
 
@@ -43,10 +48,14 @@ import java.util.Collections;
 import java.util.Deque;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import static java.util.Collections.singleton;
+import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.CHILD_SHARD_DISCOVERY_MAX_DELAY_MS;
+import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.CHILD_SHARD_DISCOVERY_MIN_DELAY_MS;
+import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.MAX_RETRY_ATTEMPTS_FOR_CHILD_SHARDS;
 
 /**
  * An implementation of the SplitReader that periodically polls the DynamoDb stream to retrieve
@@ -64,6 +73,7 @@ public class PollingDynamoDbStreamsShardSplitReader
     private final Duration getRecordsIdlePollingTimeBetweenEmptyPolls;
 
     private final Deque<DynamoDbStreamsShardSplitWithContext> assignedSplits;
+    private final Map<String, List<Shard>> childShardMap;
     private final Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap;
     private final Set<String> pausedSplitIds;
     private static final Logger LOG =
@@ -73,6 +83,7 @@ public class PollingDynamoDbStreamsShardSplitReader
             StreamProxy dynamodbStreamsProxy,
             Duration getRecordsIdlePollingTimeBetweenNonEmptyPolls,
             Duration getRecordsIdlePollingTimeBetweenEmptyPolls,
+            Map<String, List<Shard>> childShardMap,
             Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap) {
         this.dynamodbStreams = dynamodbStreamsProxy;
         this.getRecordsIdlePollingTimeBetweenNonEmptyPolls =
@@ -80,6 +91,7 @@ public class PollingDynamoDbStreamsShardSplitReader
         this.getRecordsIdlePollingTimeBetweenEmptyPolls =
                 getRecordsIdlePollingTimeBetweenEmptyPolls;
         this.shardMetricGroupMap = shardMetricGroupMap;
+        this.childShardMap = childShardMap;
         this.assignedSplits = new ArrayDeque<>();
         this.pausedSplitIds = new HashSet<>();
     }
@@ -106,6 +118,11 @@ public class PollingDynamoDbStreamsShardSplitReader
         }
 
         long currentTime = System.currentTimeMillis();
+
+        if (splitContext.splitState.isShardEndReached()) {
+            return handleShardEnd(splitContext, currentTime);
+        }
+
         long nextEligibleTime = getNextEligibleTime(splitContext);
 
         LOG.debug(
@@ -132,15 +149,11 @@ public class PollingDynamoDbStreamsShardSplitReader
 
         splitContext.lastPollTimeMillis = currentTime;
         splitContext.wasLastPollEmpty = isEmptyPoll;
+        splitContext.splitState.setShardEndReached(isComplete);
 
+        assignedSplits.add(splitContext);
         if (isEmptyPoll) {
-            if (isComplete) {
-                return new DynamoDbStreamRecordsWithSplitIds(
-                        Collections.emptyIterator(), splitContext.splitState.getSplitId(), true);
-            } else {
-                assignedSplits.add(splitContext);
-                return INCOMPLETE_SHARD_EMPTY_RECORDS;
-            }
+            return INCOMPLETE_SHARD_EMPTY_RECORDS;
         } else {
             DynamoDbStreamsShardMetrics shardMetrics =
                     shardMetricGroupMap.get(splitContext.splitState.getShardId());
@@ -164,13 +177,66 @@ public class PollingDynamoDbStreamsShardSplitReader
                                 .dynamodb()
                                 .sequenceNumber()));
 
-        if (!isComplete) {
-            assignedSplits.add(splitContext);
-        }
         return new DynamoDbStreamRecordsWithSplitIds(
                 getRecordsResponse.records().iterator(),
                 splitContext.splitState.getSplitId(),
-                isComplete);
+                false);
+    }
+
+    private RecordsWithSplitIds<Record> handleShardEnd(
+            DynamoDbStreamsShardSplitWithContext splitContext, long currentTime) {
+        if (!splitContext.hasAttemptedChildShardDiscovery) {
+            splitContext.hasAttemptedChildShardDiscovery = true;
+            splitContext.childShardDiscoveryAttempts = 0;
+        }
+
+        if (splitContext.childShardDiscoveryAttempts < MAX_RETRY_ATTEMPTS_FOR_CHILD_SHARDS) {
+            long nextChildShardDiscoveryEligibleTime =
+                    getNextEligibleTimeForChildDiscovery(splitContext);
+            if (currentTime >= nextChildShardDiscoveryEligibleTime) {
+                ListShardsResult listShardsResult =
+                        dynamodbStreams.listShardsWithFilter(
+                                splitContext.splitState.getStreamArn(),
+                                ShardFilter.builder()
+                                        .shardId(splitContext.splitState.getShardId())
+                                        .type(ShardFilterType.CHILD_SHARDS)
+                                        .build());
+
+                if (!StreamStatus.ENABLED.equals(listShardsResult.getStreamStatus())) {
+                    return new DynamoDbStreamRecordsWithSplitIds(
+                            Collections.emptyIterator(),
+                            splitContext.splitState.getSplitId(),
+                            true);
+                }
+
+                List<Shard> childShards = listShardsResult.getShards();
+                if (!childShards.isEmpty()) {
+                    this.childShardMap.put(splitContext.splitState.getSplitId(), childShards);
+                    return new DynamoDbStreamRecordsWithSplitIds(
+                            Collections.emptyIterator(),
+                            splitContext.splitState.getSplitId(),
+                            true);
+                }
+                splitContext.childShardDiscoveryAttempts++;
+                splitContext.lastChildShardDiscoveryAttemptTime = currentTime;
+            }
+            assignedSplits.add(splitContext);
+            return INCOMPLETE_SHARD_EMPTY_RECORDS;
+        } else {
+            return new DynamoDbStreamRecordsWithSplitIds(
+                    Collections.emptyIterator(), splitContext.splitState.getSplitId(), true);
+        }
+    }
+
+    private long getNextEligibleTimeForChildDiscovery(
+            DynamoDbStreamsShardSplitWithContext splitContext) {
+
+        long exponentialDelay =
+                Math.min(
+                        CHILD_SHARD_DISCOVERY_MIN_DELAY_MS
+                                * (1L << splitContext.childShardDiscoveryAttempts),
+                        CHILD_SHARD_DISCOVERY_MAX_DELAY_MS);
+        return splitContext.lastChildShardDiscoveryAttemptTime + exponentialDelay;
     }
 
     private void sleep(long milliseconds) throws IOException {
@@ -254,11 +320,15 @@ public class PollingDynamoDbStreamsShardSplitReader
         final DynamoDbStreamsShardSplitState splitState;
         long lastPollTimeMillis;
         boolean wasLastPollEmpty;
+        boolean hasAttemptedChildShardDiscovery;
+        int childShardDiscoveryAttempts;
+        long lastChildShardDiscoveryAttemptTime;
 
         DynamoDbStreamsShardSplitWithContext(DynamoDbStreamsShardSplitState splitState) {
             this.splitState = splitState;
             this.lastPollTimeMillis = System.currentTimeMillis();
             this.wasLastPollEmpty = false;
+            hasAttemptedChildShardDiscovery = false;
         }
     }
 }
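
Child shard discovery retries with a capped exponential backoff built from the
new constants: with a 100 ms floor, a 1000 ms cap, and 5 attempts, the waits
work out to 100, 200, 400, 800, and 1000 ms. A standalone sketch of the same
arithmetic used by getNextEligibleTimeForChildDiscovery:

    class ChildShardBackoffSketch {
        static final int MAX_RETRY_ATTEMPTS_FOR_CHILD_SHARDS = 5;
        static final long CHILD_SHARD_DISCOVERY_MIN_DELAY_MS = 100;
        static final long CHILD_SHARD_DISCOVERY_MAX_DELAY_MS = 1000;

        public static void main(String[] args) {
            for (int attempt = 0; attempt < MAX_RETRY_ATTEMPTS_FOR_CHILD_SHARDS; attempt++) {
                // min(minDelay * 2^attempt, maxDelay), as in the reader above.
                long delayMs =
                        Math.min(
                                CHILD_SHARD_DISCOVERY_MIN_DELAY_MS * (1L << attempt),
                                CHILD_SHARD_DISCOVERY_MAX_DELAY_MS);
                System.out.println("attempt " + attempt + " -> " + delayMs + " ms");
                // Prints 100, 200, 400, 800, 1000.
            }
        }
    }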
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplit.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplit.java
index b79f1e8..6f1943d 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplit.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplit.java
@@ -23,7 +23,11 @@ import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.dynamodb.source.enumerator.DynamoDbStreamsSourceEnumerator;
 
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -41,6 +45,7 @@ public final class DynamoDbStreamsShardSplit implements SourceSplit {
     private final StartingPosition startingPosition;
     private final String parentShardId;
     private final boolean isFinished;
+    private final List<Shard> childSplits;
 
     public DynamoDbStreamsShardSplit(
             String streamArn,
@@ -55,6 +60,25 @@ public final class DynamoDbStreamsShardSplit implements SourceSplit {
             String shardId,
             StartingPosition startingPosition,
             String parentShardId,
+            List<Shard> childSplits) {
+        this(streamArn, shardId, startingPosition, parentShardId, childSplits, false);
+    }
+
+    public DynamoDbStreamsShardSplit(
+            String streamArn,
+            String shardId,
+            StartingPosition startingPosition,
+            String parentShardId,
+            boolean isFinished) {
+        this(streamArn, shardId, startingPosition, parentShardId, List.of(), isFinished);
+    }
+
+    public DynamoDbStreamsShardSplit(
+            String streamArn,
+            String shardId,
+            StartingPosition startingPosition,
+            String parentShardId,
+            List<Shard> childSplits,
             boolean isFinished) {
         checkNotNull(streamArn, "streamArn cannot be null");
         checkNotNull(shardId, "shardId cannot be null");
@@ -65,6 +89,7 @@ public final class DynamoDbStreamsShardSplit implements SourceSplit {
         this.startingPosition = startingPosition;
         this.parentShardId = parentShardId;
         this.isFinished = isFinished;
+        this.childSplits = childSplits;
     }
 
     @Override
@@ -92,6 +117,10 @@ public final class DynamoDbStreamsShardSplit implements SourceSplit {
         return isFinished;
     }
 
+    public List<Shard> getChildSplits() {
+        return childSplits;
+    }
+
     @Override
     public String toString() {
         return "DynamoDbStreamsShardSplit{"
@@ -108,6 +137,11 @@ public final class DynamoDbStreamsShardSplit implements SourceSplit {
                 + "]"
                 + ", isFinished="
                 + isFinished
+                + ", childSplitIds=["
+                + childSplits.stream().map(Shard::toString).collect(Collectors.joining(","))
+                + "]"
                 + "}";
     }
 
@@ -124,11 +158,13 @@ public final class DynamoDbStreamsShardSplit implements SourceSplit {
                 && Objects.equals(shardId, that.shardId)
                 && Objects.equals(startingPosition, that.startingPosition)
                 && Objects.equals(parentShardId, that.parentShardId)
-                && Objects.equals(isFinished, that.isFinished);
+                && Objects.equals(isFinished, that.isFinished)
+                && Objects.equals(childSplits, that.childSplits);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(streamArn, shardId, startingPosition, parentShardId, isFinished);
+        return Objects.hash(
+                streamArn, shardId, startingPosition, parentShardId, isFinished, childSplits);
     }
 }
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializer.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializer.java
index b10bfce..10ddc79 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializer.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializer.java
@@ -22,6 +22,8 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.io.VersionMismatchException;
 
+import software.amazon.awssdk.services.dynamodb.model.SequenceNumberRange;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
 import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
 
 import java.io.ByteArrayInputStream;
@@ -29,8 +31,10 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 /**
@@ -41,8 +45,8 @@ import java.util.Set;
 public class DynamoDbStreamsShardSplitSerializer
         implements SimpleVersionedSerializer<DynamoDbStreamsShardSplit> {
 
-    private static final Set<Integer> COMPATIBLE_VERSIONS = new HashSet<>(Arrays.asList(0, 1));
-    private static final int CURRENT_VERSION = 1;
+    private static final Set<Integer> COMPATIBLE_VERSIONS = new HashSet<>(Arrays.asList(0, 1, 2));
+    private static final int CURRENT_VERSION = 2;
 
     @Override
     public int getVersion() {
@@ -74,6 +78,18 @@ public class DynamoDbStreamsShardSplitSerializer
                 out.writeUTF(split.getParentShardId());
             }
             out.writeBoolean(split.isFinished());
+            out.writeInt(split.getChildSplits().size());
+            for (Shard childSplit : split.getChildSplits()) {
+                out.writeUTF(childSplit.shardId());
+                out.writeUTF(childSplit.parentShardId());
+                out.writeUTF(childSplit.sequenceNumberRange().startingSequenceNumber());
+                if (childSplit.sequenceNumberRange().endingSequenceNumber() == null) {
+                    out.writeBoolean(false);
+                } else {
+                    out.writeBoolean(true);
+                    out.writeUTF(childSplit.sequenceNumberRange().endingSequenceNumber());
+                }
+            }
 
             out.flush();
             return baos.toByteArray();
@@ -116,11 +132,40 @@ public class DynamoDbStreamsShardSplitSerializer
                 isFinished = in.readBoolean();
             }
 
+            int childSplitSize = 0;
+            List<Shard> childSplits = new ArrayList<>();
+            if (version > 1) {
+                childSplitSize = in.readInt();
+                if (childSplitSize > 0) {
+                    for (int i = 0; i < childSplitSize; i++) {
+                        String splitId = in.readUTF();
+                        String parentSplitId = in.readUTF();
+                        String startingSequenceNumber = in.readUTF();
+                        String endingSequenceNumber = null;
+                        if (in.readBoolean()) {
+                            endingSequenceNumber = in.readUTF();
+                        }
+                        childSplits.add(
+                                Shard.builder()
+                                        .shardId(splitId)
+                                        .parentShardId(parentSplitId)
+                                        .sequenceNumberRange(
+                                                SequenceNumberRange.builder()
+                                                        .startingSequenceNumber(
+                                                                startingSequenceNumber)
+                                                        .endingSequenceNumber(endingSequenceNumber)
+                                                        .build())
+                                        .build());
+                    }
+                }
+            }
+
             return new DynamoDbStreamsShardSplit(
                     streamArn,
                     shardId,
                     new StartingPosition(shardIteratorType, startingMarker),
                     parentShardId,
+                    childSplits,
                     isFinished);
         }
     }
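
Serializer version 2 appends the child shard list after the isFinished flag:
an int count, then per child the shard id, parent shard id and starting
sequence number as UTF strings, and a boolean presence flag followed by the
ending sequence number when one exists. A minimal sketch of that framing in
isolation (field values are placeholders):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    class ChildShardWireFormatSketch {
        static byte[] writeOneChild(
                String shardId, String parentShardId, String startingSeq, String endingSeq)
                throws IOException {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            out.writeInt(1); // number of child shards that follow
            out.writeUTF(shardId);
            out.writeUTF(parentShardId);
            out.writeUTF(startingSeq);
            if (endingSeq == null) {
                out.writeBoolean(false); // open shard: no ending sequence number
            } else {
                out.writeBoolean(true);
                out.writeUTF(endingSeq);
            }
            out.flush();
            return baos.toByteArray();
        }
    }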
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitState.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitState.java
index 47e20a1..1ddc137 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitState.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitState.java
@@ -29,10 +29,12 @@ public class DynamoDbStreamsShardSplitState {
     private final DynamoDbStreamsShardSplit dynamoDbStreamsShardSplit;
     private StartingPosition nextStartingPosition;
     private String nextShardIterator;
+    private boolean shardEndReached;
 
     public DynamoDbStreamsShardSplitState(DynamoDbStreamsShardSplit dynamoDbStreamsShardSplit) {
         this.dynamoDbStreamsShardSplit = dynamoDbStreamsShardSplit;
         this.nextStartingPosition = dynamoDbStreamsShardSplit.getStartingPosition();
+        this.shardEndReached = false;
     }
 
     public DynamoDbStreamsShardSplit getDynamoDbStreamsShardSplit() {
@@ -70,4 +72,12 @@ public class DynamoDbStreamsShardSplitState {
     public void setNextShardIterator(String nextShardIterator) {
         this.nextShardIterator = nextShardIterator;
     }
+
+    public boolean isShardEndReached() {
+        return shardEndReached;
+    }
+
+    public void setShardEndReached(boolean shardEndReached) {
+        this.shardEndReached = shardEndReached;
+    }
 }
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java
index 031d6de..4c543a3 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants;
 import org.apache.flink.connector.dynamodb.source.enumerator.assigner.ShardAssignerFactory;
 import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEvent;
+import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEventContext;
 import org.apache.flink.connector.dynamodb.source.proxy.StreamProxy;
 import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
 import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
@@ -43,8 +44,8 @@ import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Stream;
 
 import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION;
@@ -304,21 +305,28 @@ class DynamoDbStreamsSourceEnumeratorTest {
                     };
             streamProxy.addShards(childShards);
             enumerator.handleSourceEvent(
-                    subtaskId, new SplitsFinishedEvent(Collections.singleton(shards[2].shardId())));
-            // Given no resharding occurs (list of shards remains the same)
-            // When first periodic discovery runs
-            context.runPeriodicCallable(0);
-            // Then no additional splits are assigned
-            SplitsAssignment<DynamoDbStreamsShardSplit> periodicDiscoverySplitAssignment =
-                    context.getSplitsAssignmentSequence().get(2);
+                    subtaskId,
+                    new SplitsFinishedEvent(
+                            Set.of(
+                                    new SplitsFinishedEventContext(
+                                            shards[2].shardId(), List.of(childShards[0])))));
+
             DynamoDbStreamsShardSplit childSplit =
                     new DynamoDbStreamsShardSplit(
                             STREAM_ARN,
                             childShards[0].shardId(),
                             StartingPosition.fromStart(),
                             shards[2].shardId());
-            assertThat(periodicDiscoverySplitAssignment.assignment().get(subtaskId))
+            assertThat(context.getSplitsAssignmentSequence().get(1).assignment().get(subtaskId))
                     .containsExactly(childSplit);
+            // Given no resharding occurs (list of shards remains the same)
+            // When first periodic discovery runs
+            context.runPeriodicCallable(0);
+            // Then no additional splits are assigned
+            SplitsAssignment<DynamoDbStreamsShardSplit> 
periodicDiscoverySplitAssignment =
+                    context.getSplitsAssignmentSequence().get(2);
+            
assertThat(periodicDiscoverySplitAssignment.assignment().get(subtaskId))
+                    .isNullOrEmpty();
         }
     }
 
@@ -342,7 +350,7 @@ class DynamoDbStreamsSourceEnumeratorTest {
             Instant startTimestamp = Instant.now();
             DynamoDbStreamsSourceEnumeratorState state =
                     new DynamoDbStreamsSourceEnumeratorState(
-                            Collections.singletonList(
+                            List.of(
                                     new DynamoDBStreamsShardSplitWithAssignmentStatus(
                                             new DynamoDbStreamsShardSplit(
                                                     STREAM_ARN,
@@ -425,7 +433,7 @@ class DynamoDbStreamsSourceEnumeratorTest {
                     getTestStreamProxy();
             DynamoDbStreamsSourceEnumerator enumerator =
                     getSimpleEnumeratorWithNoState(context, streamProxy);
-            List<DynamoDbStreamsShardSplit> splits = Collections.singletonList(getTestSplit());
+            List<DynamoDbStreamsShardSplit> splits = List.of(getTestSplit());
 
             // Given enumerator has no assigned splits
             // When we add splits back
@@ -765,7 +773,11 @@ class DynamoDbStreamsSourceEnumeratorTest {
             context.runNextOneTimeCallable();
 
             enumerator.handleSourceEvent(
-                    1, new SplitsFinishedEvent(Collections.singleton(completedShard.shardId())));
+                    1,
+                    new SplitsFinishedEvent(
+                            Set.of(
+                                    new SplitsFinishedEventContext(
+                                            completedShard.shardId(), List.of(shards[1])))));
 
             // When restored from state
             DynamoDbStreamsSourceEnumeratorState snapshotState = enumerator.snapshotState(1);
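
As the reworked assertions above show, SplitsFinishedEvent now carries one SplitsFinishedEventContext per finished split (the parent split id plus the child shards discovered for it) instead of a bare set of ids. A minimal construction, assuming only the classes and signatures visible in this diff:

    import java.util.List;
    import java.util.Set;

    import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEvent;
    import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEventContext;

    import software.amazon.awssdk.services.dynamodb.model.Shard;

    // Each finished parent shard reports its id together with its discovered
    // children, so the enumerator can assign child splits immediately instead
    // of waiting for the next full-stream discovery run.
    final class SplitsFinishedEventSketch {
        static SplitsFinishedEvent eventFor(String parentShardId, Shard childShard) {
            return new SplitsFinishedEvent(
                    Set.of(new SplitsFinishedEventContext(parentShardId, List.of(childShard))));
        }
    }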
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxyTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxyTest.java
index 8c5b204..9c662a7 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxyTest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxyTest.java
@@ -38,6 +38,8 @@ import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.dynamodb.model.Record;
 import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
 import software.amazon.awssdk.services.dynamodb.model.Shard;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;
 import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
 import software.amazon.awssdk.services.dynamodb.model.StreamDescription;
 import software.amazon.awssdk.services.dynamodb.model.StreamStatus;
@@ -50,8 +52,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
 import static org.apache.flink.connector.dynamodb.source.util.TestUtil.generateShardId;
-import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
-import static org.assertj.core.api.AssertionsForClassTypes.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
 
 /** Tests to validate {@link DynamoDbStreamsProxy}. */
 public class DynamoDbStreamsProxyTest {
@@ -88,6 +90,86 @@ public class DynamoDbStreamsProxyTest {
                 .isEqualTo(expectedListShardsResult);
     }
 
+    @Test
+    void testListShardsWithFilterForChildShards() {
+        final String streamArn =
+                "arn:aws:dynamodb:us-east-1:1231231230:table/test/stream/2024-01-01T00:00:00.826";
+        final String parentShardId = "shardId-000000000001";
+
+        // Create child shards that we expect to be returned
+        final List<Shard> childShards =
+                List.of(
+                        Shard.builder()
+                                .shardId("shardId-000000000002")
+                                .parentShardId(parentShardId)
+                                .build(),
+                        Shard.builder()
+                                .shardId("shardId-000000000003")
+                                .parentShardId(parentShardId)
+                                .build());
+
+        // Create some other shards that should not be returned
+        final List<Shard> otherShards =
+                List.of(
+                        Shard.builder()
+                                .shardId("shardId-000000000004")
+                                .parentShardId("different-parent")
+                                .build(),
+                        Shard.builder().shardId("shardId-000000000005").build());
+
+        // Set up the expected response
+        final ListShardsResult expectedResult = new ListShardsResult();
+        expectedResult.addShards(childShards);
+        expectedResult.setStreamStatus(StreamStatus.ENABLED);
+
+        // Create describe stream response with all shards
+        List<Shard> allShards = new ArrayList<>();
+        allShards.addAll(childShards);
+        allShards.addAll(otherShards);
+
+        DescribeStreamResponse describeStreamResponse =
+                DescribeStreamResponse.builder()
+                        .streamDescription(
+                                StreamDescription.builder()
+                                        .shards(allShards)
+                                        .streamStatus(StreamStatus.ENABLED)
+                                        .lastEvaluatedShardId(null)
+                                        .build())
+                        .build();
+
+        TestingDynamoDbStreamsClient testingDynamoDbStreamsClient =
+                new TestingDynamoDbStreamsClient();
+
+        // Verify the correct request is made
+        testingDynamoDbStreamsClient.setDescribeStreamValidation(
+                request -> {
+                    assertThat(request.streamArn()).isEqualTo(streamArn);
+                    assertThat(request.shardFilter()).isNotNull();
+                    assertThat(request.shardFilter().type())
+                            .isEqualTo(ShardFilterType.CHILD_SHARDS);
+                    assertThat(request.shardFilter().shardId()).isEqualTo(parentShardId);
+                });
+
+        testingDynamoDbStreamsClient.setDescribeStreamResponse(describeStreamResponse);
+
+        DynamoDbStreamsProxy dynamoDbStreamsProxy =
+                new DynamoDbStreamsProxy(testingDynamoDbStreamsClient, HTTP_CLIENT);
+
+        // Create the filter for child shards
+        ShardFilter childShardFilter =
+                ShardFilter.builder()
+                        .type(ShardFilterType.CHILD_SHARDS)
+                        .shardId(parentShardId)
+                        .build();
+
+        // Execute the method and verify results
+        ListShardsResult result =
+                dynamoDbStreamsProxy.listShardsWithFilter(streamArn, childShardFilter);
+
+        assertThat(result.getShards()).hasSize(2).containsExactlyInAnyOrderElementsOf(childShards);
+        assertThat(result.getStreamStatus()).isEqualTo(StreamStatus.ENABLED);
+    }
+
     @Test
     void testGetRecordsInitialReadFromTrimHorizon() {
         final String streamArn =
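
The test above pins down the request shape for incremental discovery: DescribeStream is called with a CHILD_SHARDS filter scoped to one finished parent shard rather than relisting the whole stream. A minimal sketch of building that filter, using only the AWS SDK model types imported above:

    import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
    import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;

    // Filter asking DescribeStream for the children of a single parent shard.
    final class ChildShardFilterSketch {
        static ShardFilter childShardsOf(String parentShardId) {
            return ShardFilter.builder()
                    .type(ShardFilterType.CHILD_SHARDS)
                    .shardId(parentShardId)
                    .build();
        }
    }

listShardsWithFilter(streamArn, childShardsOf(parentShardId)) then returns only the matching child shards, as asserted above.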
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReaderTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReaderTest.java
index d66c0bb..12e8ac4 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReaderTest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReaderTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
 import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEvent;
+import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEventContext;
 import org.apache.flink.connector.dynamodb.source.metrics.DynamoDbStreamsShardMetrics;
 import org.apache.flink.connector.dynamodb.source.proxy.StreamProxy;
 import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
@@ -37,6 +38,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -73,6 +75,7 @@ class DynamoDbStreamsSourceReaderTest {
                                 testStreamProxy,
                                 NON_EMPTY_POLLING_DELAY_MILLIS,
                                 EMPTY_POLLING_DELAY_MILLIS,
+                                new ConcurrentHashMap<>(),
                                 shardMetricGroupMap);
 
         testingReaderContext =
@@ -84,6 +87,7 @@ class DynamoDbStreamsSourceReaderTest {
                         new DynamoDbStreamsRecordEmitter<>(null),
                         new Configuration(),
                         testingReaderContext,
+                        new ConcurrentHashMap<>(),
                         shardMetricGroupMap);
     }
 
@@ -122,12 +126,14 @@ class DynamoDbStreamsSourceReaderTest {
 
         List<SourceEvent> events = testingReaderContext.getSentEvents();
 
-        Set<String> expectedSplitIds = Collections.singleton(split.splitId());
+        Set<SplitsFinishedEventContext> expectedFinishedSplits =
+                Collections.singleton(
+                        new SplitsFinishedEventContext(split.splitId(), new ArrayList<>()));
         assertThat(events)
                 .singleElement()
                 .isInstanceOf(SplitsFinishedEvent.class)
                 .usingRecursiveComparison()
-                .isEqualTo(new SplitsFinishedEvent(expectedSplitIds));
+                .isEqualTo(new SplitsFinishedEvent(expectedFinishedSplits));
     }
 
     @Test
@@ -225,8 +231,10 @@ class DynamoDbStreamsSourceReaderTest {
                 .allSatisfy(
                         e -> {
                             SplitsFinishedEvent event = (SplitsFinishedEvent) e;
-                            assertThat(event.getFinishedSplitIds()).hasSize(1);
-                            assertThat(event.getFinishedSplitIds())
+                            assertThat(event.getFinishedSplits()).hasSize(1);
+                            assertThat(
+                                            event.getFinishedSplits().stream()
+                                                    .map(SplitsFinishedEventContext::getSplitId))
                                     .containsAnyOf("finished-split-1", "finished-split-2");
                         });
 
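
Because the payload is now a set of contexts rather than ids, consumers that only need ids map them back out, exactly as the reworked assertions above do. A compact helper, assuming the accessors shown in this diff:

    import java.util.Set;
    import java.util.stream.Collectors;

    import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEvent;
    import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEventContext;

    // Recover plain split ids from the richer finished-splits payload.
    final class FinishedSplitIdsSketch {
        static Set<String> idsOf(SplitsFinishedEvent event) {
            return event.getFinishedSplits().stream()
                    .map(SplitsFinishedEventContext::getSplitId)
                    .collect(Collectors.toSet());
        }
    }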
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReaderTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReaderTest.java
index f3cc169..bcd57cf 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReaderTest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReaderTest.java
@@ -73,6 +73,7 @@ class PollingDynamoDbStreamsShardSplitReaderTest {
                         testStreamProxy,
                         NON_EMPTY_POLLING_DELAY_MILLIS,
                         EMPTY_POLLING_DELAY_MILLIS,
+                        new ConcurrentHashMap<>(),
                         shardMetricGroupMap);
     }
 
@@ -235,12 +236,20 @@ class PollingDynamoDbStreamsShardSplitReaderTest {
                                 assertThat(retrievedRecords.finishedSplits()).isEmpty();
                                 fetchedRecords.add(retrievedRecords.nextRecordFromSplit());
                             }
-
-                            assertThat(retrievedRecords.nextSplit()).isNull();
-                            assertThat(retrievedRecords.finishedSplits()).contains(split.splitId());
                             assertThat(fetchedRecords)
                                     .containsExactlyInAnyOrderElementsOf(expectedRecords);
                         });
+
+        // Now wait for the split to be marked as finished after child shard discovery attempts
+        await().pollDelay(NON_EMPTY_POLLING_DELAY_MILLIS)
+                .atMost(Duration.ofSeconds(30)) // Allow enough time for all retry attempts
+                .untilAsserted(
+                        () -> {
+                            RecordsWithSplitIds<Record> retrievedRecords = splitReader.fetch();
+                            // No more records should be returned
+                            assertThat(readAllRecords(retrievedRecords)).isEmpty();
+                            assertThat(retrievedRecords.finishedSplits()).contains(split.splitId());
+                        });
     }
 
     @Test
@@ -400,6 +409,7 @@ class PollingDynamoDbStreamsShardSplitReaderTest {
                         testStreamProxy,
                         NON_EMPTY_POLLING_DELAY_MILLIS,
                         testEmptyPollDelay,
+                        new ConcurrentHashMap<>(),
                         shardMetricGroupMap);
 
         // Immediate second poll - should return empty due to polling delay
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializerTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializerTest.java
index ebf1178..410f93b 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializerTest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializerTest.java
@@ -29,6 +29,7 @@ import java.util.stream.Stream;
 import static org.apache.flink.connector.dynamodb.source.util.TestUtil.SHARD_ID;
 import static org.apache.flink.connector.dynamodb.source.util.TestUtil.STREAM_ARN;
 import static org.apache.flink.connector.dynamodb.source.util.TestUtil.getTestSplit;
+import static org.apache.flink.connector.dynamodb.source.util.TestUtil.getTestSplitWithChildShards;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
 
@@ -47,6 +48,19 @@ class DynamoDbStreamsShardSplitSerializerTest {
         assertThat(deserializedSplit).usingRecursiveComparison().isEqualTo(initialSplit);
     }
 
+    @Test
+    void testSerializeAndDeserializeWithChildSplits() throws Exception {
+        final DynamoDbStreamsShardSplit initialSplit = getTestSplitWithChildShards();
+
+        DynamoDbStreamsShardSplitSerializer serializer = new DynamoDbStreamsShardSplitSerializer();
+
+        byte[] serialized = serializer.serialize(initialSplit);
+        DynamoDbStreamsShardSplit deserializedSplit =
+                serializer.deserialize(serializer.getVersion(), serialized);
+
+        assertThat(deserializedSplit).usingRecursiveComparison().isEqualTo(initialSplit);
+    }
+
     @ParameterizedTest
     @MethodSource("provideStartingPositions")
     void testSerializeAndDeserializeWithStartingPosition(StartingPosition startingPosition)
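
The new serializer test matters because child shards now ride along on the split: a split checkpointed mid-handoff must restore them on recovery. A round-trip sketch using only the calls exercised in the test above:

    import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
    import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplitSerializer;

    // Serialize and immediately deserialize a split; child shard metadata
    // must survive the round trip for recovery after a checkpoint.
    final class SplitRoundTripSketch {
        static DynamoDbStreamsShardSplit roundTrip(DynamoDbStreamsShardSplit split) throws Exception {
            DynamoDbStreamsShardSplitSerializer serializer =
                    new DynamoDbStreamsShardSplitSerializer();
            return serializer.deserialize(serializer.getVersion(), serializer.serialize(split));
        }
    }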
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsClientProvider.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsClientProvider.java
index c4c3285..8e6c0a5 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsClientProvider.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsClientProvider.java
@@ -26,12 +26,18 @@ import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
 import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
 import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;
+import software.amazon.awssdk.services.dynamodb.model.StreamDescription;
 import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
 import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsServiceClientConfiguration;
 
 import java.util.ArrayDeque;
 import java.util.Deque;
+import java.util.List;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 /** Provides {@link DynamoDbStreamsClient} with mocked DynamoDbStreams behavior. */
 public class DynamoDbStreamsClientProvider {
@@ -84,6 +90,29 @@ public class DynamoDbStreamsClientProvider {
                 throws AwsServiceException, SdkClientException {
 
             describeStreamValidation.accept(describeStreamRequest);
+            ShardFilter shardFilter = describeStreamRequest.shardFilter();
+            if (shardFilter != null && ShardFilterType.CHILD_SHARDS.equals(shardFilter.type())) {
+                List<Shard> shards = describeStreamResponse.streamDescription().shards();
+                List<Shard> childShards =
+                        shards.stream()
+                                .filter(
+                                        shard ->
+                                                shard.parentShardId() != null
+                                                        && shard.parentShardId()
+                                                                .equals(shardFilter.shardId()))
+                                .collect(Collectors.toList());
+                return DescribeStreamResponse.builder()
+                        .streamDescription(
+                                StreamDescription.builder()
+                                        .shards(childShards)
+                                        .streamArn(describeStreamRequest.streamArn())
+                                        .streamStatus(
+                                                describeStreamResponse
+                                                        .streamDescription()
+                                                        .streamStatus())
+                                        .build())
+                        .build();
+            }
             return describeStreamResponse;
         }
 
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsProxyProvider.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsProxyProvider.java
index c9f45c1..7970f73 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsProxyProvider.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsProxyProvider.java
@@ -26,6 +26,8 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
 import software.amazon.awssdk.services.dynamodb.model.Record;
 import software.amazon.awssdk.services.dynamodb.model.Shard;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;
 
 import javax.annotation.Nullable;
 
@@ -84,6 +86,27 @@ public class DynamoDbStreamsProxyProvider {
             return listShardsResult;
         }
 
+        @Override
+        public ListShardsResult listShardsWithFilter(String streamArn, ShardFilter shardFilter) {
+            if (!ShardFilterType.CHILD_SHARDS.equals(shardFilter.type())) {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "ShardFilterType %s not supported", shardFilter.type().name()));
+            }
+
+            ListShardsResult listShardsResult = new ListShardsResult();
+            List<Shard> childShards =
+                    shards.stream()
+                            .filter(
+                                    shard ->
+                                            shard.parentShardId() != null
+                                                    && shard.parentShardId()
+                                                            .equals(shardFilter.shardId()))
+                            .collect(Collectors.toList());
+            listShardsResult.addShards(childShards);
+            return listShardsResult;
+        }
+
         @Override
         public GetRecordsResponse getRecords(
                 String streamArn, String shardId, StartingPosition startingPosition) {
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/TestUtil.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/TestUtil.java
index 9c3fcb9..4c15959 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/TestUtil.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/TestUtil.java
@@ -35,7 +35,10 @@ import software.amazon.awssdk.services.dynamodb.model.StreamRecord;
 
 import java.time.Duration;
 import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -45,6 +48,8 @@ public class TestUtil {
     public static final String STREAM_ARN =
             "arn:aws:dynamodb:us-east-1:123456789012:stream/2024-01-01T00:00:00Z";
     public static final String SHARD_ID = "shardId-000000000002";
+    public static final String CHILD_SHARD_ID_1 = "shardId-000000000003";
+    public static final String CHILD_SHARD_ID_2 = "shardId-000000000004";
     public static final SimpleStringSchema STRING_SCHEMA = new SimpleStringSchema();
 
     public static final long MILLIS_BEHIND_LATEST_TEST_VALUE = -1L;
@@ -95,6 +100,15 @@ public class TestUtil {
         return getTestSplit(SHARD_ID);
     }
 
+    public static DynamoDbStreamsShardSplit getTestSplitWithChildShards() {
+        return getTestSplitWithChildShards(SHARD_ID);
+    }
+
+    public static DynamoDbStreamsShardSplit getTestSplitWithChildShards(String shardId) {
+        return getTestSplit(
+                STREAM_ARN, shardId, Arrays.asList(CHILD_SHARD_ID_1, CHILD_SHARD_ID_2));
+    }
+
     public static DynamoDbStreamsShardSplit getTestSplit(String shardId) {
         return getTestSplit(STREAM_ARN, shardId);
     }
@@ -104,6 +118,27 @@ public class TestUtil {
                 streamArn, shardId, StartingPosition.fromStart(), null);
     }
 
+    public static DynamoDbStreamsShardSplit getTestSplit(
+            String streamArn, String shardId, List<String> childShardIds) {
+        return new DynamoDbStreamsShardSplit(
+                streamArn,
+                shardId,
+                StartingPosition.fromStart(),
+                null,
+                childShardIds.stream()
+                        .map(
+                                childShardId ->
+                                        Shard.builder()
+                                                .parentShardId(shardId)
+                                                .shardId(childShardId)
+                                                .sequenceNumberRange(
+                                                        
SequenceNumberRange.builder()
+                                                                
.startingSequenceNumber("1234")
+                                                                .build())
+                                                .build())
+                        .collect(Collectors.toList()));
+    }
+
     public static DynamoDbStreamsShardSplit getTestSplit(StartingPosition startingPosition) {
         return new DynamoDbStreamsShardSplit(STREAM_ARN, SHARD_ID, startingPosition, null);
     }
diff --git a/pom.xml b/pom.xml
index ccb262e..167b40d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@ under the License.
     </scm>
 
     <properties>
-        <aws.sdkv2.version>2.26.19</aws.sdkv2.version>
+        <aws.sdkv2.version>2.40.3</aws.sdkv2.version>
         <netty.version>4.1.86.Final</netty.version>
         <flink.version>2.0.0</flink.version>
         <jackson-bom.version>2.14.3</jackson-bom.version>
