mumrah commented on code in PR #13802:
URL: https://github.com/apache/kafka/pull/13802#discussion_r1228123874


##########
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java:
##########
@@ -195,9 +235,658 @@ public void iterateTopics(EnumSet<TopicVisitorInterest> 
interests, TopicVisitor
             (logMsg, operation) -> 
operation.apply(ZkMigrationLeadershipState.EMPTY));
         writer.handleSnapshot(image, consumer);
         assertEquals(1, opCounts.remove("CreateTopic"));
-        assertEquals(1, opCounts.remove("UpdatePartition"));
+        assertEquals(1, opCounts.remove("UpdatePartitions"));
         assertEquals(1, opCounts.remove("UpdateTopic"));
         assertEquals(0, opCounts.size());
         assertEquals("bar", topicClient.createdTopics.get(0));
     }
+
+    @Test
+    public void testDeleteTopicFromSnapshot() {
+        CapturingTopicMigrationClient topicClient = new 
CapturingTopicMigrationClient() {
+            @Override
+            public void iterateTopics(EnumSet<TopicVisitorInterest> interests, 
TopicVisitor visitor) {
+                visitor.visitTopic("spam", Uuid.randomUuid(), 
Collections.emptyMap());
+            }
+        };
+        CapturingMigrationClient migrationClient = 
CapturingMigrationClient.newBuilder()
+                .setBrokersInZk(0)
+                .setTopicMigrationClient(topicClient)
+                .build();
+
+        KRaftMigrationZkWriter writer = new 
KRaftMigrationZkWriter(migrationClient);
+
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = 
KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> 
operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleTopicsSnapshot(TopicsImage.EMPTY, consumer);
+        assertEquals(1, opCounts.remove("DeleteTopic"));
+        assertEquals(1, opCounts.remove("DeleteTopicConfig"));
+        assertEquals(0, opCounts.size());
+        assertEquals(Collections.singletonList("spam"), 
topicClient.deletedTopics);
+
+        opCounts.clear();
+        topicClient.reset();
+        writer.handleTopicsSnapshot(TopicsImageTest.IMAGE1, consumer);
+        assertEquals(1, opCounts.remove("DeleteTopic"));
+        assertEquals(1, opCounts.remove("DeleteTopicConfig"));
+        assertEquals(2, opCounts.remove("CreateTopic"));
+        assertEquals(0, opCounts.size());
+        assertEquals(Collections.singletonList("spam"), 
topicClient.deletedTopics);
+        assertEquals(Arrays.asList("foo", "bar"), topicClient.createdTopics);
+    }
+
+    @FunctionalInterface
+    interface TopicVerifier {
+        void verify(Uuid topicId, TopicsImage topicsImage, 
CapturingTopicMigrationClient topicClient, KRaftMigrationZkWriter writer);
+    }
+
+    void setupTopicWithTwoPartitions(TopicVerifier verifier) {
+        // Set up a topic with two partitions in ZK (via iterateTopics) and a 
KRaft TopicsImage, then run the given verifier
+        Uuid topicId = Uuid.randomUuid();
+        Map<Integer, PartitionRegistration> partitionMap = new HashMap<>();
+        partitionMap.put(0, new PartitionRegistration(new int[]{2, 3, 4}, new 
int[]{2, 3, 4}, new int[]{}, new int[]{}, 2, LeaderRecoveryState.RECOVERED, 0, 
-1));
+        partitionMap.put(1, new PartitionRegistration(new int[]{3, 4, 5}, new 
int[]{3, 4, 5}, new int[]{}, new int[]{}, 3, LeaderRecoveryState.RECOVERED, 0, 
-1));
+
+        CapturingTopicMigrationClient topicClient = new 
CapturingTopicMigrationClient() {
+            @Override
+            public void iterateTopics(EnumSet<TopicVisitorInterest> interests, 
TopicVisitor visitor) {
+                Map<Integer, List<Integer>> assignments = new HashMap<>();
+                assignments.put(0, Arrays.asList(2, 3, 4));
+                assignments.put(1, Arrays.asList(3, 4, 5));
+                visitor.visitTopic("spam", topicId, assignments);
+                visitor.visitPartition(new TopicIdPartition(topicId, new 
TopicPartition("spam", 0)), partitionMap.get(0));
+                visitor.visitPartition(new TopicIdPartition(topicId, new 
TopicPartition("spam", 1)), partitionMap.get(1));
+            }
+        };
+
+        CapturingMigrationClient migrationClient = 
CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .setTopicMigrationClient(topicClient)
+            .build();
+        KRaftMigrationZkWriter writer = new 
KRaftMigrationZkWriter(migrationClient);
+
+        TopicsDelta delta = new TopicsDelta(TopicsImage.EMPTY);
+        delta.replay(new TopicRecord().setTopicId(topicId).setName("spam"));
+        delta.replay((PartitionRecord) partitionMap.get(0).toRecord(topicId, 
0).message());
+        delta.replay((PartitionRecord) partitionMap.get(1).toRecord(topicId, 
1).message());
+        TopicsImage image = delta.apply();
+
+        verifier.verify(topicId, image, topicClient, writer);
+    }
+
+    @Test
+    public void testUpdatePartitionsFromSnapshot() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, 
writer) -> {
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = 
KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> 
operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsSnapshot(topicsImage, consumer);
+            assertEquals(0, opCounts.size(), "No operations expected since the 
data is the same");
+
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            topicsDelta.replay(new 
PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setIsr(Arrays.asList(2,
 3)));
+            topicsDelta.replay(new 
PartitionChangeRecord().setTopicId(topicId).setPartitionId(1).setReplicas(Arrays.asList(3,
 4, 5)).setLeader(3));
+            topicsImage = topicsDelta.apply();
+
+            writer.handleTopicsSnapshot(topicsImage, consumer);
+            assertEquals(1, opCounts.remove("UpdatePartitions"));
+            assertEquals(0, opCounts.size());
+        });
+    }
+
+    @Test
+    public void testTopicReassignmentDelta() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, 
writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            topicsDelta.replay(new 
PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setIsr(Arrays.asList(2,
 3)));
