This is an automated email from the ASF dual-hosted git repository.
hong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push:
new 41f6710 [FLINK-36187][Connectors/Kinesis] Implement
UniformShardAssigner using Kinesis shard hashes in the KinesisStreamsSource
41f6710 is described below
commit 41f6710e3c2da4ca744712bfc7f58ea68773f8c8
Author: Hong Teoh <[email protected]>
AuthorDate: Wed Sep 4 17:25:55 2024 +0100
[FLINK-36187][Connectors/Kinesis] Implement UniformShardAssigner using
Kinesis shard hashes in the KinesisStreamsSource
---
.../enumerator/KinesisStreamsSourceEnumerator.java | 7 +-
.../enumerator/assigner/UniformShardAssigner.java | 53 ++++++-----
.../kinesis/source/split/KinesisShardSplit.java | 36 +++++++-
.../source/split/KinesisShardSplitSerializer.java | 16 +++-
.../source/split/KinesisShardSplitState.java | 4 +-
.../KinesisStreamsSourceEnumeratorTest.java | 18 ++--
.../assigner/UniformShardAssignerTest.java | 100 +++++++++------------
.../source/proxy/KinesisStreamProxyTest.java | 13 ++-
.../split/KinesisShardSplitSerializerTest.java | 13 ++-
.../source/split/KinesisShardSplitTest.java | 64 ++++++++++++-
.../source/util/KinesisStreamProxyProvider.java | 19 +++-
.../connector/kinesis/source/util/TestUtil.java | 67 +++++++++++++-
12 files changed, 299 insertions(+), 111 deletions(-)
diff --git
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java
index 741bc5f..1f9f707 100644
---
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java
+++
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java
@@ -282,7 +282,12 @@ public class KinesisStreamsSourceEnumerator
}
splits.add(
new KinesisShardSplit(
- streamArn, shard.shardId(), startingPosition,
parentShardIds));
+ streamArn,
+ shard.shardId(),
+ startingPosition,
+ parentShardIds,
+ shard.hashKeyRange().startingHashKey(),
+ shard.hashKeyRange().endingHashKey()));
}
return splits;
diff --git
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssigner.java
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssigner.java
index 7937ed7..f5d45df 100644
---
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssigner.java
+++
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssigner.java
@@ -23,37 +23,42 @@ import
org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
import org.apache.flink.util.Preconditions;
-import java.util.Collections;
+import java.math.BigInteger;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-/** An implementation of the {@link KinesisShardAssigner} that assigns splits
uniformly. */
+/**
+ * An implementation of the {@link KinesisShardAssigner} that maps Kinesis
shard hash-key ranges to
+ * Flink subtasks. This shard assigner will ensure that the hash-key range
across all shards is
+ * split across all subtasks (on a shard-level granularity). This is essential
to ensure even
+ * distribution of open shards across subtasks when the Kinesis stream is
scaled.
+ */
@Internal
public class UniformShardAssigner implements KinesisShardAssigner {
- @Override
- public int assign(KinesisShardSplit split, Context context) {
- int selectedSubtask = -1;
- int curMinAssignment = Integer.MAX_VALUE;
- Map<Integer, Set<KinesisShardSplit>> splitAssignment =
context.getCurrentSplitAssignment();
- Map<Integer, List<KinesisShardSplit>> pendingSplitAssignments =
- context.getPendingSplitAssignments();
- for (int subtaskId : context.getRegisteredReaders().keySet()) {
- int subtaskAssignmentSize =
- splitAssignment.getOrDefault(subtaskId,
Collections.emptySet()).size()
- + pendingSplitAssignments
- .getOrDefault(subtaskId,
Collections.emptyList())
- .size();
- if (subtaskAssignmentSize < curMinAssignment) {
- curMinAssignment = subtaskAssignmentSize;
- selectedSubtask = subtaskId;
- }
- }
+ private static final BigInteger TWO = BigInteger.valueOf(2);
+ private static final BigInteger HASH_KEY_BOUND = TWO.pow(128);
+
+ @Override
+ public int assign(KinesisShardSplit split, Context context) {
Preconditions.checkArgument(
- selectedSubtask != -1,
+ !context.getRegisteredReaders().isEmpty(),
"Expected at least one registered reader. Unable to assign
split.");
- return selectedSubtask;
+ BigInteger hashKeyStart = new BigInteger(split.getStartingHashKey());
+ BigInteger hashKeyEnd = new BigInteger(split.getEndingHashKey());
+ BigInteger hashKeyMid = hashKeyStart.add(hashKeyEnd).divide(TWO);
+ // index = floor(hashKeyMid * nSubtasks / HASH_KEY_BOUND), which lies in [0, nSubtasks)
+ // for any hash key in [0, HASH_KEY_BOUND). No stream-specific offset is applied here, so
+ // streams that are sharded in the same way will be distributed in the same way.
+ // The index selects directly from the list of registered readers (no modulo is taken).
+ List<Integer> registeredReaders = new
ArrayList<>(context.getRegisteredReaders().keySet());
+ int selectedReaderIndex =
+ Math.abs(
+ hashKeyMid
+
.multiply(BigInteger.valueOf(registeredReaders.size()))
+ .divide(HASH_KEY_BOUND)
+ .intValue());
+ return registeredReaders.get(selectedReaderIndex);
}
}
diff --git
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplit.java
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplit.java
index 309701d..fb5fef5 100644
---
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplit.java
+++
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplit.java
@@ -42,21 +42,29 @@ public final class KinesisShardSplit implements SourceSplit
{
private final String shardId;
private final StartingPosition startingPosition;
private final Set<String> parentShardIds;
+ private final String startingHashKey;
+ private final String endingHashKey;
public KinesisShardSplit(
String streamArn,
String shardId,
StartingPosition startingPosition,
- Set<String> parentShardIds) {
+ Set<String> parentShardIds,
+ String startingHashKey,
+ String endingHashKey) {
checkNotNull(streamArn, "streamArn cannot be null");
checkNotNull(shardId, "shardId cannot be null");
checkNotNull(startingPosition, "startingPosition cannot be null");
checkNotNull(parentShardIds, "parentShardIds cannot be null");
+ checkNotNull(startingHashKey, "startingHashKey cannot be null");
+ checkNotNull(endingHashKey, "endingHashKey cannot be null");
this.streamArn = streamArn;
this.shardId = shardId;
this.startingPosition = startingPosition;
this.parentShardIds = new HashSet<>(parentShardIds);
+ this.startingHashKey = startingHashKey;
+ this.endingHashKey = endingHashKey;
}
@Override
@@ -80,6 +88,14 @@ public final class KinesisShardSplit implements SourceSplit {
return parentShardIds;
}
+ public String getStartingHashKey() {
+ return startingHashKey;
+ }
+
+ public String getEndingHashKey() {
+ return endingHashKey;
+ }
+
@Override
public String toString() {
return "KinesisShardSplit{"
@@ -94,6 +110,12 @@ public final class KinesisShardSplit implements SourceSplit
{
+ ", parentShardIds=["
+ String.join(",", parentShardIds)
+ ']'
+ + ", startingHashKey='"
+ + startingHashKey
+ + '\''
+ + ", endingHashKey='"
+ + endingHashKey
+ + '\''
+ '}';
}
@@ -109,11 +131,19 @@ public final class KinesisShardSplit implements
SourceSplit {
return Objects.equals(streamArn, that.streamArn)
&& Objects.equals(shardId, that.shardId)
&& Objects.equals(startingPosition, that.startingPosition)
- && Objects.equals(parentShardIds, that.parentShardIds);
+ && Objects.equals(parentShardIds, that.parentShardIds)
+ && Objects.equals(startingHashKey, that.startingHashKey)
+ && Objects.equals(endingHashKey, that.endingHashKey);
}
@Override
public int hashCode() {
- return Objects.hash(streamArn, shardId, startingPosition,
parentShardIds);
+ return Objects.hash(
+ streamArn,
+ shardId,
+ startingPosition,
+ parentShardIds,
+ startingHashKey,
+ endingHashKey);
}
}
diff --git
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializer.java
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializer.java
index b5b1711..5433a64 100644
---
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializer.java
+++
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializer.java
@@ -76,6 +76,8 @@ public class KinesisShardSplitSerializer implements
SimpleVersionedSerializer<Ki
for (String parentShardId : split.getParentShardIds()) {
out.writeUTF(parentShardId);
}
+ out.writeUTF(split.getStartingHashKey());
+ out.writeUTF(split.getEndingHashKey());
out.flush();
return baos.toByteArray();
@@ -145,11 +147,23 @@ public class KinesisShardSplitSerializer implements
SimpleVersionedSerializer<Ki
}
}
+ String startingHashKey;
+ String endingHashKey;
+ if (version == CURRENT_VERSION) {
+ startingHashKey = in.readUTF();
+ endingHashKey = in.readUTF();
+ } else {
+ startingHashKey = "-1";
+ endingHashKey = "0";
+ }
+
return new KinesisShardSplit(
streamArn,
shardId,
new StartingPosition(shardIteratorType, startingMarker),
- parentShardIds);
+ parentShardIds,
+ startingHashKey,
+ endingHashKey);
}
}
}
diff --git
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitState.java
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitState.java
index 54e3a9b..391aa5e 100644
---
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitState.java
+++
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitState.java
@@ -40,7 +40,9 @@ public class KinesisShardSplitState {
kinesisShardSplit.getStreamArn(),
kinesisShardSplit.getShardId(),
nextStartingPosition,
- kinesisShardSplit.getParentShardIds());
+ kinesisShardSplit.getParentShardIds(),
+ kinesisShardSplit.getStartingHashKey(),
+ kinesisShardSplit.getEndingHashKey());
}
public String getSplitId() {
diff --git
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorTest.java
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorTest.java
index e8e5e89..953dbde 100644
---
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorTest.java
+++
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorTest.java
@@ -35,9 +35,11 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import java.time.Instant;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -48,6 +50,7 @@ import static
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSou
import static
org.apache.flink.connector.kinesis.source.util.KinesisStreamProxyProvider.TestKinesisStreamProxy;
import static
org.apache.flink.connector.kinesis.source.util.KinesisStreamProxyProvider.getTestStreamProxy;
import static
org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId;
+import static
org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardsWithEqualHashKeyRange;
import static
org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit;
import static
org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
import static
org.assertj.core.api.AssertionsForClassTypes.assertThatNoException;
@@ -440,18 +443,12 @@ class KinesisStreamsSourceEnumeratorTest {
true);
enumerator.start();
- // Given enumerator is initialised without only one reader
+ // Given enumerator is initialised with only one reader
final int subtaskId = 1;
context.registerReader(TestUtil.getTestReaderInfo(subtaskId));
enumerator.addReader(subtaskId);
- String[] shardIds =
- new String[] {
- generateShardId(0),
- generateShardId(1),
- generateShardId(2),
- generateShardId(3)
- };
- streamProxy.addShards(shardIds);
+ Shard[] shards = generateShardsWithEqualHashKeyRange(4);
+ streamProxy.addShards(shards);
// When first discovery runs
context.runNextOneTimeCallable();
@@ -476,7 +473,8 @@ class KinesisStreamsSourceEnumeratorTest {
initialSplitAssignment.assignment().values().stream()
.flatMap(Collection::stream)
.map(KinesisShardSplit::getShardId))
- .containsExactlyInAnyOrder(shardIds);
+ .containsExactlyInAnyOrder(
+
Arrays.stream(shards).map(Shard::shardId).toArray(String[]::new));
assertThat(
initialSplitAssignment.assignment().values().stream()
.flatMap(Collection::stream)
diff --git
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssignerTest.java
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssignerTest.java
index 8ba3473..338b722 100644
---
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssignerTest.java
+++
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssignerTest.java
@@ -23,13 +23,17 @@ import
org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
-import java.util.ArrayList;
+import java.math.BigInteger;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Stream;
import static
org.apache.flink.connector.kinesis.source.util.TestUtil.getTestReaderInfo;
import static
org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit;
@@ -38,58 +42,49 @@ import static
org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOf
class UniformShardAssignerTest {
- @Test
- void testAssignedToLeastBusySubtask() {
- final KinesisShardSplit split = getTestSplit();
+ @ParameterizedTest
+ @MethodSource("hashKeyRangeDataProvider")
+ void testSplitAssignedByHashKeyRange(
+ BigInteger startingHashKey,
+ BigInteger endingHashKey,
+ int nSubtasks,
+ int expectedSubtask) {
+ // Given a split with a given hash-key range
+ final KinesisShardSplit split = getTestSplit(startingHashKey,
endingHashKey);
final TestShardAssignerContext assignerContext = new
TestShardAssignerContext();
-
// Given a distribution of subtasks with varying busyness
- createReaderWithAssignedSplits(assignerContext, 1, 3);
- createReaderWithAssignedSplits(assignerContext, 2, 2);
- createReaderWithAssignedSplits(assignerContext, 3, 1);
+ for (int i = 0; i < nSubtasks; i++) {
+ createReaderWithAssignedSplits(assignerContext, i, nSubtasks - i);
+ }
// When assigned a subtask
KinesisShardAssigner assigner = new UniformShardAssigner();
- // Then least busy subtask is chosen
- assertThat(assigner.assign(split, assignerContext)).isEqualTo(3);
+ // Then expected subtask is assigned
+ assertThat(assigner.assign(split,
assignerContext)).isEqualTo(expectedSubtask);
}
- @Test
- void testAssignedToLeastBusySubtaskConsideringPendingAssignments() {
- final KinesisShardSplit split = getTestSplit();
- final TestShardAssignerContext assignerContext = new
TestShardAssignerContext();
-
- // Given a distribution of subtasks with same busyness
- createReaderWithAssignedSplits(assignerContext, 1, 1);
- createReaderWithAssignedSplits(assignerContext, 2, 1);
- createReaderWithAssignedSplits(assignerContext, 3, 1);
- // Given a pending distribution of subtasks with varying busyness
- addPendingSplits(assignerContext, 1, 3);
- addPendingSplits(assignerContext, 2, 1);
- addPendingSplits(assignerContext, 3, 3);
-
- // When assigned a subtask
- KinesisShardAssigner assigner = new UniformShardAssigner();
-
- // Then least busy subtask is chosen
- assertThat(assigner.assign(split, assignerContext)).isEqualTo(2);
- }
-
- @Test
- void testOnlyRegisteredReaders() {
- final KinesisShardSplit split = getTestSplit();
- final TestShardAssignerContext assignerContext = new
TestShardAssignerContext();
-
- // Given a few registered readers
- assignerContext.registeredReaders.put(1, getTestReaderInfo(1));
- assignerContext.registeredReaders.put(2, getTestReaderInfo(2));
- assignerContext.registeredReaders.put(3, getTestReaderInfo(3));
-
- // When assigned a subtask
- KinesisShardAssigner assigner = new UniformShardAssigner();
- // Then Exception is thrown
- assertThat(assigner.assign(split, assignerContext)).isIn(1, 2, 3);
+ static Stream<Arguments> hashKeyRangeDataProvider() {
+ BigInteger two = BigInteger.valueOf(2);
+ BigInteger three = BigInteger.valueOf(3);
+ // Split the hash key range into thirds
+ BigInteger maxHashKey = two.pow(128).subtract(BigInteger.ONE);
+ BigInteger[] rangeBoundaries = {
+ BigInteger.ZERO,
+ maxHashKey.divide(three),
+ maxHashKey.divide(three).multiply(two),
+ maxHashKey
+ };
+ return Stream.of(
+ // Test that each shard will be distributed evenly
+ Arguments.of(rangeBoundaries[0], rangeBoundaries[1], 3, 0),
+ Arguments.of(rangeBoundaries[1], rangeBoundaries[2], 3, 1),
+ Arguments.of(rangeBoundaries[2], rangeBoundaries[3], 3, 2),
+ // Edge case: Hash key range from 0 to 0
+ Arguments.of(BigInteger.ZERO, BigInteger.ZERO, 3, 0),
+ // Edge case: Negative hash key range should be treated as
positive.
+ // This is possible if restoring from old state
+ Arguments.of(BigInteger.ONE.negate(), BigInteger.ZERO, 3, 0));
}
@Test
@@ -126,21 +121,6 @@ class UniformShardAssignerTest {
}
}
- private void addPendingSplits(
- TestShardAssignerContext testShardAssignerContext,
- int subtaskId,
- int numAssignedSplits) {
- if
(!testShardAssignerContext.pendingSplitAssignments.containsKey(subtaskId)) {
- testShardAssignerContext.pendingSplitAssignments.put(subtaskId,
new ArrayList<>());
- }
- for (int i = 0; i < numAssignedSplits; i++) {
- testShardAssignerContext
- .pendingSplitAssignments
- .get(subtaskId)
- .add(getTestSplit(String.valueOf(i)));
- }
- }
-
private static class TestShardAssignerContext implements
KinesisShardAssigner.Context {
private final Map<Integer, Set<KinesisShardSplit>> splitAssignment =
new HashMap<>();
private final Map<Integer, List<KinesisShardSplit>>
pendingSplitAssignments =
diff --git
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxyTest.java
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxyTest.java
index 6eb1e23..c4bca20 100644
---
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxyTest.java
+++
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxyTest.java
@@ -34,6 +34,7 @@ import
software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
import software.amazon.awssdk.services.kinesis.model.KinesisRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.Record;
@@ -51,6 +52,8 @@ import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static
org.apache.flink.connector.kinesis.source.util.TestUtil.ENDING_HASH_KEY_TEST_VALUE;
+import static
org.apache.flink.connector.kinesis.source.util.TestUtil.STARTING_HASH_KEY_TEST_VALUE;
import static
org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static
org.assertj.core.api.AssertionsForClassTypes.assertThatNoException;
@@ -413,7 +416,15 @@ class KinesisStreamProxyTest {
private List<Shard> getTestShards(final int startShardId, final int
endShardId) {
List<Shard> shards = new ArrayList<>();
for (int i = startShardId; i <= endShardId; i++) {
- shards.add(Shard.builder().shardId(generateShardId(i)).build());
+ shards.add(
+ Shard.builder()
+ .shardId(generateShardId(i))
+ .hashKeyRange(
+ HashKeyRange.builder()
+
.startingHashKey(STARTING_HASH_KEY_TEST_VALUE)
+
.endingHashKey(ENDING_HASH_KEY_TEST_VALUE)
+ .build())
+ .build());
}
return shards;
}
diff --git
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializerTest.java
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializerTest.java
index 72692d7..9b0662b 100644
---
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializerTest.java
+++
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializerTest.java
@@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
+import java.math.BigInteger;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
@@ -32,6 +33,8 @@ import java.util.HashSet;
import java.util.Set;
import java.util.stream.Stream;
+import static
org.apache.flink.connector.kinesis.source.util.TestUtil.ENDING_HASH_KEY_TEST_VALUE;
+import static
org.apache.flink.connector.kinesis.source.util.TestUtil.STARTING_HASH_KEY_TEST_VALUE;
import static
org.apache.flink.connector.kinesis.source.util.TestUtil.STREAM_ARN;
import static
org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId;
import static
org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit;
@@ -74,7 +77,9 @@ class KinesisShardSplitSerializerTest {
STREAM_ARN,
generateShardId(10),
StartingPosition.continueFromSequenceNumber("some-sequence-number"),
- new HashSet<>(Arrays.asList(generateShardId(2),
generateShardId(5))));
+ new HashSet<>(Arrays.asList(generateShardId(2),
generateShardId(5))),
+ STARTING_HASH_KEY_TEST_VALUE,
+ ENDING_HASH_KEY_TEST_VALUE);
byte[] oldSerializedState = serializer.serializeV0(initialSplit);
KinesisShardSplit deserializedSplit = serializer.deserialize(0,
oldSerializedState);
@@ -82,10 +87,14 @@ class KinesisShardSplitSerializerTest {
assertThat(deserializedSplit)
.usingRecursiveComparison(
RecursiveComparisonConfiguration.builder()
- .withIgnoredFields("parentShardIds")
+ .withIgnoredFields(
+ "parentShardIds", "startingHashKey",
"endingHashKey")
.build())
.isEqualTo(initialSplit);
assertThat(deserializedSplit.getParentShardIds()).isNotNull().matches(Set::isEmpty);
+ assertThat(deserializedSplit.getStartingHashKey())
+ .isEqualTo(BigInteger.ONE.negate().toString());
+
assertThat(deserializedSplit.getEndingHashKey()).isEqualTo(BigInteger.ZERO.toString());
}
@Test
diff --git
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitTest.java
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitTest.java
index baefeb4..d00ea18 100644
---
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitTest.java
+++
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitTest.java
@@ -24,6 +24,8 @@ import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.Set;
+import static
org.apache.flink.connector.kinesis.source.util.TestUtil.ENDING_HASH_KEY_TEST_VALUE;
+import static
org.apache.flink.connector.kinesis.source.util.TestUtil.STARTING_HASH_KEY_TEST_VALUE;
import static
org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
class KinesisShardSplitTest {
@@ -40,7 +42,12 @@ class KinesisShardSplitTest {
.isThrownBy(
() ->
new KinesisShardSplit(
- null, SHARD_ID, STARTING_POSITION,
PARENT_SHARD_IDS))
+ null,
+ SHARD_ID,
+ STARTING_POSITION,
+ PARENT_SHARD_IDS,
+ STARTING_HASH_KEY_TEST_VALUE,
+ ENDING_HASH_KEY_TEST_VALUE))
.withMessageContaining("streamArn cannot be null");
}
@@ -50,7 +57,12 @@ class KinesisShardSplitTest {
.isThrownBy(
() ->
new KinesisShardSplit(
- STREAM_ARN, null, STARTING_POSITION,
PARENT_SHARD_IDS))
+ STREAM_ARN,
+ null,
+ STARTING_POSITION,
+ PARENT_SHARD_IDS,
+ STARTING_HASH_KEY_TEST_VALUE,
+ ENDING_HASH_KEY_TEST_VALUE))
.withMessageContaining("shardId cannot be null");
}
@@ -58,7 +70,14 @@ class KinesisShardSplitTest {
void testStartingPositionNull() {
assertThatExceptionOfType(NullPointerException.class)
.isThrownBy(
- () -> new KinesisShardSplit(STREAM_ARN, SHARD_ID,
null, PARENT_SHARD_IDS))
+ () ->
+ new KinesisShardSplit(
+ STREAM_ARN,
+ SHARD_ID,
+ null,
+ PARENT_SHARD_IDS,
+ STARTING_HASH_KEY_TEST_VALUE,
+ ENDING_HASH_KEY_TEST_VALUE))
.withMessageContaining("startingPosition cannot be null");
}
@@ -66,10 +85,47 @@ class KinesisShardSplitTest {
void testParentShardIdsNull() {
assertThatExceptionOfType(NullPointerException.class)
.isThrownBy(
- () -> new KinesisShardSplit(STREAM_ARN, SHARD_ID,
STARTING_POSITION, null))
+ () ->
+ new KinesisShardSplit(
+ STREAM_ARN,
+ SHARD_ID,
+ STARTING_POSITION,
+ null,
+ STARTING_HASH_KEY_TEST_VALUE,
+ ENDING_HASH_KEY_TEST_VALUE))
.withMessageContaining("parentShardIds cannot be null");
}
+ @Test
+ void testStartingHashKeyNull() {
+ assertThatExceptionOfType(NullPointerException.class)
+ .isThrownBy(
+ () ->
+ new KinesisShardSplit(
+ STREAM_ARN,
+ SHARD_ID,
+ STARTING_POSITION,
+ PARENT_SHARD_IDS,
+ null,
+ ENDING_HASH_KEY_TEST_VALUE))
+ .withMessageContaining("startingHashKey cannot be null");
+ }
+
+ @Test
+ void testEndingHashKeyNull() {
+ assertThatExceptionOfType(NullPointerException.class)
+ .isThrownBy(
+ () ->
+ new KinesisShardSplit(
+ STREAM_ARN,
+ SHARD_ID,
+ STARTING_POSITION,
+ PARENT_SHARD_IDS,
+ STARTING_HASH_KEY_TEST_VALUE,
+ null))
+ .withMessageContaining("endingHashKey cannot be null");
+ }
+
@Test
void testEquals() {
EqualsVerifier.forClass(KinesisShardSplit.class).verify();
diff --git
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java
index 99ea2ad..ab12164 100644
---
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java
+++
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java
@@ -25,6 +25,7 @@ import
org.apache.flink.connector.kinesis.source.split.StartingPosition;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
@@ -36,6 +37,7 @@ import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
@@ -44,6 +46,9 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static
org.apache.flink.connector.kinesis.source.util.TestUtil.ENDING_HASH_KEY_TEST_VALUE;
+import static
org.apache.flink.connector.kinesis.source.util.TestUtil.STARTING_HASH_KEY_TEST_VALUE;
+
/** Provides {@link StreamProxy} with mocked Kinesis Streams behavior. */
public class KinesisStreamProxyProvider {
@@ -146,10 +151,22 @@ public class KinesisStreamProxyProvider {
public void addShards(String... shardIds) {
for (String shardId : shardIds) {
- shards.add(Shard.builder().shardId(shardId).build());
+ shards.add(
+ Shard.builder()
+ .shardId(shardId)
+ .hashKeyRange(
+ HashKeyRange.builder()
+
.startingHashKey(STARTING_HASH_KEY_TEST_VALUE)
+
.endingHashKey(ENDING_HASH_KEY_TEST_VALUE)
+ .build())
+ .build());
}
}
+ public void addShards(Shard... shards) {
+ Collections.addAll(this.shards, shards);
+ }
+
public void setListShardsExceptionSupplier(Supplier<Exception>
exceptionSupplier) {
listShardsExceptionSupplier = exceptionSupplier;
}
diff --git
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java
index 362b20d..3bd58d3 100644
---
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java
+++
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java
@@ -29,12 +29,16 @@ import org.apache.flink.metrics.testutils.MetricListener;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+import java.math.BigInteger;
import java.time.Instant;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
+import java.util.stream.IntStream;
import static org.assertj.core.api.Assertions.assertThat;
@@ -46,11 +50,42 @@ public class TestUtil {
public static final String SHARD_ID = "shardId-000000000002";
public static final SimpleStringSchema STRING_SCHEMA = new
SimpleStringSchema();
public static final long MILLIS_BEHIND_LATEST_TEST_VALUE = 100L;
+ public static final String STARTING_HASH_KEY_TEST_VALUE =
+ "42535295865117307932921825928971026432";
+ public static final String ENDING_HASH_KEY_TEST_VALUE =
+ "85070591730234615865843651857942052863";
public static String generateShardId(int shardId) {
return String.format("shardId-%012d", shardId);
}
+ public static Shard[] generateShardsWithEqualHashKeyRange(int
numberOfShards) {
+ BigInteger two = BigInteger.valueOf(2);
+ BigInteger numShards = BigInteger.valueOf(numberOfShards);
+ BigInteger maxHashKey = two.pow(128).subtract(BigInteger.ONE);
+ BigInteger[] hashKeyRangeBoundaries = new BigInteger[numberOfShards +
1];
+ for (int i = 0; i <= numberOfShards; i++) {
+ hashKeyRangeBoundaries[i] =
+
maxHashKey.multiply(BigInteger.valueOf(i)).divide(numShards);
+ }
+ return IntStream.range(0, numberOfShards)
+ .mapToObj(
+ i ->
+ Shard.builder()
+ .shardId(generateShardId(i))
+ .hashKeyRange(
+ HashKeyRange.builder()
+ .startingHashKey(
+
hashKeyRangeBoundaries[i]
+
.toString())
+ .endingHashKey(
+
hashKeyRangeBoundaries[i + 1]
+
.toString())
+ .build())
+ .build())
+ .toArray(Shard[]::new);
+ }
+
public static KinesisShardSplitState getTestSplitState() {
return new KinesisShardSplitState(getTestSplit());
}
@@ -73,18 +108,44 @@ public class TestUtil {
public static KinesisShardSplit getTestSplit(String streamArn, String
shardId) {
return new KinesisShardSplit(
- streamArn, shardId, StartingPosition.fromStart(),
Collections.emptySet());
+ streamArn,
+ shardId,
+ StartingPosition.fromStart(),
+ Collections.emptySet(),
+ STARTING_HASH_KEY_TEST_VALUE,
+ ENDING_HASH_KEY_TEST_VALUE);
}
public static KinesisShardSplit getTestSplit(
String streamArn, String shardId, Set<String> parentShards) {
return new KinesisShardSplit(
- streamArn, shardId, StartingPosition.fromStart(),
parentShards);
+ streamArn,
+ shardId,
+ StartingPosition.fromStart(),
+ parentShards,
+ STARTING_HASH_KEY_TEST_VALUE,
+ ENDING_HASH_KEY_TEST_VALUE);
}
public static KinesisShardSplit getTestSplit(StartingPosition
startingPosition) {
return new KinesisShardSplit(
- STREAM_ARN, SHARD_ID, startingPosition,
Collections.emptySet());
+ STREAM_ARN,
+ SHARD_ID,
+ startingPosition,
+ Collections.emptySet(),
+ STARTING_HASH_KEY_TEST_VALUE,
+ ENDING_HASH_KEY_TEST_VALUE);
+ }
+
+ public static KinesisShardSplit getTestSplit(
+ BigInteger startingHashKey, BigInteger endingHashKey) {
+ return new KinesisShardSplit(
+ STREAM_ARN,
+ SHARD_ID,
+ StartingPosition.fromStart(),
+ Collections.emptySet(),
+ startingHashKey.toString(),
+ endingHashKey.toString());
}
public static ReaderInfo getTestReaderInfo(final int subtaskId) {