This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cfd05030061 MINOR: fix some flaky KRaft-related tests (#13543) (#13543)
cfd05030061 is described below

commit cfd0503006127b58bb181076bffaccb947fdd2bd
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Fri Apr 14 10:39:08 2023 -0700

    MINOR: fix some flaky KRaft-related tests (#13543) (#13543)
    
    In SharedServer, fix some cases where a volatile variable could change to null while we were using
    it, during shutdown. This is mainly a junit test issue, although it could also cause ugly error
    messages during shutdown when running the server in a production context.
    
    Fix a race in KafkaEventQueueTest.testSize.
    
    Reviewers: David Arthur <[email protected]>
---
 .../src/main/scala/kafka/server/SharedServer.scala | 33 +++++++++++-----------
 .../apache/kafka/queue/KafkaEventQueueTest.java    |  4 +--
 2 files changed, 19 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/kafka/server/SharedServer.scala 
b/core/src/main/scala/kafka/server/SharedServer.scala
index ef5f053a8e8..3ba06f36008 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -164,8 +164,8 @@ class SharedServer(
     name = "metadata loading",
     fatal = sharedServerConfig.processRoles.contains(ControllerRole),
     action = () => SharedServer.this.synchronized {
-      if (brokerMetrics != null) 
brokerMetrics.metadataLoadErrorCount.getAndIncrement()
-      if (controllerServerMetrics != null) 
controllerServerMetrics.incrementMetadataErrorCount()
+      Option(brokerMetrics).foreach(_.metadataLoadErrorCount.getAndIncrement())
+      Option(controllerServerMetrics).foreach(_.incrementMetadataErrorCount())
       snapshotsDiabledReason.compareAndSet(null, "metadata loading fault")
     })
 
@@ -176,7 +176,7 @@ class SharedServer(
     name = "controller startup",
     fatal = true,
     action = () => SharedServer.this.synchronized {
-      if (controllerServerMetrics != null) 
controllerServerMetrics.incrementMetadataErrorCount()
+      Option(controllerServerMetrics).foreach(_.incrementMetadataErrorCount())
       snapshotsDiabledReason.compareAndSet(null, "controller startup fault")
     })
 
@@ -187,8 +187,8 @@ class SharedServer(
     name = "initial broker metadata loading",
     fatal = true,
     action = () => SharedServer.this.synchronized {
-      if (brokerMetrics != null) 
brokerMetrics.metadataApplyErrorCount.getAndIncrement()
-      if (controllerServerMetrics != null) 
controllerServerMetrics.incrementMetadataErrorCount()
+      
Option(brokerMetrics).foreach(_.metadataApplyErrorCount.getAndIncrement())
+      Option(controllerServerMetrics).foreach(_.incrementMetadataErrorCount())
       snapshotsDiabledReason.compareAndSet(null, "initial broker metadata 
loading fault")
     })
 
@@ -199,7 +199,7 @@ class SharedServer(
     name = "quorum controller",
     fatal = true,
     action = () => SharedServer.this.synchronized {
-      if (controllerServerMetrics != null) 
controllerServerMetrics.incrementMetadataErrorCount()
+      Option(controllerServerMetrics).foreach(_.incrementMetadataErrorCount())
       snapshotsDiabledReason.compareAndSet(null, "quorum controller fault")
     })
 
@@ -210,8 +210,8 @@ class SharedServer(
     name = "metadata publishing",
     fatal = false,
     action = () => SharedServer.this.synchronized {
-      if (brokerMetrics != null) 
brokerMetrics.metadataApplyErrorCount.getAndIncrement()
-      if (controllerServerMetrics != null) 
controllerServerMetrics.incrementMetadataErrorCount()
+      
Option(brokerMetrics).foreach(_.metadataApplyErrorCount.getAndIncrement())
+      Option(controllerServerMetrics).foreach(_.incrementMetadataErrorCount())
       // Note: snapshot generation does not need to be disabled for a 
publishing fault.
     })
 
@@ -234,7 +234,7 @@ class SharedServer(
         if (sharedServerConfig.processRoles.contains(ControllerRole)) {
           controllerServerMetrics = new 
ControllerMetadataMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()))
         }
-        raftManager = new KafkaRaftManager[ApiMessageAndVersion](
+        val _raftManager = new KafkaRaftManager[ApiMessageAndVersion](
           metaProps,
           sharedServerConfig,
           new MetadataRecordSerde,
@@ -246,21 +246,22 @@ class SharedServer(
           controllerQuorumVotersFuture,
           raftManagerFaultHandler
         )
-        raftManager.startup()
+        raftManager = _raftManager
+        _raftManager.startup()
 
         val loaderBuilder = new MetadataLoader.Builder().
           setNodeId(metaProps.nodeId).
           setTime(time).
           setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
           setFaultHandler(metadataLoaderFaultHandler).
-          setHighWaterMarkAccessor(() => raftManager.client.highWatermark())
+          setHighWaterMarkAccessor(() => _raftManager.client.highWatermark())
         if (brokerMetrics != null) {
           loaderBuilder.setMetadataLoaderMetrics(brokerMetrics)
         }
         loader = loaderBuilder.build()
         snapshotEmitter = new SnapshotEmitter.Builder().
           setNodeId(metaProps.nodeId).
-          setRaftClient(raftManager.client).
+          setRaftClient(_raftManager.client).
           build()
         snapshotGenerator = new SnapshotGenerator.Builder(snapshotEmitter).
           setNodeId(metaProps.nodeId).
@@ -271,7 +272,7 @@ class SharedServer(
           setDisabledReason(snapshotsDiabledReason).
           setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
           build()
-        raftManager.register(loader)
+        _raftManager.register(loader)
         try {
           
loader.installPublishers(Collections.singletonList(snapshotGenerator))
         } catch {
@@ -294,10 +295,10 @@ class SharedServer(
   def ensureNotRaftLeader(): Unit = synchronized {
     // Ideally, this would just resign our leadership, if we had it. But we 
don't have an API in
     // RaftManager for that yet, so shut down the RaftManager.
-    if (raftManager != null) {
-      CoreUtils.swallow(raftManager.shutdown(), this)
+    Option(raftManager).foreach(_raftManager => {
+      CoreUtils.swallow(_raftManager.shutdown(), this)
       raftManager = null
-    }
+    })
   }
 
   private def stop(): Unit = synchronized {
diff --git 
a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java 
b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
index 7310cff6375..3c0aa63661e 100644
--- 
a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
@@ -267,7 +267,7 @@ public class KafkaEventQueueTest {
         assertFalse(queue.isEmpty());
         queue.cancelDeferred("later");
         queue.cancelDeferred("soon");
-        assertTrue(queue.isEmpty());
+        TestUtils.waitForCondition(() -> queue.isEmpty(), "Failed to see the 
queue become empty.");
         queue.close();
         assertTrue(queue.isEmpty());
     }
@@ -412,4 +412,4 @@ public class KafkaEventQueueTest {
         assertEquals(InterruptedException.class, 
ieTrapper2.exception.get().getClass());
         queue.close();
     }
-}
\ No newline at end of file
+}

Reply via email to