Savonitar commented on code in PR #192:
URL: https://github.com/apache/flink-connector-kafka/pull/192#discussion_r2392448704


##########
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializerTest.java:
##########
@@ -56,73 +58,108 @@ public void testEnumStateSerde() throws IOException {
         final KafkaSourceEnumState restoredState =
                 serializer.deserialize(serializer.getVersion(), bytes);
 
-        assertThat(restoredState.assignedPartitions()).isEqualTo(state.assignedPartitions());
-        assertThat(restoredState.unassignedInitialPartitions())
-                .isEqualTo(state.unassignedInitialPartitions());
+        assertThat(restoredState.assignedSplits())
+                .containsExactlyInAnyOrderElementsOf(state.assignedSplits());
+        assertThat(restoredState.unassignedSplits())
+                .containsExactlyInAnyOrderElementsOf(state.unassignedSplits());
         assertThat(restoredState.initialDiscoveryFinished()).isTrue();
     }
 
     @Test
     public void testBackwardCompatibility() throws IOException {
 
-        final Set<TopicPartition> topicPartitions = constructTopicPartitions(0);
-        final Map<Integer, Set<KafkaPartitionSplit>> splitAssignments =
-                toSplitAssignments(topicPartitions);
+        final Set<KafkaPartitionSplit> splits = constructTopicSplits(0);
+        final Map<Integer, Collection<KafkaPartitionSplit>> splitAssignments =
+                toSplitAssignments(splits);
+        final List<SplitAndAssignmentStatus> splitAndAssignmentStatuses =
+                splits.stream()
+                        .map(
+                                split ->
+                                        new SplitAndAssignmentStatus(
+                                                split, getAssignmentStatus(split)))
+                        .collect(Collectors.toList());
 
         // Create bytes in the way of KafkaEnumStateSerializer version 0 doing serialization
         final byte[] bytesV0 =
                 SerdeUtils.serializeSplitAssignments(
                         splitAssignments, new KafkaPartitionSplitSerializer());
         // Create bytes in the way of KafkaEnumStateSerializer version 1 doing serialization
-        final byte[] bytesV1 =
-                KafkaSourceEnumStateSerializer.serializeTopicPartitions(topicPartitions);
+        final byte[] bytesV1 = KafkaSourceEnumStateSerializer.serializeV1(splits);
+        final byte[] bytesV2 =
+                KafkaSourceEnumStateSerializer.serializeV2(splitAndAssignmentStatuses, false);
 
         // Deserialize above bytes with KafkaEnumStateSerializer version 2 to check backward
         // compatibility
         final KafkaSourceEnumState kafkaSourceEnumStateV0 =
                 new KafkaSourceEnumStateSerializer().deserialize(0, bytesV0);
         final KafkaSourceEnumState kafkaSourceEnumStateV1 =
                 new KafkaSourceEnumStateSerializer().deserialize(1, bytesV1);
+        final KafkaSourceEnumState kafkaSourceEnumStateV2 =
+                new KafkaSourceEnumStateSerializer().deserialize(2, bytesV2);
 
-        assertThat(kafkaSourceEnumStateV0.assignedPartitions()).isEqualTo(topicPartitions);
-        assertThat(kafkaSourceEnumStateV0.unassignedInitialPartitions()).isEmpty();
+        assertThat(kafkaSourceEnumStateV0.assignedSplits())
+                .containsExactlyInAnyOrderElementsOf(splits);
+        assertThat(kafkaSourceEnumStateV0.unassignedSplits()).isEmpty();
         assertThat(kafkaSourceEnumStateV0.initialDiscoveryFinished()).isTrue();
 
-        assertThat(kafkaSourceEnumStateV1.assignedPartitions()).isEqualTo(topicPartitions);
-        assertThat(kafkaSourceEnumStateV1.unassignedInitialPartitions()).isEmpty();
+        assertThat(kafkaSourceEnumStateV1.assignedSplits())
+                .containsExactlyInAnyOrderElementsOf(splits);
+        assertThat(kafkaSourceEnumStateV1.unassignedSplits()).isEmpty();
         assertThat(kafkaSourceEnumStateV1.initialDiscoveryFinished()).isTrue();
+
+        final Map<AssignmentStatus, Set<KafkaPartitionSplit>> splitsByStatus =
+                splitAndAssignmentStatuses.stream()
+                        .collect(
+                                Collectors.groupingBy(
+                                        SplitAndAssignmentStatus::assignmentStatus,
+                                        Collectors.mapping(
+                                                SplitAndAssignmentStatus::split,
+                                                Collectors.toSet())));
+        assertThat(kafkaSourceEnumStateV2.assignedSplits())
+                .containsExactlyInAnyOrderElementsOf(splitsByStatus.get(AssignmentStatus.ASSIGNED));
+        assertThat(kafkaSourceEnumStateV2.unassignedSplits())
+                .containsExactlyInAnyOrderElementsOf(
+                        splitsByStatus.get(AssignmentStatus.UNASSIGNED));
+        assertThat(kafkaSourceEnumStateV2.initialDiscoveryFinished()).isFalse();
+    }
+
+    private static AssignmentStatus getAssignmentStatus(KafkaPartitionSplit split) {
+        return AssignmentStatus.values()[
+                Math.abs(split.hashCode()) % AssignmentStatus.values().length];
     }
 
-    private Set<TopicPartition> constructTopicPartitions(int startPartition) {
+    private Set<KafkaPartitionSplit> constructTopicSplits(int startPartition) {
         // Create topic partitions for readers.
         // Reader i will be assigned with NUM_PARTITIONS_PER_TOPIC splits, with topic name
         // "topic-{i}" and
         // NUM_PARTITIONS_PER_TOPIC partitions. The starting partition number is startPartition
         // Totally NUM_READERS * NUM_PARTITIONS_PER_TOPIC partitions will be created.
-        Set<TopicPartition> topicPartitions = new HashSet<>();
+        Set<KafkaPartitionSplit> topicPartitions = new HashSet<>();
         for (int readerId = 0; readerId < NUM_READERS; readerId++) {
             for (int partition = startPartition;
                     partition < startPartition + NUM_PARTITIONS_PER_TOPIC;
                     partition++) {
-                topicPartitions.add(new TopicPartition(TOPIC_PREFIX + readerId, partition));
+                topicPartitions.add(
+                        new KafkaPartitionSplit(
+                                new TopicPartition(TOPIC_PREFIX + readerId, partition),
+                                STARTING_OFFSET));

Review Comment:
   Thanks for the PR. This is a very good improvement for the connector.
   I noticed that the current test creates splits using the constant `KafkaPartitionSplit.EARLIEST_OFFSET`. Would it make sense to add a test case that uses a real-world offset (e.g., `123`)?
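   For illustration, a rough sketch of such a test case (not part of the PR; the method name is made up, it reuses the imports and constants of the existing test class, and it assumes a `KafkaSourceEnumState` constructor taking assigned splits, unassigned splits, and the initial-discovery flag, matching the accessors exercised above):
   ```java
   @Test
   public void testEnumStateSerdeWithRealWorldOffset() throws IOException {
       final long realWorldOffset = 123L;
       final KafkaPartitionSplit split =
               new KafkaPartitionSplit(new TopicPartition("topic-0", 0), realWorldOffset);

       // Assumed constructor shape: (assigned splits, unassigned splits, initialDiscoveryFinished).
       final KafkaSourceEnumState state =
               new KafkaSourceEnumState(
                       Collections.singleton(split), Collections.emptySet(), true);

       final KafkaSourceEnumStateSerializer serializer = new KafkaSourceEnumStateSerializer();
       final KafkaSourceEnumState restored =
               serializer.deserialize(serializer.getVersion(), serializer.serialize(state));

       // The concrete starting offset should survive the round trip unchanged.
       assertThat(restored.assignedSplits())
               .extracting(KafkaPartitionSplit::getStartingOffset)
               .containsExactly(realWorldOffset);
   }
   ```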



##########
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumeratorTest.java:
##########
@@ -316,14 +329,20 @@ public void testDiscoverPartitionsPeriodically() throws Throwable {
         }
     }
 
-    @Test
-    public void testAddSplitsBack() throws Throwable {
+    @ParameterizedTest
+    @EnumSource(StandardOffsetsInitializer.class)
+    public void testAddSplitsBack(StandardOffsetsInitializer offsetsInitializer) throws Throwable {

Review Comment:
   Is my understanding correct that the test verifies that the offset is correctly recalculated on recovery, but doesn't verify that the original offset (before the failure) was preserved and restored?
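   If preserving the pre-failure offset is worth pinning down, a rough sketch of the kind of extra check meant here (not part of the PR; `splitsAssignedBeforeFailure`, `getLastReaderAssignments`, and the `READER0` id are placeholders for whatever the test already has at hand):
   ```java
   // Capture the starting offsets handed out before the simulated failure ...
   final Map<TopicPartition, Long> offsetsBeforeFailure =
           splitsAssignedBeforeFailure.stream()
                   .collect(
                           Collectors.toMap(
                                   KafkaPartitionSplit::getTopicPartition,
                                   KafkaPartitionSplit::getStartingOffset));

   enumerator.addSplitsBack(new ArrayList<>(splitsAssignedBeforeFailure), READER0);

   // ... and verify they come back verbatim rather than being recalculated.
   for (KafkaPartitionSplit split : getLastReaderAssignments(context, READER0)) {
       assertThat(split.getStartingOffset())
               .isEqualTo(offsetsBeforeFailure.get(split.getTopicPartition()));
   }
   ```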



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
