jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248058020
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -825,4 +828,60 @@ public void testClose() throws Exception {
         assertFutureThrows(write1, NotCoordinatorException.class);
         assertFutureThrows(write2, NotCoordinatorException.class);
     }
+
+    @Test
+    public void testOnNewMetadataImage() {
+        TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
+        TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
+
+        MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class);
+        MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class);
+
+        CoordinatorRuntime<MockCoordinator, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinator, String>()
+                .withLoader(loader)
+                .withEventProcessor(new MockEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorBuilderSupplier(supplier)
+                .build();
+
+        MockCoordinator coordinator0 = mock(MockCoordinator.class);
+        MockCoordinator coordinator1 = mock(MockCoordinator.class);
+
+        when(supplier.get()).thenReturn(builder);
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.build())
+            .thenReturn(coordinator0)
+            .thenReturn(coordinator1);
+
+        CompletableFuture<Void> future0 = new CompletableFuture<>();
+        when(loader.load(tp0, coordinator0)).thenReturn(future0);
+
+        CompletableFuture<Void> future1 = new CompletableFuture<>();
+        when(loader.load(tp1, coordinator1)).thenReturn(future1);
+
+        runtime.scheduleLoadOperation(tp0, 0);
+        runtime.scheduleLoadOperation(tp1, 0);
+
+        // Coordinator 0 is loaded. It should get the current image
+        // that is the empty one.
+        future0.complete(null);
+        verify(coordinator0).onLoaded(MetadataImage.EMPTY);
+
+        // Publish a new image.
+        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+        MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
+        runtime.onNewMetadataImage(newImage, delta);
+
+        // Coordinator 0 should be notified about it.
+        verify(coordinator0).onNewMetadataImage(newImage, delta);

Review Comment:
   Yeah. I understand that the metadata image is updated, but I wasn't sure whether anything ensures that a new metadata image also triggers a refresh of the subscription metadata. (Apologies if this was already covered in a previous PR.)
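To make the question concrete, here is a minimal, self-contained sketch of the behavior being asked about. Every name in it (`Image`, `Coordinator`, `SketchRuntime`, `refreshSubscriptionMetadata`) is a hypothetical stand-in rather than Kafka's API, and it does not claim to show how this PR or the group coordinator actually handles the refresh. It only illustrates the pattern: a runtime forwards a new image to loaded coordinators, and the coordinator is assumed to rebuild its derived state when notified.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins throughout; none of these are Kafka classes.
public class ImagePropagationSketch {

    /** Stand-in for a metadata image, identified only by a version number. */
    static final class Image {
        final int version;

        Image(int version) {
            this.version = version;
        }
    }

    /** Stand-in coordinator that derives "subscription metadata" from an image. */
    static final class Coordinator {
        Image image;
        // Version of the image the derived state was last built from; -1 means never built.
        int subscriptionMetadataVersion = -1;

        void onLoaded(Image current) {
            image = current;
            refreshSubscriptionMetadata();
        }

        void onNewMetadataImage(Image newImage) {
            image = newImage;
            // The step the review asks about: without this call the
            // derived state would keep reflecting the previous image.
            refreshSubscriptionMetadata();
        }

        private void refreshSubscriptionMetadata() {
            subscriptionMetadataVersion = image.version;
        }
    }

    /** Stand-in runtime that forwards new images to loaded coordinators only. */
    static final class SketchRuntime {
        private Image current = new Image(0);
        private final List<Coordinator> loaded = new ArrayList<>();

        void markLoaded(Coordinator coordinator) {
            loaded.add(coordinator);
            // A coordinator receives the current image at load time.
            coordinator.onLoaded(current);
        }

        void onNewMetadataImage(Image newImage) {
            current = newImage;
            for (Coordinator coordinator : loaded) {
                coordinator.onNewMetadataImage(newImage);
            }
        }
    }

    public static void main(String[] args) {
        SketchRuntime runtime = new SketchRuntime();
        Coordinator coordinator0 = new Coordinator();
        Coordinator coordinator1 = new Coordinator();

        runtime.markLoaded(coordinator0);          // coordinator0 sees image 0
        runtime.onNewMetadataImage(new Image(1));  // only coordinator0 is notified

        System.out.println(coordinator0.subscriptionMetadataVersion); // prints 1
        System.out.println(coordinator1.subscriptionMetadataVersion); // prints -1
    }
}
```

In the sketch, the assertion that answers the review question is the one on the derived state (`subscriptionMetadataVersion`), not only on the notification itself. The test in the diff verifies the notification through `verify(coordinator0).onNewMetadataImage(newImage, delta)`.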