This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f83a871ceec MINOR: Update `ShareCoordinator` interface to receive 
`MetadataImage/Delta` directly (#21029)
f83a871ceec is described below

commit f83a871ceec8b7ed2989b1ec06386a5aa16c0189
Author: David Jacot <[email protected]>
AuthorDate: Tue Dec 2 11:01:11 2025 +0100

    MINOR: Update `ShareCoordinator` interface to receive 
`MetadataImage/Delta` directly (#21029)
    
    This patch is a continuation of the work started in
    https://github.com/apache/kafka/pull/21008. It updates the
    `ShareCoordinator` interface to follow the same pattern.
    
    Reviewers: Lianet Magrans <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../server/metadata/BrokerMetadataPublisher.scala  |  3 +-
 .../kafka/coordinator/share/ShareCoordinator.java  | 16 ++++-----
 .../coordinator/share/ShareCoordinatorService.java | 23 +++++++-----
 .../share/ShareCoordinatorServiceTest.java         | 41 +++++++++++-----------
 4 files changed, 42 insertions(+), 41 deletions(-)

diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 3f6abc5c71f..c233d5f45dc 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -26,7 +26,6 @@ import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.internals.Topic
-import 
org.apache.kafka.coordinator.common.runtime.{KRaftCoordinatorMetadataDelta, 
KRaftCoordinatorMetadataImage}
 import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.coordinator.share.ShareCoordinator
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
@@ -242,7 +241,7 @@ class BrokerMetadataPublisher(
 
       try {
         // Propagate the new image to the share coordinator.
-        shareCoordinator.onNewMetadataImage(new 
KRaftCoordinatorMetadataImage(newImage), newImage.features(), new 
KRaftCoordinatorMetadataDelta(delta))
+        shareCoordinator.onMetadataUpdate(delta, newImage)
       } catch {
         case t: Throwable => metadataPublishingFaultHandler.handleFault("Error 
updating share " +
           s"coordinator with local changes in $deltaName", t)
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
index a5c29ff6807..ee69827d45e 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
@@ -30,9 +30,8 @@ import 
org.apache.kafka.common.message.WriteShareGroupStateRequestData;
 import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
 import org.apache.kafka.common.requests.RequestContext;
 import org.apache.kafka.common.utils.BufferSupplier;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataDelta;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
-import org.apache.kafka.image.FeaturesImage;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.share.SharePartitionKey;
 
 import java.util.OptionalInt;
@@ -136,12 +135,11 @@ public interface ShareCoordinator {
     /**
      * A new metadata image is available.
      *
-     * @param newImage         The new metadata image.
-     * @param newFeaturesImage The features image.
-     * @param delta            The metadata delta.
+     * @param delta     The metadata delta.
+     * @param newImage  The new metadata image.
      */
-    void onNewMetadataImage(
-        CoordinatorMetadataImage newImage,
-        FeaturesImage newFeaturesImage, CoordinatorMetadataDelta delta
+    void onMetadataUpdate(
+        MetadataDelta delta,
+        MetadataImage newImage
     );
 }
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index 4f00e1cb842..19d198b8de4 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -45,16 +45,17 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorEventProcessor;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataDelta;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetrics;
 import 
org.apache.kafka.coordinator.common.runtime.CoordinatorShardBuilderSupplier;
+import 
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataDelta;
+import 
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
 import org.apache.kafka.coordinator.common.runtime.MultiThreadedEventProcessor;
 import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
 import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
-import org.apache.kafka.image.FeaturesImage;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.common.ShareVersion;
 import org.apache.kafka.server.record.BrokerCompressionType;
 import org.apache.kafka.server.share.SharePartitionKey;
@@ -69,6 +70,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.OptionalInt;
 import java.util.Properties;
 import java.util.Set;
@@ -82,7 +84,7 @@ import java.util.function.Supplier;
 
 import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorOperationExceptionHelper.handleOperationException;
 
-@SuppressWarnings("ClassDataAbstractionCoupling")
+@SuppressWarnings({"ClassDataAbstractionCoupling", "ClassFanOutComplexity"})
 public class ShareCoordinatorService implements ShareCoordinator {
     private final ShareCoordinatorConfig config;
     private final Logger log;
@@ -1098,10 +1100,13 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
     }
 
     @Override
-    public void onNewMetadataImage(CoordinatorMetadataImage newImage, 
FeaturesImage newFeaturesImage, CoordinatorMetadataDelta delta) {
+    public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage) {
         throwIfNotActive();
-        this.runtime.onMetadataUpdate(delta, newImage);
-        boolean enabled = isShareGroupsEnabled(newFeaturesImage);
+        this.runtime.onMetadataUpdate(
+            new KRaftCoordinatorMetadataDelta(Objects.requireNonNull(delta, 
"delta must be provided")),
+            new KRaftCoordinatorMetadataImage(Objects.requireNonNull(newImage, 
"newImage must be provided"))
+        );
+        boolean enabled = isShareGroupsEnabled(newImage);
         // enabled    shouldRunJob         result (XOR)
         // 0            0               no op on flag, do not call jobs
         // 0            1               disable flag, do not call jobs         
             => action
@@ -1129,9 +1134,9 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
         }
     }
 
-    private boolean isShareGroupsEnabled(FeaturesImage image) {
+    private boolean isShareGroupsEnabled(MetadataImage image) {
         return shareGroupConfigEnabledSupplier.get() || 
ShareVersion.fromFeatureLevel(
-            image.finalizedVersions().getOrDefault(ShareVersion.FEATURE_NAME, 
(short) 0)
+            
image.features().finalizedVersions().getOrDefault(ShareVersion.FEATURE_NAME, 
(short) 0)
         ).supportsShareGroups();
     }
 
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index 2c3ef63af6b..ea9b8f164f3 100644
--- 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -41,13 +41,12 @@ import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataDelta;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
 import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
 import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
-import org.apache.kafka.image.FeaturesImage;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.common.ShareVersion;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.util.FutureUtils;
@@ -1491,7 +1490,7 @@ class ShareCoordinatorServiceTest {
         ));
 
         service.startup(() -> 1);
-        service.onNewMetadataImage(mock(CoordinatorMetadataImage.class), 
mock(FeaturesImage.class), mock(CoordinatorMetadataDelta.class));
+        service.onMetadataUpdate(mock(MetadataDelta.class), 
mock(MetadataImage.class));
         verify(runtime, times(0))
             .scheduleWriteOperation(
                 eq("write-state-record-prune"),
@@ -1586,7 +1585,7 @@ class ShareCoordinatorServiceTest {
         ));
 
         service.startup(() -> 2);
-        service.onNewMetadataImage(mock(CoordinatorMetadataImage.class), 
mock(FeaturesImage.class), mock(CoordinatorMetadataDelta.class));
+        service.onMetadataUpdate(mock(MetadataDelta.class), 
mock(MetadataImage.class));
         verify(runtime, times(0))
             .scheduleWriteOperation(
                 eq("write-state-record-prune"),
@@ -1648,7 +1647,7 @@ class ShareCoordinatorServiceTest {
         ));
 
         service.startup(() -> 1);
-        service.onNewMetadataImage(mock(CoordinatorMetadataImage.class), 
mock(FeaturesImage.class), mock(CoordinatorMetadataDelta.class));
+        service.onMetadataUpdate(mock(MetadataDelta.class), 
mock(MetadataImage.class));
         verify(runtime, times(0))
             .scheduleWriteOperation(
                 eq("write-state-record-prune"),
@@ -1701,7 +1700,7 @@ class ShareCoordinatorServiceTest {
         ));
 
         service.startup(() -> 1);
-        service.onNewMetadataImage(mock(CoordinatorMetadataImage.class), 
mock(FeaturesImage.class), mock(CoordinatorMetadataDelta.class));
+        service.onMetadataUpdate(mock(MetadataDelta.class), 
mock(MetadataImage.class));
         verify(runtime, times(0))
             .scheduleWriteOperation(
                 eq("write-state-record-prune"),
@@ -1752,7 +1751,7 @@ class ShareCoordinatorServiceTest {
         ));
 
         service.startup(() -> 1);
-        service.onNewMetadataImage(mock(CoordinatorMetadataImage.class), 
mock(FeaturesImage.class), mock(CoordinatorMetadataDelta.class));
+        service.onMetadataUpdate(mock(MetadataDelta.class), 
mock(MetadataImage.class));
 
         verify(runtime, times(0))
             .scheduleWriteOperation(
@@ -1816,7 +1815,7 @@ class ShareCoordinatorServiceTest {
         ));
 
         service.startup(() -> 1);
-        service.onNewMetadataImage(mock(CoordinatorMetadataImage.class), 
mock(FeaturesImage.class), mock(CoordinatorMetadataDelta.class));
+        service.onMetadataUpdate(mock(MetadataDelta.class), 
mock(MetadataImage.class));
         verify(runtime, times(0))
             .scheduleWriteOperation(
                 eq("write-state-record-prune"),
@@ -1892,7 +1891,7 @@ class ShareCoordinatorServiceTest {
         ));
 
         service.startup(() -> 1);
-        service.onNewMetadataImage(mock(CoordinatorMetadataImage.class), 
mock(FeaturesImage.class), mock(CoordinatorMetadataDelta.class));
+        service.onMetadataUpdate(mock(MetadataDelta.class), 
mock(MetadataImage.class));
         verify(runtime, times(0))
             .scheduleWriteOperation(
                 eq("write-state-record-prune"),
@@ -1952,7 +1951,7 @@ class ShareCoordinatorServiceTest {
         )).thenReturn(List.of(CompletableFuture.completedFuture(null)));
 
         service.startup(() -> 1);
-        service.onNewMetadataImage(mock(CoordinatorMetadataImage.class), 
mock(FeaturesImage.class), mock(CoordinatorMetadataDelta.class));
+        service.onMetadataUpdate(mock(MetadataDelta.class), 
mock(MetadataImage.class));
         verify(runtime, times(0))
             .scheduleWriteOperation(
                 eq("snapshot-cold-partitions"),
@@ -2010,7 +2009,7 @@ class ShareCoordinatorServiceTest {
         ));
 
         service.startup(() -> 2);
-        service.onNewMetadataImage(mock(CoordinatorMetadataImage.class), 
mock(FeaturesImage.class), mock(CoordinatorMetadataDelta.class));
+        service.onMetadataUpdate(mock(MetadataDelta.class), 
mock(MetadataImage.class));
         verify(runtime, times(0))
             .scheduleWriteAllOperation(
                 eq("snapshot-cold-partitions"),
@@ -2075,11 +2074,11 @@ class ShareCoordinatorServiceTest {
 
         service.startup(() -> 1);
 
-        FeaturesImage mockedFeaturesImage = mock(FeaturesImage.class, 
RETURNS_DEEP_STUBS);
+        MetadataImage mockedImage = mock(MetadataImage.class, 
RETURNS_DEEP_STUBS);
 
         // Feature disabled on start.
-        
when(mockedFeaturesImage.finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME),
 anyShort())).thenReturn((short) 0);
-        service.onNewMetadataImage(mock(CoordinatorMetadataImage.class), 
mockedFeaturesImage, mock(CoordinatorMetadataDelta.class));   // Jobs will not 
execute as feature is OFF in image.
+        
when(mockedImage.features().finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME),
 anyShort())).thenReturn((short) 0);
+        service.onMetadataUpdate(mock(MetadataDelta.class), mockedImage);   // 
Jobs will not execute as feature is OFF in image.
 
         verify(timer, times(0)).add(any()); // Timer task not added.
         verify(runtime, times(0)).scheduleWriteOperation(
@@ -2096,9 +2095,9 @@ class ShareCoordinatorServiceTest {
         assertFalse(service.shouldRunPeriodicJob());
 
         // Enable feature.
-        Mockito.reset(mockedFeaturesImage);
-        
when(mockedFeaturesImage.finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME),
 anyShort())).thenReturn((short) 1);
-        service.onNewMetadataImage(mock(CoordinatorMetadataImage.class), 
mockedFeaturesImage, mock(CoordinatorMetadataDelta.class));   // Jobs will 
execute as feature is ON in image.
+        Mockito.reset(mockedImage);
+        
when(mockedImage.features().finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME),
 anyShort())).thenReturn((short) 1);
+        service.onMetadataUpdate(mock(MetadataDelta.class), mockedImage);   // 
Jobs will execute as feature is ON in image.
 
         verify(timer, times(2)).add(any()); // Timer task added twice (prune, 
snapshot).
         timer.advanceClock(30001L);
@@ -2116,9 +2115,9 @@ class ShareCoordinatorServiceTest {
         assertTrue(service.shouldRunPeriodicJob());
 
         // Disable feature
-        Mockito.reset(mockedFeaturesImage);
-        
when(mockedFeaturesImage.finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME),
 anyShort())).thenReturn((short) 0);
-        service.onNewMetadataImage(mock(CoordinatorMetadataImage.class), 
mockedFeaturesImage, mock(CoordinatorMetadataDelta.class));   // Jobs will not 
execute as feature is on in image.
+        Mockito.reset(mockedImage);
+        
when(mockedImage.features().finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME),
 anyShort())).thenReturn((short) 0);
+        service.onMetadataUpdate(mock(MetadataDelta.class), mockedImage);   // 
Jobs will not execute as feature is on in image.
         timer.advanceClock(30001L);
 
         verify(timer, times(4)).add(any()); // Tasks added but will return 
immediately.

Reply via email to