This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e70da2966a8 MINOR: Remove DefaultExternalKRaftMetrics and use lambda
instead (#21377)
e70da2966a8 is described below
commit e70da2966a8407f8cb1d3441304e2a53d50e1474
Author: Lan Ding <[email protected]>
AuthorDate: Sat Jan 31 11:47:52 2026 +0800
MINOR: Remove DefaultExternalKRaftMetrics and use lambda instead (#21377)
Replace `DefaultExternalKRaftMetrics` with lambda expressions since
`ExternalKRaftMetrics` is a functional interface.
Changes:
- Remove the `DefaultExternalKRaftMetrics` class and its test
- Use a lambda in `SharedServer`, `TestRaftServer`, and `RaftManagerTest`
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../src/main/scala/kafka/server/SharedServer.scala | 9 ++--
.../main/scala/kafka/tools/TestRaftServer.scala | 4 +-
.../scala/unit/kafka/raft/RaftManagerTest.scala | 5 +-
.../metrics/DefaultExternalKRaftMetrics.java | 33 ------------
.../metrics/DefaultExternalKRaftMetricsTest.java | 61 ----------------------
5 files changed, 9 insertions(+), 103 deletions(-)
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala
b/core/src/main/scala/kafka/server/SharedServer.scala
index a20745f35d8..4b55d300a02 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -33,11 +33,11 @@ import org.apache.kafka.image.publisher.{SnapshotEmitter,
SnapshotGenerator}
import org.apache.kafka.metadata.ListenerInfo
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble
-import org.apache.kafka.raft.Endpoints
+import org.apache.kafka.raft.{Endpoints, ExternalKRaftMetrics}
import org.apache.kafka.server.{ProcessRole, ServerSocketFactory}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler,
ProcessTerminatingFaultHandler}
-import org.apache.kafka.server.metrics.{BrokerServerMetrics,
DefaultExternalKRaftMetrics, KafkaYammerMetrics, NodeMetrics}
+import org.apache.kafka.server.metrics.{BrokerServerMetrics,
KafkaYammerMetrics, NodeMetrics}
import java.net.InetSocketAddress
import java.util.Arrays
@@ -282,7 +282,10 @@ class SharedServer(
controllerServerMetrics = new
ControllerMetadataMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()))
}
- val externalKRaftMetrics = new
DefaultExternalKRaftMetrics(Optional.ofNullable(brokerMetrics),
Optional.ofNullable(controllerServerMetrics))
+ val externalKRaftMetrics: ExternalKRaftMetrics = ignoredStaticVoters
=> {
+
Option(brokerMetrics).foreach(_.setIgnoredStaticVoters(ignoredStaticVoters))
+
Option(controllerServerMetrics).foreach(_.setIgnoredStaticVoters(ignoredStaticVoters))
+ }
val _raftManager = new KafkaRaftManager[ApiMessageAndVersion](
clusterId,
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala
b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 8c3f48ef27b..83eeab0d527 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -42,11 +42,9 @@ import org.apache.kafka.server.SimpleApiVersionManager
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.server.fault.ProcessTerminatingFaultHandler
-import org.apache.kafka.server.metrics.DefaultExternalKRaftMetrics
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils,
ShutdownableThread}
import org.apache.kafka.snapshot.SnapshotReader
-import java.util.Optional
import scala.jdk.CollectionConverters._
/**
@@ -105,7 +103,7 @@ class TestRaftServer(
topicId,
time,
metrics,
- new DefaultExternalKRaftMetrics(Optional.empty, Optional.empty),
+ _ => {},
Some(threadNamePrefix),
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index 9f5f9e1cb34..6814bed13de 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -20,7 +20,7 @@ import java.net.InetSocketAddress
import java.nio.channels.FileChannel
import java.nio.channels.OverlappingFileLockException
import java.nio.file.{Files, Path, StandardOpenOption}
-import java.util.{Optional, Properties}
+import java.util.Properties
import java.util.concurrent.CompletableFuture
import kafka.server.KafkaConfig
import kafka.tools.TestRaftServer.ByteArraySerde
@@ -36,7 +36,6 @@ import org.apache.kafka.raft.{Endpoints, KRaftConfigs,
MetadataLogConfig, Quorum
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.fault.FaultHandler
-import org.apache.kafka.server.metrics.DefaultExternalKRaftMetrics
import org.apache.kafka.storage.internals.log.LogManager
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@@ -106,7 +105,7 @@ class RaftManagerTest {
topicId,
Time.SYSTEM,
new Metrics(Time.SYSTEM),
- new DefaultExternalKRaftMetrics(Optional.empty, Optional.empty),
+ _ => {},
Option.empty,
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
diff --git
a/server/src/main/java/org/apache/kafka/server/metrics/DefaultExternalKRaftMetrics.java
b/server/src/main/java/org/apache/kafka/server/metrics/DefaultExternalKRaftMetrics.java
deleted file mode 100644
index 01da081a004..00000000000
---
a/server/src/main/java/org/apache/kafka/server/metrics/DefaultExternalKRaftMetrics.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.server.metrics;
-
-import org.apache.kafka.controller.metrics.ControllerMetadataMetrics;
-import org.apache.kafka.raft.ExternalKRaftMetrics;
-
-import java.util.Optional;
-
-public record DefaultExternalKRaftMetrics(
- Optional<BrokerServerMetrics> brokerServerMetrics,
- Optional<ControllerMetadataMetrics> controllerMetadataMetrics)
implements ExternalKRaftMetrics {
-
- @Override
- public void setIgnoredStaticVoters(boolean ignoredStaticVoters) {
- brokerServerMetrics.ifPresent(metrics ->
metrics.setIgnoredStaticVoters(ignoredStaticVoters));
- controllerMetadataMetrics.ifPresent(metrics ->
metrics.setIgnoredStaticVoters(ignoredStaticVoters));
- }
-}
diff --git
a/server/src/test/java/org/apache/kafka/server/metrics/DefaultExternalKRaftMetricsTest.java
b/server/src/test/java/org/apache/kafka/server/metrics/DefaultExternalKRaftMetricsTest.java
deleted file mode 100644
index 15a934e95a8..00000000000
---
a/server/src/test/java/org/apache/kafka/server/metrics/DefaultExternalKRaftMetricsTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.server.metrics;
-
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.controller.metrics.ControllerMetadataMetrics;
-
-import com.yammer.metrics.core.MetricsRegistry;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.Optional;
-
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class DefaultExternalKRaftMetricsTest {
-
- @Test
- public void testDefaultExternalKRaftMetrics() {
- BrokerServerMetrics brokerServerMetrics = new BrokerServerMetrics(new
Metrics());
- ControllerMetadataMetrics controllerMetadataMetrics = new
ControllerMetadataMetrics(Optional.of(new MetricsRegistry()));
- DefaultExternalKRaftMetrics metrics = new DefaultExternalKRaftMetrics(
- Optional.of(brokerServerMetrics),
- Optional.of(controllerMetadataMetrics)
- );
-
- assertFalse(brokerServerMetrics.ignoredStaticVoters());
- assertFalse(controllerMetadataMetrics.ignoredStaticVoters());
-
- metrics.setIgnoredStaticVoters(true);
-
- assertTrue(brokerServerMetrics.ignoredStaticVoters());
- assertTrue(controllerMetadataMetrics.ignoredStaticVoters());
-
- metrics.setIgnoredStaticVoters(false);
-
- assertFalse(brokerServerMetrics.ignoredStaticVoters());
- assertFalse(controllerMetadataMetrics.ignoredStaticVoters());
- }
-
- @Test
- public void testEmptyDefaultExternalKRaftMetrics() {
- DefaultExternalKRaftMetrics metrics = new
DefaultExternalKRaftMetrics(Optional.empty(), Optional.empty());
- metrics.setIgnoredStaticVoters(true);
- }
-}