+            topicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = 
KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> 
operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsDelta(__ -> "", topicsImage, topicsDelta, 
consumer);
+            assertEquals(1, opCounts.remove("UpdatePartitions"));
+            assertEquals(0, opCounts.size());
+
+            assertEquals(1, 
topicClient.updatedTopicPartitions.get("spam").size());
+            assertEquals(Collections.singleton(0), 
topicClient.updatedTopicPartitions.get("spam"));
+        });
+    }
+
+    @Test
+    public void testNewTopicSnapshot() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, 
writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            Uuid newTopicId = Uuid.randomUuid();
+            topicsDelta.replay(new 
TopicRecord().setTopicId(newTopicId).setName("new"));
+            topicsDelta.replay(new 
PartitionRecord().setTopicId(newTopicId).setPartitionId(0).setReplicas(Arrays.asList(0,
 1, 2)));
+            topicsDelta.replay(new 
PartitionRecord().setTopicId(newTopicId).setPartitionId(1).setReplicas(Arrays.asList(1,
 2, 3)));
+            topicsDelta.replay(new 
PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setIsr(Arrays.asList(2,
 3)));
+            topicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = 
KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> 
operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsSnapshot(topicsImage, consumer);
+            assertEquals(1, opCounts.remove("UpdatePartitions"));
+            assertEquals(1, opCounts.remove("CreateTopic"));
+            assertEquals(0, opCounts.size());
+        });
+    }
+
+    @Test
+    public void testNewTopicDelta() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, 
writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            Uuid newTopicId = Uuid.randomUuid();
+            topicsDelta.replay(new 
TopicRecord().setTopicId(newTopicId).setName("new"));
+            topicsDelta.replay(new 
PartitionRecord().setTopicId(newTopicId).setPartitionId(0).setReplicas(Arrays.asList(0,
 1, 2)));
+            topicsDelta.replay(new 
PartitionRecord().setTopicId(newTopicId).setPartitionId(1).setReplicas(Arrays.asList(1,
 2, 3)));
