This is an automated email from the ASF dual-hosted git repository.

hong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new 41f6710  [FLINK-36187][Connectors/Kinesis] Implement 
UniformShardAssigner using Kinesis shard hashes in the KinesisStreamsSource
41f6710 is described below

commit 41f6710e3c2da4ca744712bfc7f58ea68773f8c8
Author: Hong Teoh <[email protected]>
AuthorDate: Wed Sep 4 17:25:55 2024 +0100

    [FLINK-36187][Connectors/Kinesis] Implement UniformShardAssigner using 
Kinesis shard hashes in the KinesisStreamsSource
---
 .../enumerator/KinesisStreamsSourceEnumerator.java |   7 +-
 .../enumerator/assigner/UniformShardAssigner.java  |  53 ++++++-----
 .../kinesis/source/split/KinesisShardSplit.java    |  36 +++++++-
 .../source/split/KinesisShardSplitSerializer.java  |  16 +++-
 .../source/split/KinesisShardSplitState.java       |   4 +-
 .../KinesisStreamsSourceEnumeratorTest.java        |  18 ++--
 .../assigner/UniformShardAssignerTest.java         | 100 +++++++++------------
 .../source/proxy/KinesisStreamProxyTest.java       |  13 ++-
 .../split/KinesisShardSplitSerializerTest.java     |  13 ++-
 .../source/split/KinesisShardSplitTest.java        |  64 ++++++++++++-
 .../source/util/KinesisStreamProxyProvider.java    |  19 +++-
 .../connector/kinesis/source/util/TestUtil.java    |  67 +++++++++++++-
 12 files changed, 299 insertions(+), 111 deletions(-)

diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java
 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java
index 741bc5f..1f9f707 100644
--- 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java
+++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java
@@ -282,7 +282,12 @@ public class KinesisStreamsSourceEnumerator
             }
             splits.add(
                     new KinesisShardSplit(
-                            streamArn, shard.shardId(), startingPosition, 
parentShardIds));
+                            streamArn,
+                            shard.shardId(),
+                            startingPosition,
+                            parentShardIds,
+                            shard.hashKeyRange().startingHashKey(),
+                            shard.hashKeyRange().endingHashKey()));
         }
 
         return splits;
diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssigner.java
 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssigner.java
index 7937ed7..f5d45df 100644
--- 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssigner.java
+++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssigner.java
@@ -23,37 +23,42 @@ import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
 import org.apache.flink.util.Preconditions;
 
