This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 09e59bc7761 KAFKA-14857: Fix some MetadataLoader bugs (#13462)
09e59bc7761 is described below

commit 09e59bc7761a6b9ec1437b3decdfcd7b5fff868e
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Wed Mar 29 12:30:12 2023 -0700

    KAFKA-14857: Fix some MetadataLoader bugs (#13462)
    
    The MetadataLoader is not supposed to publish metadata updates until we have loaded up to the high
    water mark. Previously, this logic was broken, and we published updates immediately. This PR fixes
    that and adds a JUnit test.
    
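    Another issue is that the MetadataLoader previously assumed that we would periodically get
    callbacks from the Raft layer even if nothing had happened. We relied on this to install new
    publishers in a timely fashion, for example. However, in older MetadataVersions that don't include
    NoOpRecord, this is not a safe assumption. This PR instead initializes new publishers from a
    deferred event on the loader's event queue, which reschedules itself every 100 ms until the loader
    has caught up to the high water mark.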
    
    Aside from the above changes, this PR also fixes a deadlock in SnapshotGeneratorTest, fixes the log
    prefix for BrokerLifecycleManager, and removes metadata publishers on BrokerServer shutdown (as we
    already do for controllers).
    
    Reviewers: David Arthur <[email protected]>, dengziming <[email protected]>
---
 .../kafka/server/BrokerLifecycleManager.scala      |   2 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |  11 +-
 .../kafka/server/KRaftClusterTest.scala            |  20 +++
 .../apache/kafka/image/loader/MetadataLoader.java  | 153 +++++++++++++--------
 .../kafka/image/loader/MetadataLoaderTest.java     |  43 +++---
 .../image/publisher/SnapshotGeneratorTest.java     |   7 +-
 .../org/apache/kafka/queue/KafkaEventQueue.java    |   4 +
 7 files changed, 157 insertions(+), 83 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala 
b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index 346044c39e6..9d4682182a7 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -64,7 +64,7 @@ class BrokerLifecycleManager(
     if (isZkBroker) {
       builder.append(" isZkBroker=true")
     }
-    builder.append("]")
+    builder.append("] ")
     builder.toString()
   }
 
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 191e3c45f02..e3a657cd982 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -138,6 +138,8 @@ class BrokerServer(
 
   def kafkaYammerMetrics: KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
 
+  val metadataPublishers: util.List[MetadataPublisher] = new 
util.ArrayList[MetadataPublisher]()
+
   private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): 
Boolean = {
     lock.lock()
     try {
@@ -406,7 +408,6 @@ class BrokerServer(
         config.numIoThreads, 
s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent",
         DataPlaneAcceptor.ThreadPrefix)
 
-      val publishers = new util.ArrayList[MetadataPublisher]()
       brokerMetadataPublisher = new BrokerMetadataPublisher(config,
         metadataCache,
         logManager,
@@ -431,7 +432,7 @@ class BrokerServer(
         authorizer,
         sharedServer.initialBrokerMetadataLoadFaultHandler,
         sharedServer.metadataPublishingFaultHandler)
-      publishers.add(brokerMetadataPublisher)
+      metadataPublishers.add(brokerMetadataPublisher)
 
       // Register parts of the broker that can be reconfigured via dynamic 
configs.  This needs to
       // be done before we publish the dynamic configs, so that we don't miss 
anything.
@@ -440,7 +441,7 @@ class BrokerServer(
       // Install all the metadata publishers.
       FutureUtils.waitWithLogging(logger.underlying, logIdent,
         "the broker metadata publishers to be installed",
-        sharedServer.loader.installPublishers(publishers), startupDeadline, 
time)
+        sharedServer.loader.installPublishers(metadataPublishers), 
startupDeadline, time)
 
       // Wait for this broker to contact the quorum, and for the active 
controller to acknowledge
       // us as caught up. It will do this by returning a heartbeat response 
with isCaughtUp set to
@@ -537,14 +538,14 @@ class BrokerServer(
             error("Got unexpected exception waiting for controlled shutdown 
future", e)
         }
       }
-
       lifecycleManager.beginShutdown()
-
       // Stop socket server to stop accepting any more connections and 
requests.
       // Socket server will be shutdown towards the end of the sequence.
       if (socketServer != null) {
         CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
       }
+      metadataPublishers.forEach(p => 
sharedServer.loader.removeAndClosePublisher(p).get())
+      metadataPublishers.clear()
       if (dataPlaneRequestHandlerPool != null)
         CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this)
       if (dataPlaneRequestProcessor != null)
diff --git 
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala 
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 34afeb7fc49..084ff43fcf1 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -974,6 +974,26 @@ class KRaftClusterTest {
     }
   }
 
+  /**
+   * Test a single broker, single controller cluster at the minimum bootstrap 
level. This tests
+   * that we can function without having periodic NoOpRecords written.
+   */
+  @Test
+  def testSingleControllerSingleBrokerCluster(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION).
+        setNumBrokerNodes(1).
+        setNumControllerNodes(1).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      cluster.waitForReadyBrokers()
+    } finally {
+      cluster.close()
+    }
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = Array(false, true))
   def testReconfigureControllerClientQuotas(combinedController: Boolean): Unit 
= {
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java 
b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index ac34d1fa4a5..839ea9f3e93 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -42,6 +42,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -140,6 +141,8 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
         }
     }
 
+    private static final String INITIALIZE_NEW_PUBLISHERS = 
"InitializeNewPublishers";
+
     /**
      * The log4j logger for this loader.
      */
@@ -166,7 +169,7 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
     private final Supplier<OptionalLong> highWaterMarkAccessor;
 
     /**
-     * Publishers which haven't been initialized yet.
+     * Publishers which haven't received any metadata yet.
      */
     private final LinkedHashMap<String, MetadataPublisher> 
uninitializedPublishers;
 
@@ -176,9 +179,10 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
     private final LinkedHashMap<String, MetadataPublisher> publishers;
 
     /**
-     * True if we have caught up with the initial high water mark.
+     * True if we have not caught up with the initial high water mark.
+     * We do not send out any metadata updates until this becomes false.
      */
-    private boolean catchingUp = false;
+    private boolean catchingUp = true;
 
     /**
      * The current leader and epoch.
@@ -212,38 +216,67 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
         this.uninitializedPublishers = new LinkedHashMap<>();
         this.publishers = new LinkedHashMap<>();
         this.image = MetadataImage.EMPTY;
-        this.eventQueue = new KafkaEventQueue(time, logContext,
+        this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext,
                 threadNamePrefix + "metadata-loader-",
                 new ShutdownEvent());
     }
 
-    private boolean stillNeedToCatchUp(long offset) {
+    private boolean stillNeedToCatchUp(String where, long offset) {
         if (!catchingUp) {
-            log.trace("We are not in the initial catching up state.");
+            log.trace("{}: we are not in the initial catching up state.", 
where);
             return false;
         }
         OptionalLong highWaterMark = highWaterMarkAccessor.get();
         if (!highWaterMark.isPresent()) {
-            log.info("The loader is still catching up because we still don't 
know the high " +
-                    "water mark yet.");
+            log.info("{}: the loader is still catching up because we still 
don't know the high " +
+                    "water mark yet.", where);
             return true;
         }
-        if (highWaterMark.getAsLong() > offset) {
-            log.info("The loader is still catching up because we have loaded 
up to offset " +
-                    offset + ", but the high water mark is " + 
highWaterMark.getAsLong());
+        if (highWaterMark.getAsLong() - 1 > offset) {
+            log.info("{}: The loader is still catching up because we have 
loaded up to offset " +
+                    offset + ", but the high water mark is {}", where, 
highWaterMark.getAsLong());
             return true;
         }
-        log.info("The loader finished catch up to the current high water mark 
of " +
-                highWaterMark.getAsLong());
-        catchingUp = true;
+        log.info("{}: The loader finished catching up to the current high 
water mark of {}",
+                where, highWaterMark.getAsLong());
+        catchingUp = false;
         return false;
     }
 
-    private void maybeInitializeNewPublishers() {
+    /**
+     * Schedule an event to initialize the new publishers that are present in 
the system.
+     *
+     * @param delayNs   The minimum time in nanoseconds we should wait. If 
there is already an
+     *                  initialization event scheduled, we will either move 
its deadline forward
+     *                  in time or leave it unchanged.
+     */
+    void scheduleInitializeNewPublishers(long delayNs) {
+        eventQueue.scheduleDeferred(INITIALIZE_NEW_PUBLISHERS,
+            new 
EventQueue.EarliestDeadlineFunction(eventQueue.time().nanoseconds() + delayNs),
+            () -> {
+                try {
+                    initializeNewPublishers();
+                } catch (Throwable e) {
+                    faultHandler.handleFault("Unhandled error initializing new 
publishers", e);
+                }
+            });
+    }
+
+    void initializeNewPublishers() {
         if (uninitializedPublishers.isEmpty()) {
-            log.trace("There are no uninitialized publishers to initialize.");
+            log.debug("InitializeNewPublishers: nothing to do.");
+            return;
+        }
+        if (stillNeedToCatchUp("initializeNewPublishers", 
image.highestOffsetAndEpoch().offset())) {
+            // Reschedule the initialization for later.
+            log.debug("InitializeNewPublishers: unable to initialize new 
publisher(s) {} " +
+                            "because we are still catching up with quorum 
metadata. Rescheduling.",
+                    uninitializedPublisherNames());
+            
scheduleInitializeNewPublishers(TimeUnit.MILLISECONDS.toNanos(100));
             return;
         }
+        log.debug("InitializeNewPublishers: setting up snapshot image for new 
publisher(s): {}",
+                uninitializedPublisherNames());
         long startNs = time.nanoseconds();
         MetadataDelta delta = new MetadataDelta.Builder().
                 setImage(image).
@@ -260,19 +293,22 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
             MetadataPublisher publisher = iter.next();
             iter.remove();
             try {
-                log.info("Publishing initial snapshot at offset {} to {}",
-                        image.highestOffsetAndEpoch().offset(), 
publisher.name());
+                log.info("InitializeNewPublishers: initializing {} with a 
snapshot at offset {}",
+                        publisher.name(), 
image.highestOffsetAndEpoch().offset());
                 publisher.onMetadataUpdate(delta, image, manifest);
                 publisher.onControllerChange(currentLeaderAndEpoch);
                 publishers.put(publisher.name(), publisher);
             } catch (Throwable e) {
-                faultHandler.handleFault("Unhandled error publishing the 
initial metadata " +
-                        "image from snapshot at offset " + 
image.highestOffsetAndEpoch().offset() +
-                        " with publisher " + publisher.name(), e);
+                faultHandler.handleFault("Unhandled error initializing " + 
publisher.name() +
+                        " with a snapshot at offset " + 
image.highestOffsetAndEpoch().offset(), e);
             }
         }
     }
 
+    private String uninitializedPublisherNames() {
+        return String.join(", ", uninitializedPublishers.keySet());
+    }
+
     @Override
     public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
         eventQueue.append(() -> {
@@ -282,7 +318,7 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
                         build();
                 LogDeltaManifest manifest = loadLogDelta(delta, reader);
                 if (log.isDebugEnabled()) {
-                    log.debug("Generated a metadata delta between {} and {} 
from {} batch(es) " +
+                    log.debug("handleCommit: Generated a metadata delta 
between {} and {} from {} batch(es) " +
                             "in {} us.", image.offset(), 
manifest.provenance().lastContainedOffset(),
                             manifest.numBatches(), 
NANOSECONDS.toMicros(manifest.elapsedNs()));
                 }
@@ -294,10 +330,12 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
                             " and " + 
manifest.provenance().lastContainedOffset(), e);
                     return;
                 }
-                if 
(stillNeedToCatchUp(manifest.provenance().lastContainedOffset())) {
+                if (stillNeedToCatchUp("handleCommit", 
manifest.provenance().lastContainedOffset())) {
                     return;
                 }
-                log.debug("Publishing new image with provenance {}.", 
image.provenance());
+                if (log.isDebugEnabled()) {
+                    log.debug("handleCommit: publishing new image with 
provenance {}.", image.provenance());
+                }
                 for (MetadataPublisher publisher : publishers.values()) {
                     try {
                         publisher.onMetadataUpdate(delta, image, manifest);
@@ -307,8 +345,10 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
                                 " with publisher " + publisher.name(), e);
                     }
                 }
-                maybeInitializeNewPublishers();
                 metrics.updateLastAppliedImageProvenance(image.provenance());
+                if (uninitializedPublishers.isEmpty()) {
+                    scheduleInitializeNewPublishers(0);
+                }
             } catch (Throwable e) {
                 // This is a general catch-all block where we don't expect to 
end up;
                 // failure-prone operations should have individual try/catch 
blocks around them.
@@ -379,11 +419,9 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
                         setImage(image).
                         build();
                 SnapshotManifest manifest = loadSnapshot(delta, reader);
-                if (log.isDebugEnabled()) {
-                    log.debug("Generated a metadata delta from a snapshot at 
offset {} " +
-                            "in {} us.", 
manifest.provenance().lastContainedOffset(),
-                            NANOSECONDS.toMicros(manifest.elapsedNs()));
-                }
+                log.info("handleSnapshot: generated a metadata delta from a 
snapshot at offset {} " +
+                        "in {} us.", 
manifest.provenance().lastContainedOffset(),
+                        NANOSECONDS.toMicros(manifest.elapsedNs()));
                 try {
                     image = delta.apply(manifest.provenance());
                 } catch (Throwable e) {
@@ -391,10 +429,10 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
                             "snapshot at offset " + 
reader.lastContainedLogOffset(), e);
                     return;
                 }
-                if 
(stillNeedToCatchUp(manifest.provenance().lastContainedOffset())) {
+                if (stillNeedToCatchUp("handleSnapshot", 
manifest.provenance().lastContainedOffset())) {
                     return;
                 }
-                log.debug("Publishing new snapshot image with provenance {}.", 
image.provenance());
+                log.info("handleSnapshot: publishing new snapshot image with 
provenance {}.", image.provenance());
                 for (MetadataPublisher publisher : publishers.values()) {
                     try {
                         publisher.onMetadataUpdate(delta, image, manifest);
@@ -404,8 +442,10 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
                                     " with publisher " + publisher.name(), e);
                     }
                 }
-                maybeInitializeNewPublishers();
                 metrics.updateLastAppliedImageProvenance(image.provenance());
+                if (uninitializedPublishers.isEmpty()) {
+                    scheduleInitializeNewPublishers(0);
+                }
             } catch (Throwable e) {
                 // This is a general catch-all block where we don't expect to 
end up;
                 // failure-prone operations should have individual try/catch 
blocks around them.
@@ -480,7 +520,29 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
         CompletableFuture<Void> future = new CompletableFuture<>();
         eventQueue.append(() -> {
             try {
-                installNewPublishers(newPublishers);
+                // Check that none of the publishers we are trying to install 
are already present.
+                for (MetadataPublisher newPublisher : newPublishers) {
+                    MetadataPublisher prev = 
publishers.get(newPublisher.name());
+                    if (prev == null) {
+                        prev = 
uninitializedPublishers.get(newPublisher.name());
+                    }
+                    if (prev != null) {
+                        if (prev == newPublisher) {
+                            throw faultHandler.handleFault("Attempted to 
install publisher " +
+                                    newPublisher.name() + ", which is already 
installed.");
+                        } else {
+                            throw faultHandler.handleFault("Attempted to 
install a new publisher " +
+                                    "named " + newPublisher.name() + ", but 
there is already a publisher " +
+                                    "with that name.");
+                        }
+                    }
+                }
+                // After installation, new publishers must be initialized by 
sending them a full
+                // snapshot of the current state. However, we cannot 
necessarily do that immediately,
+                // because the loader itself might not be ready. Therefore, we 
schedule a background
+                // task.
+                newPublishers.forEach(p -> 
uninitializedPublishers.put(p.name(), p));
+                scheduleInitializeNewPublishers(0);
                 future.complete(null);
             } catch (Throwable e) {
                 
future.completeExceptionally(faultHandler.handleFault("Unhandled fault in " +
@@ -490,29 +552,6 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
         return future;
     }
 
-    void installNewPublishers(
-        List<? extends MetadataPublisher> newPublishers
-    ) {
-        // Publishers can't be re-installed if they're already present.
-        for (MetadataPublisher newPublisher : newPublishers) {
-            MetadataPublisher prev = publishers.get(newPublisher.name());
-            if (prev == null) {
-                prev = uninitializedPublishers.get(newPublisher.name());
-            }
-            if (prev != null) {
-                if (prev == newPublisher) {
-                    throw faultHandler.handleFault("Attempted to install 
publisher " +
-                            newPublisher.name() + ", which is already 
installed.");
-                } else {
-                    throw faultHandler.handleFault("Attempted to install a new 
publisher " +
-                            "named " + newPublisher.name() + ", but there is 
already a publisher " +
-                            "with that name.");
-                }
-            }
-            uninitializedPublishers.put(newPublisher.name(), newPublisher);
-        }
-    }
-
     // VisibleForTesting
     void waitForAllEventsToBeHandled() throws Exception {
         CompletableFuture<Void> future = new CompletableFuture<>();
diff --git 
a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java 
b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
index b234d36a708..c7f651cf895 100644
--- 
a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
@@ -42,7 +42,10 @@ import org.junit.jupiter.params.provider.CsvSource;
 import java.util.Iterator;
 import java.util.List;
 import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
@@ -71,12 +74,13 @@ public class MetadataLoaderTest {
     }
 
     static class MockPublisher implements MetadataPublisher {
+        final CompletableFuture<Void> firstPublish = new CompletableFuture<>();
         private final String name;
-        MetadataDelta latestDelta = null;
-        MetadataImage latestImage = null;
-        LogDeltaManifest latestLogDeltaManifest = null;
-        SnapshotManifest latestSnapshotManifest = null;
-        boolean closed = false;
+        volatile MetadataDelta latestDelta = null;
+        volatile MetadataImage latestImage = null;
+        volatile LogDeltaManifest latestLogDeltaManifest = null;
+        volatile SnapshotManifest latestSnapshotManifest = null;
+        volatile boolean closed = false;
 
         MockPublisher() {
             this("MockPublisher");
@@ -109,10 +113,12 @@ public class MetadataLoaderTest {
                 default:
                     throw new RuntimeException("Invalid manifest type " + 
manifest.type());
             }
+            firstPublish.complete(null);
         }
 
         @Override
         public void close() throws Exception {
+            firstPublish.completeExceptionally(new 
RejectedExecutionException());
             closed = true;
         }
     }
@@ -262,7 +268,7 @@ public class MetadataLoaderTest {
                 new MockPublisher("c"));
         try (MetadataLoader loader = new MetadataLoader.Builder().
                 setFaultHandler(faultHandler).
-                setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
+                setHighWaterMarkAccessor(() -> OptionalLong.of(1L)).
                 build()) {
             loader.installPublishers(publishers.subList(0, 2)).get();
             loader.removeAndClosePublisher(publishers.get(1)).get();
@@ -276,6 +282,7 @@ public class MetadataLoaderTest {
             loader.handleSnapshot(snapshotReader);
             loader.waitForAllEventsToBeHandled();
             assertTrue(snapshotReader.closed);
+            publishers.get(0).firstPublish.get(1, TimeUnit.MINUTES);
             loader.removeAndClosePublisher(publishers.get(0)).get();
         }
         assertTrue(publishers.get(0).closed);
@@ -302,6 +309,7 @@ public class MetadataLoaderTest {
                 setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
                 build()) {
             loader.installPublishers(publishers).get();
+            publishers.get(0).firstPublish.get(10, TimeUnit.SECONDS);
             loadEmptySnapshot(loader, 200);
             assertEquals(200L, loader.lastAppliedOffset());
             loadEmptySnapshot(loader, 300);
@@ -396,12 +404,13 @@ public class MetadataLoaderTest {
         try (MetadataLoader loader = new MetadataLoader.Builder().
                 setFaultHandler(faultHandler).
                 setTime(time).
-                setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
+                setHighWaterMarkAccessor(() -> OptionalLong.of(1L)).
                 build()) {
             loader.installPublishers(publishers).get();
             loadTestSnapshot(loader, 200);
+            publishers.get(0).firstPublish.get(10, TimeUnit.SECONDS);
             MockBatchReader batchReader = new MockBatchReader(300, asList(
-                Batch.control(300, 100, 4000, 10, 400))).
+                    Batch.control(300, 100, 4000, 10, 400))).
                     setTime(time);
             loader.handleCommit(batchReader);
             loader.waitForAllEventsToBeHandled();
@@ -427,7 +436,7 @@ public class MetadataLoaderTest {
                 new MockPublisher("b"));
         try (MetadataLoader loader = new MetadataLoader.Builder().
                 setFaultHandler(faultHandler).
-                setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
+                setHighWaterMarkAccessor(() -> OptionalLong.of(1L)).
                 build()) {
             loader.installPublishers(publishers).get();
             loader.handleSnapshot(MockSnapshotReader.fromRecordLists(
@@ -439,6 +448,9 @@ public class MetadataLoaderTest {
                         setName("foo").
                         setTopicId(Uuid.fromString("Uum7sfhHQP-obSvfywmNUA")), 
(short) 0))
                 )));
+            for (MockPublisher publisher : publishers) {
+                publisher.firstPublish.get(1, TimeUnit.MINUTES);
+            }
             loader.waitForAllEventsToBeHandled();
             assertEquals(200L, loader.lastAppliedOffset());
             loader.handleCommit(new MockBatchReader(201, asList(
@@ -461,6 +473,7 @@ public class MetadataLoaderTest {
      * Test that we do not leave the catchingUp state state until we have 
loaded up to the high
      * water mark.
      */
+    @Test
     public void testCatchingUpState() throws Exception {
         MockFaultHandler faultHandler = new 
MockFaultHandler("testLastAppliedOffset");
         List<MockPublisher> publishers = asList(new MockPublisher("a"),
@@ -476,22 +489,18 @@ public class MetadataLoaderTest {
             // We don't update lastAppliedOffset because we're still in 
catchingUp state due to
             // highWaterMark being OptionalLong.empty (aka unknown).
             assertEquals(-1L, loader.lastAppliedOffset());
+            assertFalse(publishers.get(0).firstPublish.isDone());
 
-            // Setting the high water mark here doesn't do anything because we 
only check it when
-            // we're publishing an update. This is OK because we know that 
we'll get updates
-            // frequently. If there is no other activity, there will at least 
be NoOpRecords.
-            highWaterMark.set(OptionalLong.of(0));
-            assertEquals(-1L, loader.lastAppliedOffset());
-
-            // This still doesn't advance lastAppliedOffset since the high 
water mark at 220
+            // This still doesn't advance lastAppliedOffset since the high 
water mark at 221
             // is greater than our snapshot at 210.
-            highWaterMark.set(OptionalLong.of(220));
+            highWaterMark.set(OptionalLong.of(221));
             loadTestSnapshot(loader, 210);
             assertEquals(-1L, loader.lastAppliedOffset());
 
             // Loading a test snapshot at 220 allows us to leave catchUp state.
             loadTestSnapshot(loader, 220);
             assertEquals(220L, loader.lastAppliedOffset());
+            publishers.get(0).firstPublish.get(1, TimeUnit.MINUTES);
         }
         faultHandler.maybeRethrowFirstException();
     }
diff --git 
a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
 
b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
index 47befabcaba..e03c4eb0bff 100644
--- 
a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -46,7 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 public class SnapshotGeneratorTest {
     static class MockEmitter implements SnapshotGenerator.Emitter {
         private final CountDownLatch latch = new CountDownLatch(1);
-        private final List<MetadataImage> images = new ArrayList<>();
+        private final List<MetadataImage> images = new 
CopyOnWriteArrayList<>();
         private RuntimeException problem = null;
 
         MockEmitter setReady() {
@@ -66,14 +67,14 @@ public class SnapshotGeneratorTest {
                 throw currentProblem;
             }
             try {
-                latch.await();
+                latch.await(30, TimeUnit.SECONDS);
             } catch (Throwable e) {
                 throw new RuntimeException(e);
             }
             images.add(image);
         }
 
-        synchronized List<MetadataImage> images() {
+        List<MetadataImage> images() {
             return new ArrayList<>(images);
         }
     }
diff --git 
a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java 
b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
index 4e49a9a9153..70f859d9f89 100644
--- a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
+++ b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
@@ -460,6 +460,10 @@ public final class KafkaEventQueue implements EventQueue {
         this.eventHandlerThread.start();
     }
 
+    public Time time() {
+        return time;
+    }
+
     @Override
     public void enqueue(EventInsertionType insertionType,
                         String tag,

Reply via email to