+            topicsDelta.replay(new 
PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setIsr(Arrays.asList(2,
 3)));
+            topicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = 
KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> 
operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsDelta(__ -> "", topicsImage, topicsDelta, 
consumer);
+            assertEquals(1, opCounts.remove("UpdatePartitions"));
+            assertEquals(1, opCounts.remove("CreateTopic"));
+            assertEquals(0, opCounts.size());
+        });
+    }
+
+    @Test
+    public void testNewPartitionDelta() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, 
writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            topicsDelta.replay(new 
PartitionRecord().setTopicId(topicId).setPartitionId(2).setReplicas(Arrays.asList(1,
 2, 3)));
+            topicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = 
KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> 
operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsDelta(__ -> "", topicsImage, topicsDelta, 
consumer);
+            assertEquals(1, opCounts.remove("UpdatePartitions"));
+            assertEquals(1, opCounts.remove("UpdateTopic"));
+            assertEquals(0, opCounts.size());
+        });
+    }
+
+    @Test
+    public void testPartitionDelta() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, 
writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            topicsDelta.replay(new 
PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setReplicas(Arrays.asList(3,
 4, 5)).setLeader(3));
+            topicsDelta.replay(new 
PartitionChangeRecord().setTopicId(topicId).setPartitionId(1).setReplicas(Arrays.asList(1,
 2, 3)).setLeader(1));