-import java.util.Collections;
+import java.math.BigInteger;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
-/** An implementation of the {@link KinesisShardAssigner} that assigns splits 
uniformly. */
+/**
+ * An implementation of the {@link KinesisShardAssigner} that maps Kinesis 
shard hash-key ranges to
+ * Flink subtasks. This shard assigner will ensure that the hash-key range 
across all shards is
+ * split across all subtasks (on a shard-level granularity). This is essential 
to ensure even
+ * distribution of open shards across subtasks when the Kinesis stream is 
scaled.
+ */
 @Internal
 public class UniformShardAssigner implements KinesisShardAssigner {
-    @Override
-    public int assign(KinesisShardSplit split, Context context) {
-        int selectedSubtask = -1;
-        int curMinAssignment = Integer.MAX_VALUE;
-        Map<Integer, Set<KinesisShardSplit>> splitAssignment = 
context.getCurrentSplitAssignment();
-        Map<Integer, List<KinesisShardSplit>> pendingSplitAssignments =
-                context.getPendingSplitAssignments();
 
-        for (int subtaskId : context.getRegisteredReaders().keySet()) {
-            int subtaskAssignmentSize =
-                    splitAssignment.getOrDefault(subtaskId, 
Collections.emptySet()).size()
-                            + pendingSplitAssignments
-                                    .getOrDefault(subtaskId, 
Collections.emptyList())
-                                    .size();
-            if (subtaskAssignmentSize < curMinAssignment) {
-                curMinAssignment = subtaskAssignmentSize;
-                selectedSubtask = subtaskId;
-            }
-        }
+    private static final BigInteger TWO = BigInteger.valueOf(2);
 
+    private static final BigInteger HASH_KEY_BOUND = TWO.pow(128);
+
+    @Override
+    public int assign(KinesisShardSplit split, Context context) {
         Preconditions.checkArgument(
-                selectedSubtask != -1,
+                !context.getRegisteredReaders().isEmpty(),
                 "Expected at least one registered reader. Unable to assign 
split.");
-        return selectedSubtask;
+        BigInteger hashKeyStart = new BigInteger(split.getStartingHashKey());
+        BigInteger hashKeyEnd = new BigInteger(split.getEndingHashKey());
+        BigInteger hashKeyMid = hashKeyStart.add(hashKeyEnd).divide(TWO);
+        // index = hashKeyMid * nSubtasks / HASH_KEY_BOUND, i.e. the midpoint of the
+        // shard's hash-key range, scaled from [0, 2^128) to the number of registered
+        // readers. The index selects an entry from the list of registered reader
+        // ids built below; no stream-specific offset or modulo is applied.
+        List<Integer> registeredReaders = new 
ArrayList<>(context.getRegisteredReaders().keySet());
+        int selectedReaderIndex =
+                Math.abs(
+                        hashKeyMid
+                                
.multiply(BigInteger.valueOf(registeredReaders.size()))
+                                .divide(HASH_KEY_BOUND)
+                                .intValue());
+        return registeredReaders.get(selectedReaderIndex);
     }
 }
diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplit.java
 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplit.java
index 309701d..fb5fef5 100644
--- 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplit.java
+++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplit.java
@@ -42,21 +42,29 @@ public final class KinesisShardSplit implements SourceSplit 
{
     private final String shardId;
     private final StartingPosition startingPosition;
     private final Set<String> parentShardIds;
+    private final String startingHashKey;
+    private final String endingHashKey;
 
     public KinesisShardSplit(
             String streamArn,
             String shardId,
             StartingPosition startingPosition,
-            Set<String> parentShardIds) {
+            Set<String> parentShardIds,
+            String startingHashKey,
+            String endingHashKey) {
         checkNotNull(streamArn, "streamArn cannot be null");
         checkNotNull(shardId, "shardId cannot be null");
         checkNotNull(startingPosition, "startingPosition cannot be null");
         checkNotNull(parentShardIds, "parentShardIds cannot be null");
+        checkNotNull(startingHashKey, "startingHashKey cannot be null");
+        checkNotNull(endingHashKey, "endingHashKey cannot be null");
 
         this.streamArn = streamArn;
         this.shardId = shardId;
         this.startingPosition = startingPosition;
         this.parentShardIds = new HashSet<>(parentShardIds);
+        this.startingHashKey = startingHashKey;
+        this.endingHashKey = endingHashKey;
     }
 
     @Override
@@ -80,6 +88,14 @@ public final class KinesisShardSplit implements SourceSplit {
         return parentShardIds;
     }
 
+    public String getStartingHashKey() {
+        return startingHashKey;
+    }
+
+    public String getEndingHashKey() {
+        return endingHashKey;
+    }
+
     @Override
     public String toString() {
         return "KinesisShardSplit{"
@@ -94,6 +110,12 @@ public final class KinesisShardSplit implements SourceSplit 
{
                 + ", parentShardIds=["
                 + String.join(",", parentShardIds)
                 + ']'
+                + ", startingHashKey='"
+                + startingHashKey
+                + '\''
+                + ", endingHashKey='"
+                + endingHashKey
+                + '\''
                 + '}';
     }
 
@@ -109,11 +131,19 @@ public final class KinesisShardSplit implements 
SourceSplit {
         return Objects.equals(streamArn, that.streamArn)
                 && Objects.equals(shardId, that.shardId)
                 && Objects.equals(startingPosition, that.startingPosition)
-                && Objects.equals(parentShardIds, that.parentShardIds);
+                && Objects.equals(parentShardIds, that.parentShardIds)
+                && Objects.equals(startingHashKey, that.startingHashKey)
+                && Objects.equals(endingHashKey, that.endingHashKey);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(streamArn, shardId, startingPosition, 
parentShardIds);
+        return Objects.hash(
+                streamArn,
+                shardId,
+                startingPosition,
+                parentShardIds,
+                startingHashKey,
+                endingHashKey);
     }
 }
diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializer.java
 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializer.java
index b5b1711..5433a64 100644
--- 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializer.java
+++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializer.java
@@ -76,6 +76,8 @@ public class KinesisShardSplitSerializer implements 
SimpleVersionedSerializer<Ki
             for (String parentShardId : split.getParentShardIds()) {
                 out.writeUTF(parentShardId);
             }
+            out.writeUTF(split.getStartingHashKey());
+            out.writeUTF(split.getEndingHashKey());
 
             out.flush();
             return baos.toByteArray();
@@ -145,11 +147,23 @@ public class KinesisShardSplitSerializer implements 
SimpleVersionedSerializer<Ki
                 }
             }
 
+            String startingHashKey;
+            String endingHashKey;
+            if (version == CURRENT_VERSION) {
+                startingHashKey = in.readUTF();
+                endingHashKey = in.readUTF();
+            } else {
+                startingHashKey = "-1";
+                endingHashKey = "0";
+            }
+
             return new KinesisShardSplit(
                     streamArn,
                     shardId,
                     new StartingPosition(shardIteratorType, startingMarker),
-                    parentShardIds);
+                    parentShardIds,
+                    startingHashKey,
+                    endingHashKey);
         }
     }
 }
diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitState.java
 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitState.java
index 54e3a9b..391aa5e 100644
--- 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitState.java
+++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitState.java
@@ -40,7 +40,9 @@ public class KinesisShardSplitState {
                 kinesisShardSplit.getStreamArn(),
                 kinesisShardSplit.getShardId(),
                 nextStartingPosition,
-                kinesisShardSplit.getParentShardIds());
+                kinesisShardSplit.getParentShardIds(),
+                kinesisShardSplit.getStartingHashKey(),
+                kinesisShardSplit.getEndingHashKey());
     }
 
     public String getSplitId() {
diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorTest.java
 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorTest.java
index e8e5e89..953dbde 100644
--- 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorTest.java
+++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorTest.java
@@ -35,9 +35,11 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.kinesis.model.Shard;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 import java.time.Instant;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -48,6 +50,7 @@ import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSou
 import static 
org.apache.flink.connector.kinesis.source.util.KinesisStreamProxyProvider.TestKinesisStreamProxy;
 import static 
org.apache.flink.connector.kinesis.source.util.KinesisStreamProxyProvider.getTestStreamProxy;
 import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId;
+import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardsWithEqualHashKeyRange;
 import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit;
 import static 
org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
 import static 
org.assertj.core.api.AssertionsForClassTypes.assertThatNoException;
@@ -440,18 +443,12 @@ class KinesisStreamsSourceEnumeratorTest {
                             true);
             enumerator.start();
 
-            // Given enumerator is initialised without only one reader
+            // Given enumerator is initialised with only one reader
             final int subtaskId = 1;
             context.registerReader(TestUtil.getTestReaderInfo(subtaskId));
             enumerator.addReader(subtaskId);
-            String[] shardIds =
-                    new String[] {
-                        generateShardId(0),
-                        generateShardId(1),
-                        generateShardId(2),
-                        generateShardId(3)
-                    };
-            streamProxy.addShards(shardIds);
+            Shard[] shards = generateShardsWithEqualHashKeyRange(4);
+            streamProxy.addShards(shards);
 
             // When first discovery runs
             context.runNextOneTimeCallable();
@@ -476,7 +473,8 @@ class KinesisStreamsSourceEnumeratorTest {
                             
initialSplitAssignment.assignment().values().stream()
                                     .flatMap(Collection::stream)
                                     .map(KinesisShardSplit::getShardId))
-                    .containsExactlyInAnyOrder(shardIds);
+                    .containsExactlyInAnyOrder(
+                            
Arrays.stream(shards).map(Shard::shardId).toArray(String[]::new));
             assertThat(
                             
initialSplitAssignment.assignment().values().stream()
                                     .flatMap(Collection::stream)
diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssignerTest.java
 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssignerTest.java
index 8ba3473..338b722 100644
--- 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssignerTest.java
+++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssignerTest.java
@@ -23,13 +23,17 @@ import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
-import java.util.ArrayList;
+import java.math.BigInteger;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Stream;
 
 import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.getTestReaderInfo;
 import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit;
@@ -38,58 +42,49 @@ import static 
org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOf
 
 class UniformShardAssignerTest {
 
-    @Test
-    void testAssignedToLeastBusySubtask() {
-        final KinesisShardSplit split = getTestSplit();
+    @ParameterizedTest
+    @MethodSource("hashKeyRangeDataProvider")
+    void testSplitAssignedByHashKeyRange(
+            BigInteger startingHashKey,
+            BigInteger endingHashKey,
+            int nSubtasks,
+            int expectedSubtask) {
+        // Given a split with a given hash-key range
+        final KinesisShardSplit split = getTestSplit(startingHashKey, 
endingHashKey);
         final TestShardAssignerContext assignerContext = new 
TestShardAssignerContext();
-
         // Given a distribution of subtasks with varying busyness
-        createReaderWithAssignedSplits(assignerContext, 1, 3);
-        createReaderWithAssignedSplits(assignerContext, 2, 2);
-        createReaderWithAssignedSplits(assignerContext, 3, 1);
+        for (int i = 0; i < nSubtasks; i++) {
+            createReaderWithAssignedSplits(assignerContext, i, nSubtasks - i);
+        }
 
         // When assigned a subtask
         KinesisShardAssigner assigner = new UniformShardAssigner();
 
-        // Then least busy subtask is chosen
-        assertThat(assigner.assign(split, assignerContext)).isEqualTo(3);
+        // Then expected subtask is assigned
+        assertThat(assigner.assign(split, 
assignerContext)).isEqualTo(expectedSubtask);
     }
 
-    @Test
-    void testAssignedToLeastBusySubtaskConsideringPendingAssignments() {
-        final KinesisShardSplit split = getTestSplit();
-        final TestShardAssignerContext assignerContext = new 
TestShardAssignerContext();
-
-        // Given a distribution of subtasks with same busyness
-        createReaderWithAssignedSplits(assignerContext, 1, 1);
-        createReaderWithAssignedSplits(assignerContext, 2, 1);
-        createReaderWithAssignedSplits(assignerContext, 3, 1);
-        // Given a pending distribution of subtasks with varying busyness
-        addPendingSplits(assignerContext, 1, 3);
-        addPendingSplits(assignerContext, 2, 1);
-        addPendingSplits(assignerContext, 3, 3);
-
-        // When assigned a subtask
-        KinesisShardAssigner assigner = new UniformShardAssigner();
-
-        // Then least busy subtask is chosen
-        assertThat(assigner.assign(split, assignerContext)).isEqualTo(2);
-    }
-
-    @Test
-    void testOnlyRegisteredReaders() {
-        final KinesisShardSplit split = getTestSplit();
-        final TestShardAssignerContext assignerContext = new 
TestShardAssignerContext();
-
-        // Given a few registered readers
-        assignerContext.registeredReaders.put(1, getTestReaderInfo(1));
-        assignerContext.registeredReaders.put(2, getTestReaderInfo(2));
-        assignerContext.registeredReaders.put(3, getTestReaderInfo(3));
-
-        // When assigned a subtask
-        KinesisShardAssigner assigner = new UniformShardAssigner();
-        // Then Exception is thrown
-        assertThat(assigner.assign(split, assignerContext)).isIn(1, 2, 3);
+    static Stream<Arguments> hashKeyRangeDataProvider() {
+        BigInteger two = BigInteger.valueOf(2);
+        BigInteger three = BigInteger.valueOf(3);
+        // Split the hash key range into thirds
+        BigInteger maxHashKey = two.pow(128).subtract(BigInteger.ONE);
+        BigInteger[] rangeBoundaries = {
+            BigInteger.ZERO,
+            maxHashKey.divide(three),
+            maxHashKey.divide(three).multiply(two),
+            maxHashKey
+        };
+        return Stream.of(
+                // Test that each shard will be distributed evenly
+                Arguments.of(rangeBoundaries[0], rangeBoundaries[1], 3, 0),
+                Arguments.of(rangeBoundaries[1], rangeBoundaries[2], 3, 1),
+                Arguments.of(rangeBoundaries[2], rangeBoundaries[3], 3, 2),
+                // Edge case: Hash key range from 0 to 0
+                Arguments.of(BigInteger.ZERO, BigInteger.ZERO, 3, 0),
+                // Edge case: Negative hash key range should be treated as 
positive.
+                // This is possible if restoring from old state
+                Arguments.of(BigInteger.ONE.negate(), BigInteger.ZERO, 3, 0));
     }
 
     @Test
@@ -126,21 +121,6 @@ class UniformShardAssignerTest {
         }
     }
 
-    private void addPendingSplits(
-            TestShardAssignerContext testShardAssignerContext,
-            int subtaskId,
-            int numAssignedSplits) {
-        if 
(!testShardAssignerContext.pendingSplitAssignments.containsKey(subtaskId)) {
-            testShardAssignerContext.pendingSplitAssignments.put(subtaskId, 
new ArrayList<>());
-        }
-        for (int i = 0; i < numAssignedSplits; i++) {
-            testShardAssignerContext
-                    .pendingSplitAssignments
-                    .get(subtaskId)
-                    .add(getTestSplit(String.valueOf(i)));
-        }
-    }
-
     private static class TestShardAssignerContext implements 
KinesisShardAssigner.Context {
         private final Map<Integer, Set<KinesisShardSplit>> splitAssignment = 
new HashMap<>();
         private final Map<Integer, List<KinesisShardSplit>> 
pendingSplitAssignments =
diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxyTest.java
 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxyTest.java
index 6eb1e23..c4bca20 100644
--- 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxyTest.java
+++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxyTest.java
@@ -34,6 +34,7 @@ import 
software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
 import software.amazon.awssdk.services.kinesis.model.KinesisRequest;
 import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
 import software.amazon.awssdk.services.kinesis.model.Record;
@@ -51,6 +52,8 @@ import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.ENDING_HASH_KEY_TEST_VALUE;
+import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.STARTING_HASH_KEY_TEST_VALUE;
 import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 import static 
org.assertj.core.api.AssertionsForClassTypes.assertThatNoException;
@@ -413,7 +416,15 @@ class KinesisStreamProxyTest {
     private List<Shard> getTestShards(final int startShardId, final int 
endShardId) {
         List<Shard> shards = new ArrayList<>();
         for (int i = startShardId; i <= endShardId; i++) {
-            shards.add(Shard.builder().shardId(generateShardId(i)).build());
+            shards.add(
+                    Shard.builder()
+                            .shardId(generateShardId(i))
+                            .hashKeyRange(
+                                    HashKeyRange.builder()
+                                            
.startingHashKey(STARTING_HASH_KEY_TEST_VALUE)
+                                            
.endingHashKey(ENDING_HASH_KEY_TEST_VALUE)
+                                            .build())
+                            .build());
         }
         return shards;
     }
diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializerTest.java
 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializerTest.java
index 72692d7..9b0662b 100644
--- 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializerTest.java
+++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializerTest.java
@@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import java.math.BigInteger;
 import java.time.Instant;
 import java.util.Arrays;
 import java.util.Collections;
@@ -32,6 +33,8 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.stream.Stream;
 
+import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.ENDING_HASH_KEY_TEST_VALUE;
+import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.STARTING_HASH_KEY_TEST_VALUE;
 import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.STREAM_ARN;
 import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId;
 import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit;
@@ -74,7 +77,9 @@ class KinesisShardSplitSerializerTest {
                         STREAM_ARN,
                         generateShardId(10),
                         
StartingPosition.continueFromSequenceNumber("some-sequence-number"),
-                        new HashSet<>(Arrays.asList(generateShardId(2), 
generateShardId(5))));
+                        new HashSet<>(Arrays.asList(generateShardId(2), 
generateShardId(5))),
+                        STARTING_HASH_KEY_TEST_VALUE,
+                        ENDING_HASH_KEY_TEST_VALUE);
 
         byte[] oldSerializedState = serializer.serializeV0(initialSplit);
         KinesisShardSplit deserializedSplit = serializer.deserialize(0, 
oldSerializedState);
@@ -82,10 +87,14 @@ class KinesisShardSplitSerializerTest {
         assertThat(deserializedSplit)
                 .usingRecursiveComparison(
                         RecursiveComparisonConfiguration.builder()
-                                .withIgnoredFields("parentShardIds")
+                                .withIgnoredFields(
+                                        "parentShardIds", "startingHashKey", 
"endingHashKey")
                                 .build())
                 .isEqualTo(initialSplit);
         
assertThat(deserializedSplit.getParentShardIds()).isNotNull().matches(Set::isEmpty);
+        assertThat(deserializedSplit.getStartingHashKey())
+                .isEqualTo(BigInteger.ONE.negate().toString());
+        
assertThat(deserializedSplit.getEndingHashKey()).isEqualTo(BigInteger.ZERO.toString());
     }
 
     @Test
diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitTest.java
 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitTest.java
index baefeb4..d00ea18 100644
--- 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitTest.java
+++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitTest.java
@@ -24,6 +24,8 @@ import org.junit.jupiter.api.Test;
 import java.util.Collections;
 import java.util.Set;
 
+import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.ENDING_HASH_KEY_TEST_VALUE;
+import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.STARTING_HASH_KEY_TEST_VALUE;
 import static 
org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
 
 class KinesisShardSplitTest {
@@ -40,7 +42,12 @@ class KinesisShardSplitTest {
                 .isThrownBy(
                         () ->
                                 new KinesisShardSplit(
-                                        null, SHARD_ID, STARTING_POSITION, 
PARENT_SHARD_IDS))
+                                        null,
+                                        SHARD_ID,
+                                        STARTING_POSITION,
+                                        PARENT_SHARD_IDS,
+                                        STARTING_HASH_KEY_TEST_VALUE,
+                                        ENDING_HASH_KEY_TEST_VALUE))
                 .withMessageContaining("streamArn cannot be null");
     }
 
@@ -50,7 +57,12 @@ class KinesisShardSplitTest {
                 .isThrownBy(
                         () ->
                                 new KinesisShardSplit(
-                                        STREAM_ARN, null, STARTING_POSITION, 
PARENT_SHARD_IDS))
+                                        STREAM_ARN,
+                                        null,
+                                        STARTING_POSITION,
+                                        PARENT_SHARD_IDS,
+                                        STARTING_HASH_KEY_TEST_VALUE,
+                                        ENDING_HASH_KEY_TEST_VALUE))
                 .withMessageContaining("shardId cannot be null");
     }
 
@@ -58,7 +70,14 @@ class KinesisShardSplitTest {
     void testStartingPositionNull() {
         assertThatExceptionOfType(NullPointerException.class)
                 .isThrownBy(
-                        () -> new KinesisShardSplit(STREAM_ARN, SHARD_ID, null, PARENT_SHARD_IDS))
+                        () ->
+                                new KinesisShardSplit(
+                                        STREAM_ARN,
+                                        SHARD_ID,
+                                        null,
+                                        PARENT_SHARD_IDS,
+                                        STARTING_HASH_KEY_TEST_VALUE,
+                                        ENDING_HASH_KEY_TEST_VALUE))
                 .withMessageContaining("startingPosition cannot be null");
     }
 
@@ -66,10 +85,47 @@ class KinesisShardSplitTest {
     void testParentShardIdsNull() {
         assertThatExceptionOfType(NullPointerException.class)
                 .isThrownBy(
-                        () -> new KinesisShardSplit(STREAM_ARN, SHARD_ID, STARTING_POSITION, null))
+                        () ->
+                                new KinesisShardSplit(
+                                        STREAM_ARN,
+                                        SHARD_ID,
+                                        STARTING_POSITION,
+                                        null,
+                                        STARTING_HASH_KEY_TEST_VALUE,
+                                        ENDING_HASH_KEY_TEST_VALUE))
                 .withMessageContaining("parentShardIds cannot be null");
     }
 
+    @Test
+    void testStartingHashKeyNull() {
+        assertThatExceptionOfType(NullPointerException.class)
+                .isThrownBy(
+                        () ->
+                                new KinesisShardSplit(
+                                        STREAM_ARN,
+                                        SHARD_ID,
+                                        STARTING_POSITION,
+                                        PARENT_SHARD_IDS,
+                                        null,
+                                        ENDING_HASH_KEY_TEST_VALUE))
+                .withMessageContaining("startingHashKey cannot be null");
+    }
+
+    @Test
+    void testEndingHashKeyNull() {
+        assertThatExceptionOfType(NullPointerException.class)
+                .isThrownBy(
+                        () ->
+                                new KinesisShardSplit(
+                                        STREAM_ARN,
+                                        SHARD_ID,
+                                        STARTING_POSITION,
+                                        PARENT_SHARD_IDS,
+                                        STARTING_HASH_KEY_TEST_VALUE,
+                                        null))
+                .withMessageContaining("endingHashKey cannot be null");
+    }
+
     @Test
     void testEquals() {
         EqualsVerifier.forClass(KinesisShardSplit.class).verify();
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java
index 99ea2ad..ab12164 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java
@@ -25,6 +25,7 @@ import org.apache.flink.connector.kinesis.source.split.StartingPosition;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
 import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.Shard;
 import software.amazon.awssdk.services.kinesis.model.ShardFilter;
@@ -36,6 +37,7 @@ import java.time.Instant;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.List;
@@ -44,6 +46,9 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.ENDING_HASH_KEY_TEST_VALUE;
+import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.STARTING_HASH_KEY_TEST_VALUE;
+
 /** Provides {@link StreamProxy} with mocked Kinesis Streams behavior. */
 public class KinesisStreamProxyProvider {
 
@@ -146,10 +151,22 @@ public class KinesisStreamProxyProvider {
 
         public void addShards(String... shardIds) {
             for (String shardId : shardIds) {
-                shards.add(Shard.builder().shardId(shardId).build());
+                shards.add(
+                        Shard.builder()
+                                .shardId(shardId)
+                                .hashKeyRange(
+                                        HashKeyRange.builder()
+                                                .startingHashKey(STARTING_HASH_KEY_TEST_VALUE)
+                                                .endingHashKey(ENDING_HASH_KEY_TEST_VALUE)
+                                                .build())
+                                .build());
             }
         }
 
+        public void addShards(Shard... shards) {
+            Collections.addAll(this.shards, shards);
+        }
+
         public void setListShardsExceptionSupplier(Supplier<Exception> exceptionSupplier) {
             listShardsExceptionSupplier = exceptionSupplier;
         }
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java
index 362b20d..3bd58d3 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java
@@ -29,12 +29,16 @@ import org.apache.flink.metrics.testutils.MetricListener;
 
 import software.amazon.awssdk.arns.Arn;
 import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
 import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.Shard;
 
+import java.math.BigInteger;
 import java.time.Instant;
 import java.util.Collections;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.IntStream;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -46,11 +50,42 @@ public class TestUtil {
     public static final String SHARD_ID = "shardId-000000000002";
     public static final SimpleStringSchema STRING_SCHEMA = new SimpleStringSchema();
     public static final long MILLIS_BEHIND_LATEST_TEST_VALUE = 100L;
+    public static final String STARTING_HASH_KEY_TEST_VALUE =
+            "42535295865117307932921825928971026432";
+    public static final String ENDING_HASH_KEY_TEST_VALUE =
+            "85070591730234615865843651857942052863";
 
     public static String generateShardId(int shardId) {
         return String.format("shardId-%012d", shardId);
     }
 
+    public static Shard[] generateShardsWithEqualHashKeyRange(int numberOfShards) {
+        BigInteger two = BigInteger.valueOf(2);
+        BigInteger numShards = BigInteger.valueOf(numberOfShards);
+        BigInteger maxHashKey = two.pow(128).subtract(BigInteger.ONE);
+        BigInteger[] hashKeyRangeBoundaries = new BigInteger[numberOfShards + 1];
+        for (int i = 0; i <= numberOfShards; i++) {
+            hashKeyRangeBoundaries[i] =
+                    maxHashKey.multiply(BigInteger.valueOf(i)).divide(numShards);
+        }
+        return IntStream.range(0, numberOfShards)
+                .mapToObj(
+                        i ->
+                                Shard.builder()
+                                        .shardId(generateShardId(i))
+                                        .hashKeyRange(
+                                                HashKeyRange.builder()
+                                                        .startingHashKey(
+                                                                hashKeyRangeBoundaries[i]
+                                                                        .toString())
+                                                        .endingHashKey(
+                                                                hashKeyRangeBoundaries[i + 1]
+                                                                        .toString())
+                                                        .build())
+                                        .build())
+                .toArray(Shard[]::new);
+    }
+
     public static KinesisShardSplitState getTestSplitState() {
         return new KinesisShardSplitState(getTestSplit());
     }
@@ -73,18 +108,44 @@ public class TestUtil {
 
     public static KinesisShardSplit getTestSplit(String streamArn, String shardId) {
         return new KinesisShardSplit(
-                streamArn, shardId, StartingPosition.fromStart(), Collections.emptySet());
+                streamArn,
+                shardId,
+                StartingPosition.fromStart(),
+                Collections.emptySet(),
+                STARTING_HASH_KEY_TEST_VALUE,
+                ENDING_HASH_KEY_TEST_VALUE);
     }
 
     public static KinesisShardSplit getTestSplit(
             String streamArn, String shardId, Set<String> parentShards) {
         return new KinesisShardSplit(
-                streamArn, shardId, StartingPosition.fromStart(), parentShards);
+                streamArn,
+                shardId,
+                StartingPosition.fromStart(),
+                parentShards,
+                STARTING_HASH_KEY_TEST_VALUE,
+                ENDING_HASH_KEY_TEST_VALUE);
     }
 
     public static KinesisShardSplit getTestSplit(StartingPosition startingPosition) {
         return new KinesisShardSplit(
-                STREAM_ARN, SHARD_ID, startingPosition, Collections.emptySet());
+                STREAM_ARN,
+                SHARD_ID,
+                startingPosition,
+                Collections.emptySet(),
+                STARTING_HASH_KEY_TEST_VALUE,
+                ENDING_HASH_KEY_TEST_VALUE);
+    }
+
+    public static KinesisShardSplit getTestSplit(
+            BigInteger startingHashKey, BigInteger endingHashKey) {
+        return new KinesisShardSplit(
+                STREAM_ARN,
+                SHARD_ID,
+                StartingPosition.fromStart(),
+                Collections.emptySet(),
+                startingHashKey.toString(),
+                endingHashKey.toString());
     }
 
     public static ReaderInfo getTestReaderInfo(final int subtaskId) {


Reply via email to