This is an automated email from the ASF dual-hosted git repository.

clolov pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 5f3b175b64d097fc2377881d92839c1142deaa70
Author: TengYao Chi <[email protected]>
AuthorDate: Sat Jan 25 01:23:48 2025 +0800

    KAFKA-18630: Clean ReplicaManagerBuilder (#18687)
    
    Reviewers: Christo Lolov <[email protected]>
---
 .../server/builders/ReplicaManagerBuilder.java     | 73 ++++------------------
 1 file changed, 13 insertions(+), 60 deletions(-)

diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index b580485139b..a431dba15c0 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -18,34 +18,24 @@
 package kafka.server.builders;
 
 import kafka.log.LogManager;
-import kafka.log.remote.RemoteLogManager;
-import kafka.server.AddPartitionsToTxnManager;
 import kafka.server.AlterPartitionManager;
-import kafka.server.DelayedDeleteRecords;
-import kafka.server.DelayedFetch;
-import kafka.server.DelayedProduce;
-import kafka.server.DelayedRemoteFetch;
-import kafka.server.DelayedRemoteListOffsets;
 import kafka.server.KafkaConfig;
 import kafka.server.MetadataCache;
 import kafka.server.QuotaFactory.QuotaManagers;
 import kafka.server.ReplicaManager;
-import kafka.server.share.DelayedShareFetch;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.DelayedActionQueue;
 import org.apache.kafka.server.common.DirectoryEventHandler;
-import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
 import org.apache.kafka.server.util.Scheduler;
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 
 import java.util.Collections;
-import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import scala.jdk.javaapi.OptionConverters;
+import scala.Option;
 
 
 
@@ -60,18 +50,6 @@ public class ReplicaManagerBuilder {
     private LogDirFailureChannel logDirFailureChannel = null;
     private AlterPartitionManager alterPartitionManager = null;
     private BrokerTopicStats brokerTopicStats = null;
-    private AtomicBoolean isShuttingDown = new AtomicBoolean(false);
-    private Optional<RemoteLogManager> remoteLogManager = Optional.empty();
-    private Optional<DelayedOperationPurgatory<DelayedProduce>> delayedProducePurgatory = Optional.empty();
-    private Optional<DelayedOperationPurgatory<DelayedFetch>> delayedFetchPurgatory = Optional.empty();
-    private Optional<DelayedOperationPurgatory<DelayedDeleteRecords>> delayedDeleteRecordsPurgatory = Optional.empty();
-    private Optional<DelayedOperationPurgatory<DelayedRemoteFetch>> delayedRemoteFetchPurgatory = Optional.empty();
-    private Optional<DelayedOperationPurgatory<DelayedRemoteListOffsets>> delayedRemoteListOffsetsPurgatory = Optional.empty();
-    private Optional<DelayedOperationPurgatory<DelayedShareFetch>> delayedShareFetchPurgatory = Optional.empty();
-    private Optional<String> threadNamePrefix = Optional.empty();
-    private Long brokerEpoch = -1L;
-    private Optional<AddPartitionsToTxnManager> addPartitionsToTxnManager = Optional.empty();
-    private DirectoryEventHandler directoryEventHandler = DirectoryEventHandler.NOOP;
 
     public ReplicaManagerBuilder setConfig(KafkaConfig config) {
         this.config = config;
@@ -98,11 +76,6 @@ public class ReplicaManagerBuilder {
         return this;
     }
 
-    public ReplicaManagerBuilder setRemoteLogManager(RemoteLogManager remoteLogManager) {
-        this.remoteLogManager = Optional.ofNullable(remoteLogManager);
-        return this;
-    }
-
     public ReplicaManagerBuilder setQuotaManagers(QuotaManagers quotaManagers) {
         this.quotaManagers = quotaManagers;
         return this;
@@ -128,26 +101,6 @@ public class ReplicaManagerBuilder {
         return this;
     }
 
-    public ReplicaManagerBuilder setDelayedFetchPurgatory(DelayedOperationPurgatory<DelayedFetch> delayedFetchPurgatory) {
-        this.delayedFetchPurgatory = Optional.of(delayedFetchPurgatory);
-        return this;
-    }
-
-    public ReplicaManagerBuilder setThreadNamePrefix(String threadNamePrefix) {
-        this.threadNamePrefix = Optional.of(threadNamePrefix);
-        return this;
-    }
-
-    public ReplicaManagerBuilder setBrokerEpoch(long brokerEpoch) {
-        this.brokerEpoch = brokerEpoch;
-        return this;
-    }
-
-    public ReplicaManagerBuilder setDirectoryEventHandler(DirectoryEventHandler directoryEventHandler) {
-        this.directoryEventHandler = directoryEventHandler;
-        return this;
-    }
-
     public ReplicaManager build() {
         if (config == null) config = new KafkaConfig(Collections.emptyMap());
         if (logManager == null) throw new RuntimeException("You must set logManager");
@@ -164,23 +117,23 @@ public class ReplicaManagerBuilder {
                              time,
                              scheduler,
                              logManager,
-                             OptionConverters.toScala(remoteLogManager),
+                             Option.empty(),
                              quotaManagers,
                              metadataCache,
                              logDirFailureChannel,
                              alterPartitionManager,
                              brokerTopicStats,
-                             isShuttingDown,
-                             OptionConverters.toScala(delayedProducePurgatory),
-                             OptionConverters.toScala(delayedFetchPurgatory),
-                             OptionConverters.toScala(delayedDeleteRecordsPurgatory),
-                             OptionConverters.toScala(delayedRemoteFetchPurgatory),
-                             OptionConverters.toScala(delayedRemoteListOffsetsPurgatory),
-                             OptionConverters.toScala(delayedShareFetchPurgatory),
-                             OptionConverters.toScala(threadNamePrefix),
-                             () -> brokerEpoch,
-                             OptionConverters.toScala(addPartitionsToTxnManager),
-                             directoryEventHandler,
+                             new AtomicBoolean(false),
+                             Option.empty(),
+                             Option.empty(),
+                             Option.empty(),
+                             Option.empty(),
+                             Option.empty(),
+                             Option.empty(),
+                             Option.empty(),
+                             () ->  -1L,
+                             Option.empty(),
+                             DirectoryEventHandler.NOOP,
                              new DelayedActionQueue());
     }
 }

Reply via email to