+            topicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = 
KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> 
operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsDelta(__ -> "", topicsImage, topicsDelta, 
consumer);
+            assertEquals(1, opCounts.remove("UpdateTopic"));
+            assertEquals(1, opCounts.remove("UpdatePartitions"));
+            assertEquals(0, opCounts.size());
+
+            assertEquals(2, topicClient.updatedTopics.get("spam").size());
+            assertEquals(new HashSet<>(Arrays.asList(0, 1)), 
topicClient.updatedTopicPartitions.get("spam"));
+        });
+    }
+
+    @Test
+    public void testDeleteTopicDelta() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, 
writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            topicsDelta.replay(new RemoveTopicRecord().setTopicId(topicId));
+            TopicsImage newTopicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = 
KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> 
operation.apply(ZkMigrationLeadershipState.EMPTY));
+            Map<Uuid, String> emptyTopicNames = Collections.emptyMap();
+            assertThrows(RuntimeException.class,
+                () -> writer.handleTopicsDelta(emptyTopicNames::get, 
newTopicsImage, topicsDelta, consumer));
+
+            Map<Uuid, String> topicNames = Collections.singletonMap(topicId, 
"spam");
+            writer.handleTopicsDelta(topicNames::get, newTopicsImage, 
topicsDelta, consumer);
+            assertEquals(1, opCounts.remove("DeleteTopic"));
+            assertEquals(0, opCounts.size());
+
+            assertEquals(Collections.singletonList("spam"), 
topicClient.deletedTopics);
+        });
+    }
+
+    @Test
+    public void testBrokerConfigDelta() {
+        CapturingConfigMigrationClient configClient = new 
CapturingConfigMigrationClient();
+        CapturingMigrationClient migrationClient = 
CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .setConfigMigrationClient(configClient)
+            .build();
+        KRaftMigrationZkWriter writer = new 
KRaftMigrationZkWriter(migrationClient);
+        ConfigurationsDelta delta = new 
ConfigurationsDelta(ConfigurationsImage.EMPTY);
+        delta.replay(new 
ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("b0").setName("foo").setValue("bar"));
+        delta.replay(new 
ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("b0").setName("spam").setValue(null));
+        delta.replay(new 
ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-0").setName("foo").setValue("bar"));
+        delta.replay(new 
ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-1").setName("foo").setValue(null));
+
+        ConfigurationsImage image = delta.apply();
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = 
KRaftMigrationDriver.countingOperationConsumer(opCounts,
+            (logMsg, operation) -> 
operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleConfigsDelta(image, delta, consumer);
+        assertEquals(
+            Collections.singletonMap("foo", "bar"),
+            configClient.writtenConfigs.get(new 
ConfigResource(ConfigResource.Type.BROKER, "b0"))
+        );
+        assertEquals(
+            Collections.singletonMap("foo", "bar"),
+            configClient.writtenConfigs.get(new 
ConfigResource(ConfigResource.Type.TOPIC, "topic-0"))
+        );
+        assertTrue(
+            configClient.deletedResources.contains(new 
ConfigResource(ConfigResource.Type.TOPIC, "topic-1"))
+        );
+    }
+
+    @Test
+    public void testBrokerConfigSnapshot() {
+        CapturingTopicMigrationClient topicClient = new 
CapturingTopicMigrationClient();
+        CapturingConfigMigrationClient configClient = new 
CapturingConfigMigrationClient() {
+            @Override
+            public void iterateBrokerConfigs(BiConsumer<String, Map<String, 
String>> configConsumer) {
+                Map<String, String> b0 = new HashMap<>();
+                b0.put("foo", "bar");
+                b0.put("spam", "eggs");
+                configConsumer.accept("0", b0);
+                configConsumer.accept("1", Collections.singletonMap("foo", 
"bar"));
+                configConsumer.accept("3", Collections.singletonMap("foo", 
"bar"));
+            }
+        };
+        CapturingAclMigrationClient aclClient = new 
CapturingAclMigrationClient();
+        CapturingMigrationClient migrationClient = 
CapturingMigrationClient.newBuilder()
+                .setBrokersInZk(0)
+                .setTopicMigrationClient(topicClient)
+                .setConfigMigrationClient(configClient)
+                .setAclMigrationClient(aclClient)
+                .build();
+        KRaftMigrationZkWriter writer = new 
KRaftMigrationZkWriter(migrationClient);
+
+        ConfigurationsDelta delta = new 
ConfigurationsDelta(ConfigurationsImage.EMPTY);
+        delta.replay(new 
ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("0").setName("foo").setValue("bar"));
+        delta.replay(new 
ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("1").setName("foo").setValue("bar"));
+        delta.replay(new 
ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("2").setName("foo").setValue("bar"));
+
+        ConfigurationsImage image = delta.apply();
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = 
KRaftMigrationDriver.countingOperationConsumer(opCounts,
+            (logMsg, operation) -> 
operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleConfigsSnapshot(image, consumer);
+
+        assertTrue(configClient.deletedResources.contains(new 
ConfigResource(ConfigResource.Type.BROKER, "3")),
+            "Broker 3 is not in the ConfigurationsImage, it should get 
deleted");
+
+        assertEquals(
+            Collections.singletonMap("foo", "bar"),
+            configClient.writtenConfigs.get(new 
ConfigResource(ConfigResource.Type.BROKER, "0")),
+            "Broker 0 only has foo=bar in image, should overwrite the ZK 
config");
+
+        assertFalse(configClient.writtenConfigs.containsKey(new 
ConfigResource(ConfigResource.Type.BROKER, "1")),
+            "Broker 1 config is the same in image, so no write should happen");
+
+        assertEquals(
+            Collections.singletonMap("foo", "bar"),
+            configClient.writtenConfigs.get(new 
ConfigResource(ConfigResource.Type.BROKER, "2")),
+            "Broker 2 not present in ZK, should see an update");
+
+        assertEquals(2, opCounts.get(UPDATE_BROKER_CONFIG));
+        assertEquals(1, opCounts.get(DELETE_BROKER_CONFIG));
+    }
+
+    @Test
+    public void testTopicConfigSnapshot() {
+        CapturingTopicMigrationClient topicClient = new 
CapturingTopicMigrationClient();
+        CapturingConfigMigrationClient configClient = new 
CapturingConfigMigrationClient() {
+            @Override
+            public void iterateTopicConfigs(BiConsumer<String, Map<String, 
String>> configConsumer) {
+                Map<String, String> topic0 = new HashMap<>();
+                topic0.put("foo", "bar");
+                topic0.put("spam", "eggs");
+                configConsumer.accept("topic-0", topic0);
+                configConsumer.accept("topic-1", 
Collections.singletonMap("foo", "bar"));
+                configConsumer.accept("topic-3", 
Collections.singletonMap("foo", "bar"));
+            }
+        };
+        CapturingAclMigrationClient aclClient = new 
CapturingAclMigrationClient();
+        CapturingMigrationClient migrationClient = 
CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .setTopicMigrationClient(topicClient)
+            .setConfigMigrationClient(configClient)
+            .setAclMigrationClient(aclClient)
+            .build();
+        KRaftMigrationZkWriter writer = new 
KRaftMigrationZkWriter(migrationClient);
+
+        ConfigurationsDelta delta = new 
ConfigurationsDelta(ConfigurationsImage.EMPTY);
+        delta.replay(new 
ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-0").setName("foo").setValue("bar"));
+        delta.replay(new 
ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-1").setName("foo").setValue("bar"));
+        delta.replay(new 
ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-2").setName("foo").setValue("bar"));
+
+        ConfigurationsImage image = delta.apply();
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = 
KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> 
operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleConfigsSnapshot(image, consumer);
+
+        assertTrue(configClient.deletedResources.contains(new 
ConfigResource(ConfigResource.Type.TOPIC, "topic-3")),
+                "Topic topic-3 is not in the ConfigurationsImage, it should 
get deleted");
+
+        assertEquals(
+                Collections.singletonMap("foo", "bar"),
+                configClient.writtenConfigs.get(new 
ConfigResource(ConfigResource.Type.TOPIC, "topic-0")),
+                "Topic topic-0 only has foo=bar in image, should overwrite the 
ZK config");
+
+        assertFalse(configClient.writtenConfigs.containsKey(new 
ConfigResource(ConfigResource.Type.TOPIC, "topic-1")),
+                "Topic topic-1 config is the same in image, so no write should 
happen");
+
+        assertEquals(
+                Collections.singletonMap("foo", "bar"),
+                configClient.writtenConfigs.get(new 
ConfigResource(ConfigResource.Type.TOPIC, "topic-2")),
+                "Topic topic-2 not present in ZK, should see an update");
+
+        assertEquals(2, opCounts.get(UPDATE_TOPIC_CONFIG));
+        assertEquals(1, opCounts.get(DELETE_TOPIC_CONFIG));
+    }
+
+    @Test
+    public void testInvalidConfigSnapshot() {
+        CapturingMigrationClient migrationClient = 
CapturingMigrationClient.newBuilder().build();
+        KRaftMigrationZkWriter writer = new 
KRaftMigrationZkWriter(migrationClient);
+        ConfigurationsDelta delta = new 
ConfigurationsDelta(ConfigurationsImage.EMPTY);
+        delta.replay(new ConfigRecord().setResourceType((byte) 
99).setResourceName("resource").setName("foo").setValue("bar"));
+
+        ConfigurationsImage image = delta.apply();
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = 
KRaftMigrationDriver.countingOperationConsumer(opCounts,
+            (logMsg, operation) -> 
operation.apply(ZkMigrationLeadershipState.EMPTY));
+        assertThrows(RuntimeException.class, () -> 
writer.handleConfigsSnapshot(image, consumer),
+            "Should throw due to invalid resource in image");
+    }
+
+    @Test
+    public void testProducerIdSnapshot() {
+        CapturingMigrationClient migrationClient = 
CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .build();
+        KRaftMigrationZkWriter writer = new 
KRaftMigrationZkWriter(migrationClient);
+
+        migrationClient.setReadProducerId(new ProducerIdsBlock(0, 100L, 1000));
+
+        {
+            // No change
+            ProducerIdsImage image = new ProducerIdsImage(1100);
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = 
KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                    (logMsg, operation) -> 
operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleProducerIdSnapshot(image, consumer);
+            assertEquals(0, opCounts.size());
+        }
+
+        {
+            // KRaft differs from ZK

Review Comment:
   We don't actually verify this at this point. Once something is committed to 
KRaft, it's too late for validation. I'll add a test case for a KRaft producer 
ID that is less than the ZK one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to