Savonitar commented on code in PR #192:
URL: https://github.com/apache/flink-connector-kafka/pull/192#discussion_r2392448704
##########
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializerTest.java:
##########
@@ -56,73 +58,108 @@ public void testEnumStateSerde() throws IOException {
         final KafkaSourceEnumState restoredState =
                 serializer.deserialize(serializer.getVersion(), bytes);

-        assertThat(restoredState.assignedPartitions()).isEqualTo(state.assignedPartitions());
-        assertThat(restoredState.unassignedInitialPartitions())
-                .isEqualTo(state.unassignedInitialPartitions());
+        assertThat(restoredState.assignedSplits())
+                .containsExactlyInAnyOrderElementsOf(state.assignedSplits());
+        assertThat(restoredState.unassignedSplits())
+                .containsExactlyInAnyOrderElementsOf(state.unassignedSplits());
         assertThat(restoredState.initialDiscoveryFinished()).isTrue();
     }

     @Test
     public void testBackwardCompatibility() throws IOException {
-        final Set<TopicPartition> topicPartitions = constructTopicPartitions(0);
-        final Map<Integer, Set<KafkaPartitionSplit>> splitAssignments =
-                toSplitAssignments(topicPartitions);
+        final Set<KafkaPartitionSplit> splits = constructTopicSplits(0);
+        final Map<Integer, Collection<KafkaPartitionSplit>> splitAssignments =
+                toSplitAssignments(splits);
+        final List<SplitAndAssignmentStatus> splitAndAssignmentStatuses =
+                splits.stream()
+                        .map(
+                                split ->
+                                        new SplitAndAssignmentStatus(
+                                                split, getAssignmentStatus(split)))
+                        .collect(Collectors.toList());

         // Create bytes in the way of KafkaEnumStateSerializer version 0 doing serialization
         final byte[] bytesV0 =
                 SerdeUtils.serializeSplitAssignments(
                         splitAssignments, new KafkaPartitionSplitSerializer());

         // Create bytes in the way of KafkaEnumStateSerializer version 1 doing serialization
-        final byte[] bytesV1 =
-                KafkaSourceEnumStateSerializer.serializeTopicPartitions(topicPartitions);
+        final byte[] bytesV1 = KafkaSourceEnumStateSerializer.serializeV1(splits);
+        final byte[] bytesV2 =
+                KafkaSourceEnumStateSerializer.serializeV2(splitAndAssignmentStatuses, false);

         // Deserialize above bytes with KafkaEnumStateSerializer version 2 to check backward
         // compatibility
         final KafkaSourceEnumState kafkaSourceEnumStateV0 =
                 new KafkaSourceEnumStateSerializer().deserialize(0, bytesV0);
         final KafkaSourceEnumState kafkaSourceEnumStateV1 =
                 new KafkaSourceEnumStateSerializer().deserialize(1, bytesV1);
+        final KafkaSourceEnumState kafkaSourceEnumStateV2 =
+                new KafkaSourceEnumStateSerializer().deserialize(2, bytesV2);

-        assertThat(kafkaSourceEnumStateV0.assignedPartitions()).isEqualTo(topicPartitions);
-        assertThat(kafkaSourceEnumStateV0.unassignedInitialPartitions()).isEmpty();
+        assertThat(kafkaSourceEnumStateV0.assignedSplits())
+                .containsExactlyInAnyOrderElementsOf(splits);
+        assertThat(kafkaSourceEnumStateV0.unassignedSplits()).isEmpty();
         assertThat(kafkaSourceEnumStateV0.initialDiscoveryFinished()).isTrue();

-        assertThat(kafkaSourceEnumStateV1.assignedPartitions()).isEqualTo(topicPartitions);
-        assertThat(kafkaSourceEnumStateV1.unassignedInitialPartitions()).isEmpty();
+        assertThat(kafkaSourceEnumStateV1.assignedSplits())
+                .containsExactlyInAnyOrderElementsOf(splits);
+        assertThat(kafkaSourceEnumStateV1.unassignedSplits()).isEmpty();
         assertThat(kafkaSourceEnumStateV1.initialDiscoveryFinished()).isTrue();
+
+        final Map<AssignmentStatus, Set<KafkaPartitionSplit>> splitsByStatus =
+                splitAndAssignmentStatuses.stream()
+                        .collect(
+                                Collectors.groupingBy(
+                                        SplitAndAssignmentStatus::assignmentStatus,
+                                        Collectors.mapping(
+                                                SplitAndAssignmentStatus::split,
+                                                Collectors.toSet())));
+        assertThat(kafkaSourceEnumStateV2.assignedSplits())
+                .containsExactlyInAnyOrderElementsOf(splitsByStatus.get(AssignmentStatus.ASSIGNED));
+        assertThat(kafkaSourceEnumStateV2.unassignedSplits())
+                .containsExactlyInAnyOrderElementsOf(
+                        splitsByStatus.get(AssignmentStatus.UNASSIGNED));
+        assertThat(kafkaSourceEnumStateV2.initialDiscoveryFinished()).isFalse();
+    }
+
+    private static AssignmentStatus getAssignmentStatus(KafkaPartitionSplit split) {
+        return AssignmentStatus.values()[
+                Math.abs(split.hashCode()) % AssignmentStatus.values().length];
     }

-    private Set<TopicPartition> constructTopicPartitions(int startPartition) {
+    private Set<KafkaPartitionSplit> constructTopicSplits(int startPartition) {
         // Create topic partitions for readers.
         // Reader i will be assigned with NUM_PARTITIONS_PER_TOPIC splits, with topic name
         // "topic-{i}" and
         // NUM_PARTITIONS_PER_TOPIC partitions. The starting partition number is startPartition
         // Totally NUM_READERS * NUM_PARTITIONS_PER_TOPIC partitions will be created.
-        Set<TopicPartition> topicPartitions = new HashSet<>();
+        Set<KafkaPartitionSplit> topicPartitions = new HashSet<>();
         for (int readerId = 0; readerId < NUM_READERS; readerId++) {
             for (int partition = startPartition;
                     partition < startPartition + NUM_PARTITIONS_PER_TOPIC;
                     partition++) {
-                topicPartitions.add(new TopicPartition(TOPIC_PREFIX + readerId, partition));
+                topicPartitions.add(
+                        new KafkaPartitionSplit(
+                                new TopicPartition(TOPIC_PREFIX + readerId, partition),
+                                STARTING_OFFSET));
Review Comment:
   Thanks for the PR. This is a very good improvement for the connector.
   I noticed that the current test creates splits using the constant `KafkaPartitionSplit.EARLIEST_OFFSET`. Would it make sense to add a test case that uses a real-world offset (e.g., `123`)?
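   Something like the sketch below is what I have in mind. It only reuses the V2 entry points visible in this diff; the test name, the `123L` offset, and the use of `KafkaPartitionSplit#getStartingOffset` for the round-trip check are illustrative, not a finished implementation:

   ```java
   // Rough sketch: round-trip V2 state whose splits carry a concrete offset
   // instead of the EARLIEST_OFFSET sentinel, then check the offset survives.
   @Test
   public void testEnumStateSerdeWithConcreteOffset() throws IOException {
       final long concreteOffset = 123L; // illustrative "real-world" offset
       final List<SplitAndAssignmentStatus> statuses = new ArrayList<>();
       for (int readerId = 0; readerId < NUM_READERS; readerId++) {
           for (int partition = 0; partition < NUM_PARTITIONS_PER_TOPIC; partition++) {
               statuses.add(
                       new SplitAndAssignmentStatus(
                               new KafkaPartitionSplit(
                                       new TopicPartition(TOPIC_PREFIX + readerId, partition),
                                       concreteOffset),
                               AssignmentStatus.ASSIGNED));
           }
       }

       final byte[] bytes = KafkaSourceEnumStateSerializer.serializeV2(statuses, true);
       final KafkaSourceEnumState restored =
               new KafkaSourceEnumStateSerializer().deserialize(2, bytes);

       // The concrete starting offset must survive serialization rather than
       // being collapsed back to a sentinel value.
       assertThat(restored.assignedSplits())
               .allSatisfy(
                       split ->
                               assertThat(split.getStartingOffset())
                                       .isEqualTo(concreteOffset));
   }
   ```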
##########
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumeratorTest.java:
##########
@@ -316,14 +329,20 @@ public void testDiscoverPartitionsPeriodically() throws Throwable {
         }
     }

-    @Test
-    public void testAddSplitsBack() throws Throwable {
+    @ParameterizedTest
+    @EnumSource(StandardOffsetsInitializer.class)
+    public void testAddSplitsBack(StandardOffsetsInitializer offsetsInitializer) throws Throwable {
Review Comment:
   Is my understanding correct that the test verifies that the offset is correctly recalculated on recovery, but doesn't verify that the original offset (before the failure) was preserved and restored?
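   If it doesn't, a follow-up assertion along these lines could close that gap. This is purely illustrative: `splitsAssignedBeforeFailure` and `splitsAddedBackAfterRecovery` are hypothetical placeholders for whatever the test fixture exposes at those two points:

   ```java
   // Hypothetical check: splits added back after a reader failure should carry
   // the offsets they had before the failure, not freshly initialized ones.
   final Map<TopicPartition, Long> offsetsBeforeFailure =
           splitsAssignedBeforeFailure.stream()
                   .collect(
                           Collectors.toMap(
                                   KafkaPartitionSplit::getTopicPartition,
                                   KafkaPartitionSplit::getStartingOffset));

   for (KafkaPartitionSplit split : splitsAddedBackAfterRecovery) {
       assertThat(split.getStartingOffset())
               .isEqualTo(offsetsBeforeFailure.get(split.getTopicPartition()));
   }
   ```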
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]