This is an automated email from the ASF dual-hosted git repository. clolov pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 5f3b175b64d097fc2377881d92839c1142deaa70 Author: TengYao Chi <[email protected]> AuthorDate: Sat Jan 25 01:23:48 2025 +0800 KAFKA-18630: Clean ReplicaManagerBuilder (#18687) Reviewers: Christo Lolov <[email protected]> --- .../server/builders/ReplicaManagerBuilder.java | 73 ++++------------------ 1 file changed, 13 insertions(+), 60 deletions(-) diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java index b580485139b..a431dba15c0 100644 --- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java +++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java @@ -18,34 +18,24 @@ package kafka.server.builders; import kafka.log.LogManager; -import kafka.log.remote.RemoteLogManager; -import kafka.server.AddPartitionsToTxnManager; import kafka.server.AlterPartitionManager; -import kafka.server.DelayedDeleteRecords; -import kafka.server.DelayedFetch; -import kafka.server.DelayedProduce; -import kafka.server.DelayedRemoteFetch; -import kafka.server.DelayedRemoteListOffsets; import kafka.server.KafkaConfig; import kafka.server.MetadataCache; import kafka.server.QuotaFactory.QuotaManagers; import kafka.server.ReplicaManager; -import kafka.server.share.DelayedShareFetch; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.DelayedActionQueue; import org.apache.kafka.server.common.DirectoryEventHandler; -import org.apache.kafka.server.purgatory.DelayedOperationPurgatory; import org.apache.kafka.server.util.Scheduler; import org.apache.kafka.storage.internals.log.LogDirFailureChannel; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; import java.util.Collections; -import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; -import scala.jdk.javaapi.OptionConverters; +import scala.Option; @@ -60,18 +50,6 @@ public class ReplicaManagerBuilder { private LogDirFailureChannel logDirFailureChannel = null; private AlterPartitionManager alterPartitionManager = null; private BrokerTopicStats brokerTopicStats = null; - private AtomicBoolean isShuttingDown = new AtomicBoolean(false); - private Optional<RemoteLogManager> remoteLogManager = Optional.empty(); - private Optional<DelayedOperationPurgatory<DelayedProduce>> delayedProducePurgatory = Optional.empty(); - private Optional<DelayedOperationPurgatory<DelayedFetch>> delayedFetchPurgatory = Optional.empty(); - private Optional<DelayedOperationPurgatory<DelayedDeleteRecords>> delayedDeleteRecordsPurgatory = Optional.empty(); - private Optional<DelayedOperationPurgatory<DelayedRemoteFetch>> delayedRemoteFetchPurgatory = Optional.empty(); - private Optional<DelayedOperationPurgatory<DelayedRemoteListOffsets>> delayedRemoteListOffsetsPurgatory = Optional.empty(); - private Optional<DelayedOperationPurgatory<DelayedShareFetch>> delayedShareFetchPurgatory = Optional.empty(); - private Optional<String> threadNamePrefix = Optional.empty(); - private Long brokerEpoch = -1L; - private Optional<AddPartitionsToTxnManager> addPartitionsToTxnManager = Optional.empty(); - private DirectoryEventHandler directoryEventHandler = DirectoryEventHandler.NOOP; public ReplicaManagerBuilder setConfig(KafkaConfig config) { this.config = config; @@ -98,11 +76,6 @@ public class ReplicaManagerBuilder { return this; } - public ReplicaManagerBuilder setRemoteLogManager(RemoteLogManager remoteLogManager) { - this.remoteLogManager = Optional.ofNullable(remoteLogManager); - return this; - } - public ReplicaManagerBuilder setQuotaManagers(QuotaManagers quotaManagers) { this.quotaManagers = quotaManagers; return this; @@ -128,26 +101,6 @@ public class ReplicaManagerBuilder { return this; } - public ReplicaManagerBuilder setDelayedFetchPurgatory(DelayedOperationPurgatory<DelayedFetch> delayedFetchPurgatory) { - this.delayedFetchPurgatory = Optional.of(delayedFetchPurgatory); - return this; - } - - public ReplicaManagerBuilder setThreadNamePrefix(String threadNamePrefix) { - this.threadNamePrefix = Optional.of(threadNamePrefix); - return this; - } - - public ReplicaManagerBuilder setBrokerEpoch(long brokerEpoch) { - this.brokerEpoch = brokerEpoch; - return this; - } - - public ReplicaManagerBuilder setDirectoryEventHandler(DirectoryEventHandler directoryEventHandler) { - this.directoryEventHandler = directoryEventHandler; - return this; - } - public ReplicaManager build() { if (config == null) config = new KafkaConfig(Collections.emptyMap()); if (logManager == null) throw new RuntimeException("You must set logManager"); @@ -164,23 +117,23 @@ public class ReplicaManagerBuilder { time, scheduler, logManager, - OptionConverters.toScala(remoteLogManager), + Option.empty(), quotaManagers, metadataCache, logDirFailureChannel, alterPartitionManager, brokerTopicStats, - isShuttingDown, - OptionConverters.toScala(delayedProducePurgatory), - OptionConverters.toScala(delayedFetchPurgatory), - OptionConverters.toScala(delayedDeleteRecordsPurgatory), - OptionConverters.toScala(delayedRemoteFetchPurgatory), - OptionConverters.toScala(delayedRemoteListOffsetsPurgatory), - OptionConverters.toScala(delayedShareFetchPurgatory), - OptionConverters.toScala(threadNamePrefix), - () -> brokerEpoch, - OptionConverters.toScala(addPartitionsToTxnManager), - directoryEventHandler, + new AtomicBoolean(false), + Option.empty(), + Option.empty(), + Option.empty(), + Option.empty(), + Option.empty(), + Option.empty(), + Option.empty(), + () -> -1L, + Option.empty(), + DirectoryEventHandler.NOOP, new DelayedActionQueue()); } }
