This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 98ed67e73e6 MINOR: Prevent this leak while DefaultStatePersister 
construction (#22292)
98ed67e73e6 is described below

commit 98ed67e73e6a13ca465f49d1c83144503ba4dd18
Author: Sushant Mahajan <[email protected]>
AuthorDate: Mon May 18 20:03:45 2026 +0530

    MINOR: Prevent this leak while DefaultStatePersister construction (#22292)
    
    * Currently `DefaultStatePersister` instantiates `PersisterStateManager`
    in its constructor. This means another component is being initialized
    while the `this` reference is not yet fully constructed.
    * To remedy this, the public API constructor is replaced with a static
    factory method.
    * Tests and `BrokerServer` code have been updated to reflect the change.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 core/src/main/scala/kafka/server/BrokerServer.scala  | 18 +++++++-----------
 .../share/persister/DefaultStatePersister.java       | 20 ++++++++++++++++++--
 .../share/persister/DefaultStatePersisterTest.java   |  5 ++---
 3 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 681848ad0ef..2619a44788f 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -53,7 +53,7 @@ import 
org.apache.kafka.server.log.remote.metadata.storage.BrokerReadyCallback
 import org.apache.kafka.server.log.remote.storage.{RemoteLogManager, 
RemoteLogManagerConfig}
 import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, 
KafkaYammerMetrics}
 import org.apache.kafka.server.network.{EndpointReadyFutures, 
KafkaAuthorizerServerInfo}
-import org.apache.kafka.server.share.persister.{DefaultStatePersister, 
NoOpStatePersister, Persister, PersisterStateManager}
+import org.apache.kafka.server.share.persister.{DefaultStatePersister, 
NoOpStatePersister, Persister}
 import org.apache.kafka.server.share.session.ShareSessionCache
 import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper, 
Timer}
 import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler, 
NetworkPartitionMetadataClient, PartitionMetadataClient}
@@ -724,17 +724,13 @@ class BrokerServer(
   private def createShareStatePersister(): Persister = {
     if (config.shareGroupConfig.shareGroupPersisterClassName.nonEmpty) {
       val klass = 
Utils.loadClass(config.shareGroupConfig.shareGroupPersisterClassName, 
classOf[Object]).asInstanceOf[Class[Persister]]
-
       if (klass.getName.equals(classOf[DefaultStatePersister].getName)) {
-        klass.getConstructor(classOf[PersisterStateManager])
-          .newInstance(
-            new PersisterStateManager(
-              NetworkUtils.buildNetworkClient("Persister", config, metrics, 
Time.SYSTEM, new LogContext(s"[Persister broker=${config.brokerId}]")),
-              new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key 
=> shareCoordinator.partitionFor(key), config.interBrokerListenerName, 
groupConfigManager),
-              Time.SYSTEM,
-              shareGroupTimer
-            )
-          )
+        DefaultStatePersister.instance(
+          NetworkUtils.buildNetworkClient("Persister", config, metrics, 
Time.SYSTEM, new LogContext(s"[Persister broker=${config.brokerId}]")),
+          new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key => 
shareCoordinator.partitionFor(key), config.interBrokerListenerName, 
groupConfigManager),
+          Time.SYSTEM,
+          shareGroupTimer
+        )
       } else if (klass.getName.equals(classOf[NoOpStatePersister].getName)) {
         info("Using no-op persister")
         new NoOpStatePersister()
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
index 7ac49e6e413..7522b0e0f0f 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.server.share.persister;
 
+import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
@@ -24,6 +25,8 @@ import 
org.apache.kafka.common.requests.InitializeShareGroupStateResponse;
 import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
 import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
 import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.util.timer.Timer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,11 +45,24 @@ import java.util.concurrent.CompletableFuture;
  */
 public class DefaultStatePersister implements Persister {
     private final PersisterStateManager stateManager;
-
     private static final Logger log = 
LoggerFactory.getLogger(DefaultStatePersister.class);
 
-    public DefaultStatePersister(PersisterStateManager stateManager) {
+    public static DefaultStatePersister instance(KafkaClient client, 
ShareCoordinatorMetadataCacheHelper cacheHelper, Time time, Timer timer) {
+        DefaultStatePersister instance = new DefaultStatePersister(client, 
cacheHelper, time, timer);
+        instance.start();
+        return instance;
+    }
+
+    // Visibility for tests
+    DefaultStatePersister(PersisterStateManager stateManager) {
         this.stateManager = stateManager;
+    }
+
+    private DefaultStatePersister(KafkaClient client, 
ShareCoordinatorMetadataCacheHelper cacheHelper, Time time, Timer timer) {
+        this.stateManager = new PersisterStateManager(client, cacheHelper, 
time, timer);
+    }
+
+    private void start() {
         this.stateManager.start();
     }
 
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
index 51a617c43db..5a70979aa0d 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
@@ -106,8 +106,7 @@ class DefaultStatePersisterTest {
         }
 
         public DefaultStatePersister build() {
-            PersisterStateManager persisterStateManager = new 
PersisterStateManager(client, cacheHelper, time, timer);
-            return new DefaultStatePersister(persisterStateManager);
+            return DefaultStatePersister.instance(client, cacheHelper, time, 
timer);
         }
     }
 
@@ -200,7 +199,7 @@ class DefaultStatePersisterTest {
                 .setGroupId(groupId)
                 .setTopicsData(List.of(new TopicData<>(topicId,
                     List.of(PartitionFactory.newPartitionStateBatchData(
-                        incorrectPartition, 1, 0, 0,  0, 
null))))).build()).build());
+                        incorrectPartition, 1, 0, 0, 0, 
null))))).build()).build());
         assertTrue(result.isDone());
         assertTrue(result.isCompletedExceptionally());
         assertFutureThrows(IllegalArgumentException.class, result);

Reply via email to