This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 98ed67e73e6 MINOR: Prevent this leak while DefaultStatePersister
construction (#22292)
98ed67e73e6 is described below
commit 98ed67e73e6a13ca465f49d1c83144503ba4dd18
Author: Sushant Mahajan <[email protected]>
AuthorDate: Mon May 18 20:03:45 2026 +0530
MINOR: Prevent this leak while DefaultStatePersister construction (#22292)
* Currently `DefaultStatePersister` instantiates `PersisterStateManager`
in its constructor. This implies that the `this` creation is not yet
complete and initializing another component.
* To remedy this the public API constructor is replaced with static
factory method.
* Tests and `BrokerServer` code has been updated to reflect the change.
Reviewers: Chia-Ping Tsai <[email protected]>
---
core/src/main/scala/kafka/server/BrokerServer.scala | 18 +++++++-----------
.../share/persister/DefaultStatePersister.java | 20 ++++++++++++++++++--
.../share/persister/DefaultStatePersisterTest.java | 5 ++---
3 files changed, 27 insertions(+), 16 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 681848ad0ef..2619a44788f 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -53,7 +53,7 @@ import
org.apache.kafka.server.log.remote.metadata.storage.BrokerReadyCallback
import org.apache.kafka.server.log.remote.storage.{RemoteLogManager,
RemoteLogManagerConfig}
import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin,
KafkaYammerMetrics}
import org.apache.kafka.server.network.{EndpointReadyFutures,
KafkaAuthorizerServerInfo}
-import org.apache.kafka.server.share.persister.{DefaultStatePersister,
NoOpStatePersister, Persister, PersisterStateManager}
+import org.apache.kafka.server.share.persister.{DefaultStatePersister,
NoOpStatePersister, Persister}
import org.apache.kafka.server.share.session.ShareSessionCache
import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper,
Timer}
import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler,
NetworkPartitionMetadataClient, PartitionMetadataClient}
@@ -724,17 +724,13 @@ class BrokerServer(
private def createShareStatePersister(): Persister = {
if (config.shareGroupConfig.shareGroupPersisterClassName.nonEmpty) {
val klass =
Utils.loadClass(config.shareGroupConfig.shareGroupPersisterClassName,
classOf[Object]).asInstanceOf[Class[Persister]]
-
if (klass.getName.equals(classOf[DefaultStatePersister].getName)) {
- klass.getConstructor(classOf[PersisterStateManager])
- .newInstance(
- new PersisterStateManager(
- NetworkUtils.buildNetworkClient("Persister", config, metrics,
Time.SYSTEM, new LogContext(s"[Persister broker=${config.brokerId}]")),
- new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key
=> shareCoordinator.partitionFor(key), config.interBrokerListenerName,
groupConfigManager),
- Time.SYSTEM,
- shareGroupTimer
- )
- )
+ DefaultStatePersister.instance(
+ NetworkUtils.buildNetworkClient("Persister", config, metrics,
Time.SYSTEM, new LogContext(s"[Persister broker=${config.brokerId}]")),
+ new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key =>
shareCoordinator.partitionFor(key), config.interBrokerListenerName,
groupConfigManager),
+ Time.SYSTEM,
+ shareGroupTimer
+ )
} else if (klass.getName.equals(classOf[NoOpStatePersister].getName)) {
info("Using no-op persister")
new NoOpStatePersister()
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
index 7ac49e6e413..7522b0e0f0f 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
@@ -17,6 +17,7 @@
package org.apache.kafka.server.share.persister;
+import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
@@ -24,6 +25,8 @@ import
org.apache.kafka.common.requests.InitializeShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.util.timer.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,11 +45,24 @@ import java.util.concurrent.CompletableFuture;
*/
public class DefaultStatePersister implements Persister {
private final PersisterStateManager stateManager;
-
private static final Logger log =
LoggerFactory.getLogger(DefaultStatePersister.class);
- public DefaultStatePersister(PersisterStateManager stateManager) {
+ public static DefaultStatePersister instance(KafkaClient client,
ShareCoordinatorMetadataCacheHelper cacheHelper, Time time, Timer timer) {
+ DefaultStatePersister instance = new DefaultStatePersister(client,
cacheHelper, time, timer);
+ instance.start();
+ return instance;
+ }
+
+ // Visibility for tests
+ DefaultStatePersister(PersisterStateManager stateManager) {
this.stateManager = stateManager;
+ }
+
+ private DefaultStatePersister(KafkaClient client,
ShareCoordinatorMetadataCacheHelper cacheHelper, Time time, Timer timer) {
+ this.stateManager = new PersisterStateManager(client, cacheHelper,
time, timer);
+ }
+
+ private void start() {
this.stateManager.start();
}
diff --git
a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
index 51a617c43db..5a70979aa0d 100644
---
a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
+++
b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
@@ -106,8 +106,7 @@ class DefaultStatePersisterTest {
}
public DefaultStatePersister build() {
- PersisterStateManager persisterStateManager = new
PersisterStateManager(client, cacheHelper, time, timer);
- return new DefaultStatePersister(persisterStateManager);
+ return DefaultStatePersister.instance(client, cacheHelper, time,
timer);
}
}
@@ -200,7 +199,7 @@ class DefaultStatePersisterTest {
.setGroupId(groupId)
.setTopicsData(List.of(new TopicData<>(topicId,
List.of(PartitionFactory.newPartitionStateBatchData(
- incorrectPartition, 1, 0, 0, 0,
null))))).build()).build());
+ incorrectPartition, 1, 0, 0, 0,
null))))).build()).build());
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(IllegalArgumentException.class, result);