This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f83a871ceec MINOR: Update `ShareCoordinator` interface to received
`MetadataImage/Delta` directly (#21029)
f83a871ceec is described below
commit f83a871ceec8b7ed2989b1ec06386a5aa16c0189
Author: David Jacot <[email protected]>
AuthorDate: Tue Dec 2 11:01:11 2025 +0100
MINOR: Update `ShareCoordinator` interface to received
`MetadataImage/Delta` directly (#21029)
This patch is a continuation of the work started in
https://github.com/apache/kafka/pull/21008. It updates the
`ShareCoordinator` interface to follow the same pattern.
Reviewers: Lianet Magrans <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../server/metadata/BrokerMetadataPublisher.scala | 3 +-
.../kafka/coordinator/share/ShareCoordinator.java | 16 ++++-----
.../coordinator/share/ShareCoordinatorService.java | 23 +++++++-----
.../share/ShareCoordinatorServiceTest.java | 41 +++++++++++-----------
4 files changed, 42 insertions(+), 41 deletions(-)
diff --git
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 3f6abc5c71f..c233d5f45dc 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -26,7 +26,6 @@ import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.internals.Topic
-import
org.apache.kafka.coordinator.common.runtime.{KRaftCoordinatorMetadataDelta,
KRaftCoordinatorMetadataImage}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
@@ -242,7 +241,7 @@ class BrokerMetadataPublisher(
try {
// Propagate the new image to the share coordinator.
- shareCoordinator.onNewMetadataImage(new
KRaftCoordinatorMetadataImage(newImage), newImage.features(), new
KRaftCoordinatorMetadataDelta(delta))
+ shareCoordinator.onMetadataUpdate(delta, newImage)
} catch {
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error
updating share " +
s"coordinator with local changes in $deltaName", t)
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
index a5c29ff6807..ee69827d45e 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java
@@ -30,9 +30,8 @@ import
org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.BufferSupplier;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataDelta;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
-import org.apache.kafka.image.FeaturesImage;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.share.SharePartitionKey;
import java.util.OptionalInt;
@@ -136,12 +135,11 @@ public interface ShareCoordinator {
/**
* A new metadata image is available.
*
- * @param newImage The new metadata image.
- * @param newFeaturesImage The features image.
- * @param delta The metadata delta.
+ * @param delta The metadata delta.
+ * @param newImage The new metadata image.
*/
- void onNewMetadataImage(
- CoordinatorMetadataImage newImage,
- FeaturesImage newFeaturesImage, CoordinatorMetadataDelta delta
+ void onMetadataUpdate(
+ MetadataDelta delta,
+ MetadataImage newImage
);
}
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index 4f00e1cb842..19d198b8de4 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -45,16 +45,17 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.common.runtime.CoordinatorEventProcessor;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataDelta;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetrics;
import
org.apache.kafka.coordinator.common.runtime.CoordinatorShardBuilderSupplier;
+import
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataDelta;
+import
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.MultiThreadedEventProcessor;
import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
-import org.apache.kafka.image.FeaturesImage;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ShareVersion;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.server.share.SharePartitionKey;
@@ -69,6 +70,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.Set;
@@ -82,7 +84,7 @@ import java.util.function.Supplier;
import static
org.apache.kafka.coordinator.common.runtime.CoordinatorOperationExceptionHelper.handleOperationException;
-@SuppressWarnings("ClassDataAbstractionCoupling")
+@SuppressWarnings({"ClassDataAbstractionCoupling", "ClassFanOutComplexity"})
public class ShareCoordinatorService implements ShareCoordinator {
private final ShareCoordinatorConfig config;
private final Logger log;
@@ -1098,10 +1100,13 @@ public class ShareCoordinatorService implements
ShareCoordinator {
}
@Override
- public void onNewMetadataImage(CoordinatorMetadataImage newImage,
FeaturesImage newFeaturesImage, CoordinatorMetadataDelta delta) {
+ public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage) {
throwIfNotActive();
- this.runtime.onMetadataUpdate(delta, newImage);
- boolean enabled = isShareGroupsEnabled(newFeaturesImage);
+ this.runtime.onMetadataUpdate(
+ new KRaftCoordinatorMetadataDelta(Objects.requireNonNull(delta,
"delta must be provided")),
+ new KRaftCoordinatorMetadataImage(Objects.requireNonNull(newImage,
"newImage must be provided"))
+ );
+ boolean enabled = isShareGroupsEnabled(newImage);
// enabled shouldRunJob result (XOR)
// 0 0 no op on flag, do not call jobs
// 0 1 disable flag, do not call jobs
=> action
@@ -1129,9 +1134,9 @@ public class ShareCoordinatorService implements
ShareCoordinator {
}
}
- private boolean isShareGroupsEnabled(FeaturesImage image) {
+ private boolean isShareGroupsEnabled(MetadataImage image) {
return shareGroupConfigEnabledSupplier.get() ||
ShareVersion.fromFeatureLevel(
- image.finalizedVersions().getOrDefault(ShareVersion.FEATURE_NAME,
(short) 0)
+
image.features().finalizedVersions().getOrDefault(ShareVersion.FEATURE_NAME,
(short) 0)
).supportsShareGroups();
}
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index 2c3ef63af6b..ea9b8f164f3 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -41,13 +41,12 @@ import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataDelta;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
-import org.apache.kafka.image.FeaturesImage;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ShareVersion;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.util.FutureUtils;
@@ -1491,7 +1490,7 @@ class ShareCoordinatorServiceTest {
));
service.startup(() -> 1);
- service.onNewMetadataImage(mock(CoordinatorMetadataImage.class),
mock(FeaturesImage.class), mock(CoordinatorMetadataDelta.class));
+ service.onMetadataUpdate(mock(MetadataDelta.class),
mock(MetadataImage.class));
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
@@ -1586,7 +1585,7 @@ class ShareCoordinatorServiceTest {
));
service.startup(() -> 2);
- service.onNewMetadataImage(mock(CoordinatorMetadataImage.class),
mock(FeaturesImage.class), mock(CoordinatorMetadataDelta.class));
+ service.onMetadataUpdate(mock(MetadataDelta.class),
mock(MetadataImage.class));
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
@@ -1648,7 +1647,7 @@ class ShareCoordinatorServiceTest {
));
service.startup(() -> 1);
- service.onNewMetadataImage(mock(CoordinatorMetadataImage.class),
mock(FeaturesImage.class), mock(CoordinatorMetadataDelta.class));
+ service.onMetadataUpdate(mock(MetadataDelta.class),
mock(MetadataImage.class));
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
@@ -1701,7 +1700,7 @@ class ShareCoordinatorServiceTest {
));
service.startup(() -> 1);
- service.onNewMetadataImage(mock(CoordinatorMetadataImage.class),
mock(FeaturesImage.class), mock(CoordinatorMetadataDelta.class));
+ service.onMetadataUpdate(mock(MetadataDelta.class),
mock(MetadataImage.class));
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
@@ -1752,7 +1751,7 @@ class ShareCoordinatorServiceTest {
));
service.startup(() -> 1);
- service.onNewMetadataImage(mock(CoordinatorMetadataImage.class),
mock(FeaturesImage.class), mock(CoordinatorMetadataDelta.class));
+ service.onMetadataUpdate(mock(MetadataDelta.class),
mock(MetadataImage.class));
verify(runtime, times(0))
.scheduleWriteOperation(
@@ -1816,7 +1815,7 @@ class ShareCoordinatorServiceTest {
));
service.startup(() -> 1);
- service.onNewMetadataImage(mock(CoordinatorMetadataImage.class),
mock(FeaturesImage.class), mock(CoordinatorMetadataDelta.class));
+ service.onMetadataUpdate(mock(MetadataDelta.class),
mock(MetadataImage.class));
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
@@ -1892,7 +1891,7 @@ class ShareCoordinatorServiceTest {
));
service.startup(() -> 1);
- service.onNewMetadataImage(mock(CoordinatorMetadataImage.class),
mock(FeaturesImage.class), mock(CoordinatorMetadataDelta.class));
+ service.onMetadataUpdate(mock(MetadataDelta.class),
mock(MetadataImage.class));
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
@@ -1952,7 +1951,7 @@ class ShareCoordinatorServiceTest {
)).thenReturn(List.of(CompletableFuture.completedFuture(null)));
service.startup(() -> 1);
- service.onNewMetadataImage(mock(CoordinatorMetadataImage.class),
mock(FeaturesImage.class), mock(CoordinatorMetadataDelta.class));
+ service.onMetadataUpdate(mock(MetadataDelta.class),
mock(MetadataImage.class));
verify(runtime, times(0))
.scheduleWriteOperation(
eq("snapshot-cold-partitions"),
@@ -2010,7 +2009,7 @@ class ShareCoordinatorServiceTest {
));
service.startup(() -> 2);
- service.onNewMetadataImage(mock(CoordinatorMetadataImage.class),
mock(FeaturesImage.class), mock(CoordinatorMetadataDelta.class));
+ service.onMetadataUpdate(mock(MetadataDelta.class),
mock(MetadataImage.class));
verify(runtime, times(0))
.scheduleWriteAllOperation(
eq("snapshot-cold-partitions"),
@@ -2075,11 +2074,11 @@ class ShareCoordinatorServiceTest {
service.startup(() -> 1);
- FeaturesImage mockedFeaturesImage = mock(FeaturesImage.class,
RETURNS_DEEP_STUBS);
+ MetadataImage mockedImage = mock(MetadataImage.class,
RETURNS_DEEP_STUBS);
// Feature disabled on start.
-
when(mockedFeaturesImage.finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME),
anyShort())).thenReturn((short) 0);
- service.onNewMetadataImage(mock(CoordinatorMetadataImage.class),
mockedFeaturesImage, mock(CoordinatorMetadataDelta.class)); // Jobs will not
execute as feature is OFF in image.
+
when(mockedImage.features().finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME),
anyShort())).thenReturn((short) 0);
+ service.onMetadataUpdate(mock(MetadataDelta.class), mockedImage); //
Jobs will not execute as feature is OFF in image.
verify(timer, times(0)).add(any()); // Timer task not added.
verify(runtime, times(0)).scheduleWriteOperation(
@@ -2096,9 +2095,9 @@ class ShareCoordinatorServiceTest {
assertFalse(service.shouldRunPeriodicJob());
// Enable feature.
- Mockito.reset(mockedFeaturesImage);
-
when(mockedFeaturesImage.finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME),
anyShort())).thenReturn((short) 1);
- service.onNewMetadataImage(mock(CoordinatorMetadataImage.class),
mockedFeaturesImage, mock(CoordinatorMetadataDelta.class)); // Jobs will
execute as feature is ON in image.
+ Mockito.reset(mockedImage);
+
when(mockedImage.features().finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME),
anyShort())).thenReturn((short) 1);
+ service.onMetadataUpdate(mock(MetadataDelta.class), mockedImage); //
Jobs will execute as feature is ON in image.
verify(timer, times(2)).add(any()); // Timer task added twice (prune,
snapshot).
timer.advanceClock(30001L);
@@ -2116,9 +2115,9 @@ class ShareCoordinatorServiceTest {
assertTrue(service.shouldRunPeriodicJob());
// Disable feature
- Mockito.reset(mockedFeaturesImage);
-
when(mockedFeaturesImage.finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME),
anyShort())).thenReturn((short) 0);
- service.onNewMetadataImage(mock(CoordinatorMetadataImage.class),
mockedFeaturesImage, mock(CoordinatorMetadataDelta.class)); // Jobs will not
execute as feature is on in image.
+ Mockito.reset(mockedImage);
+
when(mockedImage.features().finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME),
anyShort())).thenReturn((short) 0);
+ service.onMetadataUpdate(mock(MetadataDelta.class), mockedImage); //
Jobs will not execute as feature is on in image.
timer.advanceClock(30001L);
verify(timer, times(4)).add(any()); // Tasks added but will return
immediately.