This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 09e59bc7761 KAFKA-14857: Fix some MetadataLoader bugs (#13462)
09e59bc7761 is described below
commit 09e59bc7761a6b9ec1437b3decdfcd7b5fff868e
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Wed Mar 29 12:30:12 2023 -0700
KAFKA-14857: Fix some MetadataLoader bugs (#13462)
The MetadataLoader is not supposed to publish metadata updates until we
have loaded up to the high
water mark. Previously, this logic was broken, and we published updates
immediately. This PR fixes
that and adds a junit test.
Another issue is that the MetadataLoader previously assumed that we would
periodically get
callbacks from the Raft layer even if nothing had happened. We relied on
this to install new
publishers in a timely fashion, for example. However, in older
MetadataVersions that don't include
NoOpRecord, this is not a safe assumption.
Aside from the above changes, also fix a deadlock in SnapshotGeneratorTest,
fix the log prefix for
BrokerLifecycleManager, and remove metadata publishers on BrokerServer
shutdown (like we do for
controllers).
Reviewers: David Arthur <[email protected]>, dengziming
<[email protected]>
---
.../kafka/server/BrokerLifecycleManager.scala | 2 +-
.../src/main/scala/kafka/server/BrokerServer.scala | 11 +-
.../kafka/server/KRaftClusterTest.scala | 20 +++
.../apache/kafka/image/loader/MetadataLoader.java | 153 +++++++++++++--------
.../kafka/image/loader/MetadataLoaderTest.java | 43 +++---
.../image/publisher/SnapshotGeneratorTest.java | 7 +-
.../org/apache/kafka/queue/KafkaEventQueue.java | 4 +
7 files changed, 157 insertions(+), 83 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index 346044c39e6..9d4682182a7 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -64,7 +64,7 @@ class BrokerLifecycleManager(
if (isZkBroker) {
builder.append(" isZkBroker=true")
}
- builder.append("]")
+ builder.append("] ")
builder.toString()
}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 191e3c45f02..e3a657cd982 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -138,6 +138,8 @@ class BrokerServer(
def kafkaYammerMetrics: KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
+ val metadataPublishers: util.List[MetadataPublisher] = new
util.ArrayList[MetadataPublisher]()
+
private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus):
Boolean = {
lock.lock()
try {
@@ -406,7 +408,6 @@ class BrokerServer(
config.numIoThreads,
s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent",
DataPlaneAcceptor.ThreadPrefix)
- val publishers = new util.ArrayList[MetadataPublisher]()
brokerMetadataPublisher = new BrokerMetadataPublisher(config,
metadataCache,
logManager,
@@ -431,7 +432,7 @@ class BrokerServer(
authorizer,
sharedServer.initialBrokerMetadataLoadFaultHandler,
sharedServer.metadataPublishingFaultHandler)
- publishers.add(brokerMetadataPublisher)
+ metadataPublishers.add(brokerMetadataPublisher)
// Register parts of the broker that can be reconfigured via dynamic
configs. This needs to
// be done before we publish the dynamic configs, so that we don't miss
anything.
@@ -440,7 +441,7 @@ class BrokerServer(
// Install all the metadata publishers.
FutureUtils.waitWithLogging(logger.underlying, logIdent,
"the broker metadata publishers to be installed",
- sharedServer.loader.installPublishers(publishers), startupDeadline,
time)
+ sharedServer.loader.installPublishers(metadataPublishers),
startupDeadline, time)
// Wait for this broker to contact the quorum, and for the active
controller to acknowledge
// us as caught up. It will do this by returning a heartbeat response
with isCaughtUp set to
@@ -537,14 +538,14 @@ class BrokerServer(
error("Got unexpected exception waiting for controlled shutdown
future", e)
}
}
-
lifecycleManager.beginShutdown()
-
// Stop socket server to stop accepting any more connections and
requests.
// Socket server will be shutdown towards the end of the sequence.
if (socketServer != null) {
CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
}
+ metadataPublishers.forEach(p =>
sharedServer.loader.removeAndClosePublisher(p).get())
+ metadataPublishers.clear()
if (dataPlaneRequestHandlerPool != null)
CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this)
if (dataPlaneRequestProcessor != null)
diff --git
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 34afeb7fc49..084ff43fcf1 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -974,6 +974,26 @@ class KRaftClusterTest {
}
}
+ /**
+ * Test a single broker, single controller cluster at the minimum bootstrap
level. This tests
+ * that we can function without having periodic NoOpRecords written.
+ */
+ @Test
+ def testSingleControllerSingleBrokerCluster(): Unit = {
+ val cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder().
+ setBootstrapMetadataVersion(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION).
+ setNumBrokerNodes(1).
+ setNumControllerNodes(1).build()).build()
+ try {
+ cluster.format()
+ cluster.startup()
+ cluster.waitForReadyBrokers()
+ } finally {
+ cluster.close()
+ }
+ }
+
@ParameterizedTest
@ValueSource(booleans = Array(false, true))
def testReconfigureControllerClientQuotas(combinedController: Boolean): Unit
= {
diff --git
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index ac34d1fa4a5..839ea9f3e93 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -42,6 +42,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -140,6 +141,8 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
}
}
+ private static final String INITIALIZE_NEW_PUBLISHERS =
"InitializeNewPublishers";
+
/**
* The log4j logger for this loader.
*/
@@ -166,7 +169,7 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
private final Supplier<OptionalLong> highWaterMarkAccessor;
/**
- * Publishers which haven't been initialized yet.
+ * Publishers which haven't received any metadata yet.
*/
private final LinkedHashMap<String, MetadataPublisher>
uninitializedPublishers;
@@ -176,9 +179,10 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
private final LinkedHashMap<String, MetadataPublisher> publishers;
/**
- * True if we have caught up with the initial high water mark.
+ * True if we have not caught up with the initial high water mark.
+ * We do not send out any metadata updates until this is true.
*/
- private boolean catchingUp = false;
+ private boolean catchingUp = true;
/**
* The current leader and epoch.
@@ -212,38 +216,67 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
this.uninitializedPublishers = new LinkedHashMap<>();
this.publishers = new LinkedHashMap<>();
this.image = MetadataImage.EMPTY;
- this.eventQueue = new KafkaEventQueue(time, logContext,
+ this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext,
threadNamePrefix + "metadata-loader-",
new ShutdownEvent());
}
- private boolean stillNeedToCatchUp(long offset) {
+ private boolean stillNeedToCatchUp(String where, long offset) {
if (!catchingUp) {
- log.trace("We are not in the initial catching up state.");
+ log.trace("{}: we are not in the initial catching up state.",
where);
return false;
}
OptionalLong highWaterMark = highWaterMarkAccessor.get();
if (!highWaterMark.isPresent()) {
- log.info("The loader is still catching up because we still don't
know the high " +
- "water mark yet.");
+ log.info("{}: the loader is still catching up because we still
don't know the high " +
+ "water mark yet.", where);
return true;
}
- if (highWaterMark.getAsLong() > offset) {
- log.info("The loader is still catching up because we have loaded
up to offset " +
- offset + ", but the high water mark is " +
highWaterMark.getAsLong());
+ if (highWaterMark.getAsLong() - 1 > offset) {
+ log.info("{}: The loader is still catching up because we have
loaded up to offset " +
+ offset + ", but the high water mark is {}", where,
highWaterMark.getAsLong());
return true;
}
- log.info("The loader finished catch up to the current high water mark
of " +
- highWaterMark.getAsLong());
- catchingUp = true;
+ log.info("{}: The loader finished catching up to the current high
water mark of {}",
+ where, highWaterMark.getAsLong());
+ catchingUp = false;
return false;
}
- private void maybeInitializeNewPublishers() {
+ /**
+ * Schedule an event to initialize the new publishers that are present in
the system.
+ *
+ * @param delayNs The minimum time in nanoseconds we should wait. If
there is already an
+ * initialization event scheduled, we will either move
its deadline forward
+ * in time or leave it unchanged.
+ */
+ void scheduleInitializeNewPublishers(long delayNs) {
+ eventQueue.scheduleDeferred(INITIALIZE_NEW_PUBLISHERS,
+ new
EventQueue.EarliestDeadlineFunction(eventQueue.time().nanoseconds() + delayNs),
+ () -> {
+ try {
+ initializeNewPublishers();
+ } catch (Throwable e) {
+ faultHandler.handleFault("Unhandled error initializing new
publishers", e);
+ }
+ });
+ }
+
+ void initializeNewPublishers() {
if (uninitializedPublishers.isEmpty()) {
- log.trace("There are no uninitialized publishers to initialize.");
+ log.debug("InitializeNewPublishers: nothing to do.");
+ return;
+ }
+ if (stillNeedToCatchUp("initializeNewPublishers",
image.highestOffsetAndEpoch().offset())) {
+ // Reschedule the initialization for later.
+ log.debug("InitializeNewPublishers: unable to initialize new
publisher(s) {} " +
+ "because we are still catching up with quorum
metadata. Rescheduling.",
+ uninitializedPublisherNames());
+
scheduleInitializeNewPublishers(TimeUnit.MILLISECONDS.toNanos(100));
return;
}
+ log.debug("InitializeNewPublishers: setting up snapshot image for new
publisher(s): {}",
+ uninitializedPublisherNames());
long startNs = time.nanoseconds();
MetadataDelta delta = new MetadataDelta.Builder().
setImage(image).
@@ -260,19 +293,22 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
MetadataPublisher publisher = iter.next();
iter.remove();
try {
- log.info("Publishing initial snapshot at offset {} to {}",
- image.highestOffsetAndEpoch().offset(),
publisher.name());
+ log.info("InitializeNewPublishers: initializing {} with a
snapshot at offset {}",
+ publisher.name(),
image.highestOffsetAndEpoch().offset());
publisher.onMetadataUpdate(delta, image, manifest);
publisher.onControllerChange(currentLeaderAndEpoch);
publishers.put(publisher.name(), publisher);
} catch (Throwable e) {
- faultHandler.handleFault("Unhandled error publishing the
initial metadata " +
- "image from snapshot at offset " +
image.highestOffsetAndEpoch().offset() +
- " with publisher " + publisher.name(), e);
+ faultHandler.handleFault("Unhandled error initializing " +
publisher.name() +
+ " with a snapshot at offset " +
image.highestOffsetAndEpoch().offset(), e);
}
}
}
+ private String uninitializedPublisherNames() {
+ return String.join(", ", uninitializedPublishers.keySet());
+ }
+
@Override
public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
eventQueue.append(() -> {
@@ -282,7 +318,7 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
build();
LogDeltaManifest manifest = loadLogDelta(delta, reader);
if (log.isDebugEnabled()) {
- log.debug("Generated a metadata delta between {} and {}
from {} batch(es) " +
+ log.debug("handleCommit: Generated a metadata delta
between {} and {} from {} batch(es) " +
"in {} us.", image.offset(),
manifest.provenance().lastContainedOffset(),
manifest.numBatches(),
NANOSECONDS.toMicros(manifest.elapsedNs()));
}
@@ -294,10 +330,12 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
" and " +
manifest.provenance().lastContainedOffset(), e);
return;
}
- if
(stillNeedToCatchUp(manifest.provenance().lastContainedOffset())) {
+ if (stillNeedToCatchUp("handleCommit",
manifest.provenance().lastContainedOffset())) {
return;
}
- log.debug("Publishing new image with provenance {}.",
image.provenance());
+ if (log.isDebugEnabled()) {
+ log.debug("handleCommit: publishing new image with
provenance {}.", image.provenance());
+ }
for (MetadataPublisher publisher : publishers.values()) {
try {
publisher.onMetadataUpdate(delta, image, manifest);
@@ -307,8 +345,10 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
" with publisher " + publisher.name(), e);
}
}
- maybeInitializeNewPublishers();
metrics.updateLastAppliedImageProvenance(image.provenance());
+ if (uninitializedPublishers.isEmpty()) {
+ scheduleInitializeNewPublishers(0);
+ }
} catch (Throwable e) {
// This is a general catch-all block where we don't expect to
end up;
// failure-prone operations should have individual try/catch
blocks around them.
@@ -379,11 +419,9 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
setImage(image).
build();
SnapshotManifest manifest = loadSnapshot(delta, reader);
- if (log.isDebugEnabled()) {
- log.debug("Generated a metadata delta from a snapshot at
offset {} " +
- "in {} us.",
manifest.provenance().lastContainedOffset(),
- NANOSECONDS.toMicros(manifest.elapsedNs()));
- }
+ log.info("handleSnapshot: generated a metadata delta from a
snapshot at offset {} " +
+ "in {} us.",
manifest.provenance().lastContainedOffset(),
+ NANOSECONDS.toMicros(manifest.elapsedNs()));
try {
image = delta.apply(manifest.provenance());
} catch (Throwable e) {
@@ -391,10 +429,10 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
"snapshot at offset " +
reader.lastContainedLogOffset(), e);
return;
}
- if
(stillNeedToCatchUp(manifest.provenance().lastContainedOffset())) {
+ if (stillNeedToCatchUp("handleSnapshot",
manifest.provenance().lastContainedOffset())) {
return;
}
- log.debug("Publishing new snapshot image with provenance {}.",
image.provenance());
+ log.info("handleSnapshot: publishing new snapshot image with
provenance {}.", image.provenance());
for (MetadataPublisher publisher : publishers.values()) {
try {
publisher.onMetadataUpdate(delta, image, manifest);
@@ -404,8 +442,10 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
" with publisher " + publisher.name(), e);
}
}
- maybeInitializeNewPublishers();
metrics.updateLastAppliedImageProvenance(image.provenance());
+ if (uninitializedPublishers.isEmpty()) {
+ scheduleInitializeNewPublishers(0);
+ }
} catch (Throwable e) {
// This is a general catch-all block where we don't expect to
end up;
// failure-prone operations should have individual try/catch
blocks around them.
@@ -480,7 +520,29 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
CompletableFuture<Void> future = new CompletableFuture<>();
eventQueue.append(() -> {
try {
- installNewPublishers(newPublishers);
+ // Check that none of the publishers we are trying to install
are already present.
+ for (MetadataPublisher newPublisher : newPublishers) {
+ MetadataPublisher prev =
publishers.get(newPublisher.name());
+ if (prev == null) {
+ prev =
uninitializedPublishers.get(newPublisher.name());
+ }
+ if (prev != null) {
+ if (prev == newPublisher) {
+ throw faultHandler.handleFault("Attempted to
install publisher " +
+ newPublisher.name() + ", which is already
installed.");
+ } else {
+ throw faultHandler.handleFault("Attempted to
install a new publisher " +
+ "named " + newPublisher.name() + ", but
there is already a publisher " +
+ "with that name.");
+ }
+ }
+ }
+ // After installation, new publishers must be initialized by
sending them a full
+ // snapshot of the current state. However, we cannot
necessarily do that immediately,
+ // because the loader itself might not be ready. Therefore, we
schedule a background
+ // task.
+ newPublishers.forEach(p ->
uninitializedPublishers.put(p.name(), p));
+ scheduleInitializeNewPublishers(0);
future.complete(null);
} catch (Throwable e) {
future.completeExceptionally(faultHandler.handleFault("Unhandled fault in " +
@@ -490,29 +552,6 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
return future;
}
- void installNewPublishers(
- List<? extends MetadataPublisher> newPublishers
- ) {
- // Publishers can't be re-installed if they're already present.
- for (MetadataPublisher newPublisher : newPublishers) {
- MetadataPublisher prev = publishers.get(newPublisher.name());
- if (prev == null) {
- prev = uninitializedPublishers.get(newPublisher.name());
- }
- if (prev != null) {
- if (prev == newPublisher) {
- throw faultHandler.handleFault("Attempted to install
publisher " +
- newPublisher.name() + ", which is already
installed.");
- } else {
- throw faultHandler.handleFault("Attempted to install a new
publisher " +
- "named " + newPublisher.name() + ", but there is
already a publisher " +
- "with that name.");
- }
- }
- uninitializedPublishers.put(newPublisher.name(), newPublisher);
- }
- }
-
// VisibleForTesting
void waitForAllEventsToBeHandled() throws Exception {
CompletableFuture<Void> future = new CompletableFuture<>();
diff --git
a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
index b234d36a708..c7f651cf895 100644
---
a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
+++
b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
@@ -42,7 +42,10 @@ import org.junit.jupiter.params.provider.CsvSource;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -71,12 +74,13 @@ public class MetadataLoaderTest {
}
static class MockPublisher implements MetadataPublisher {
+ final CompletableFuture<Void> firstPublish = new CompletableFuture<>();
private final String name;
- MetadataDelta latestDelta = null;
- MetadataImage latestImage = null;
- LogDeltaManifest latestLogDeltaManifest = null;
- SnapshotManifest latestSnapshotManifest = null;
- boolean closed = false;
+ volatile MetadataDelta latestDelta = null;
+ volatile MetadataImage latestImage = null;
+ volatile LogDeltaManifest latestLogDeltaManifest = null;
+ volatile SnapshotManifest latestSnapshotManifest = null;
+ volatile boolean closed = false;
MockPublisher() {
this("MockPublisher");
@@ -109,10 +113,12 @@ public class MetadataLoaderTest {
default:
throw new RuntimeException("Invalid manifest type " +
manifest.type());
}
+ firstPublish.complete(null);
}
@Override
public void close() throws Exception {
+ firstPublish.completeExceptionally(new
RejectedExecutionException());
closed = true;
}
}
@@ -262,7 +268,7 @@ public class MetadataLoaderTest {
new MockPublisher("c"));
try (MetadataLoader loader = new MetadataLoader.Builder().
setFaultHandler(faultHandler).
- setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
+ setHighWaterMarkAccessor(() -> OptionalLong.of(1L)).
build()) {
loader.installPublishers(publishers.subList(0, 2)).get();
loader.removeAndClosePublisher(publishers.get(1)).get();
@@ -276,6 +282,7 @@ public class MetadataLoaderTest {
loader.handleSnapshot(snapshotReader);
loader.waitForAllEventsToBeHandled();
assertTrue(snapshotReader.closed);
+ publishers.get(0).firstPublish.get(1, TimeUnit.MINUTES);
loader.removeAndClosePublisher(publishers.get(0)).get();
}
assertTrue(publishers.get(0).closed);
@@ -302,6 +309,7 @@ public class MetadataLoaderTest {
setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
build()) {
loader.installPublishers(publishers).get();
+ publishers.get(0).firstPublish.get(10, TimeUnit.SECONDS);
loadEmptySnapshot(loader, 200);
assertEquals(200L, loader.lastAppliedOffset());
loadEmptySnapshot(loader, 300);
@@ -396,12 +404,13 @@ public class MetadataLoaderTest {
try (MetadataLoader loader = new MetadataLoader.Builder().
setFaultHandler(faultHandler).
setTime(time).
- setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
+ setHighWaterMarkAccessor(() -> OptionalLong.of(1L)).
build()) {
loader.installPublishers(publishers).get();
loadTestSnapshot(loader, 200);
+ publishers.get(0).firstPublish.get(10, TimeUnit.SECONDS);
MockBatchReader batchReader = new MockBatchReader(300, asList(
- Batch.control(300, 100, 4000, 10, 400))).
+ Batch.control(300, 100, 4000, 10, 400))).
setTime(time);
loader.handleCommit(batchReader);
loader.waitForAllEventsToBeHandled();
@@ -427,7 +436,7 @@ public class MetadataLoaderTest {
new MockPublisher("b"));
try (MetadataLoader loader = new MetadataLoader.Builder().
setFaultHandler(faultHandler).
- setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
+ setHighWaterMarkAccessor(() -> OptionalLong.of(1L)).
build()) {
loader.installPublishers(publishers).get();
loader.handleSnapshot(MockSnapshotReader.fromRecordLists(
@@ -439,6 +448,9 @@ public class MetadataLoaderTest {
setName("foo").
setTopicId(Uuid.fromString("Uum7sfhHQP-obSvfywmNUA")),
(short) 0))
)));
+ for (MockPublisher publisher : publishers) {
+ publisher.firstPublish.get(1, TimeUnit.MINUTES);
+ }
loader.waitForAllEventsToBeHandled();
assertEquals(200L, loader.lastAppliedOffset());
loader.handleCommit(new MockBatchReader(201, asList(
@@ -461,6 +473,7 @@ public class MetadataLoaderTest {
* Test that we do not leave the catchingUp state state until we have
loaded up to the high
* water mark.
*/
+ @Test
public void testCatchingUpState() throws Exception {
MockFaultHandler faultHandler = new
MockFaultHandler("testLastAppliedOffset");
List<MockPublisher> publishers = asList(new MockPublisher("a"),
@@ -476,22 +489,18 @@ public class MetadataLoaderTest {
// We don't update lastAppliedOffset because we're still in
catchingUp state due to
// highWaterMark being OptionalLong.empty (aka unknown).
assertEquals(-1L, loader.lastAppliedOffset());
+ assertFalse(publishers.get(0).firstPublish.isDone());
- // Setting the high water mark here doesn't do anything because we
only check it when
- // we're publishing an update. This is OK because we know that
we'll get updates
- // frequently. If there is no other activity, there will at least
be NoOpRecords.
- highWaterMark.set(OptionalLong.of(0));
- assertEquals(-1L, loader.lastAppliedOffset());
-
- // This still doesn't advance lastAppliedOffset since the high
water mark at 220
+ // This still doesn't advance lastAppliedOffset since the high
water mark at 221
// is greater than our snapshot at 210.
- highWaterMark.set(OptionalLong.of(220));
+ highWaterMark.set(OptionalLong.of(221));
loadTestSnapshot(loader, 210);
assertEquals(-1L, loader.lastAppliedOffset());
// Loading a test snapshot at 220 allows us to leave catchUp state.
loadTestSnapshot(loader, 220);
assertEquals(220L, loader.lastAppliedOffset());
+ publishers.get(0).firstPublish.get(1, TimeUnit.MINUTES);
}
faultHandler.maybeRethrowFirstException();
}
diff --git
a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
index 47befabcaba..e03c4eb0bff 100644
---
a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
+++
b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -46,7 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
public class SnapshotGeneratorTest {
static class MockEmitter implements SnapshotGenerator.Emitter {
private final CountDownLatch latch = new CountDownLatch(1);
- private final List<MetadataImage> images = new ArrayList<>();
+ private final List<MetadataImage> images = new
CopyOnWriteArrayList<>();
private RuntimeException problem = null;
MockEmitter setReady() {
@@ -66,14 +67,14 @@ public class SnapshotGeneratorTest {
throw currentProblem;
}
try {
- latch.await();
+ latch.await(30, TimeUnit.SECONDS);
} catch (Throwable e) {
throw new RuntimeException(e);
}
images.add(image);
}
- synchronized List<MetadataImage> images() {
+ List<MetadataImage> images() {
return new ArrayList<>(images);
}
}
diff --git
a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
index 4e49a9a9153..70f859d9f89 100644
--- a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
+++ b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
@@ -460,6 +460,10 @@ public final class KafkaEventQueue implements EventQueue {
this.eventHandlerThread.start();
}
+ public Time time() {
+ return time;
+ }
+
@Override
public void enqueue(EventInsertionType insertionType,
String tag,