This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
new 843ede494f4 KAFKA-15389: Don't publish until we have replayed at least one record (#14282)
843ede494f4 is described below
commit 843ede494f4ea0d6869f8410622d71cc648c5358
Author: David Arthur <[email protected]>
AuthorDate: Fri Aug 25 13:41:43 2023 -0400
KAFKA-15389: Don't publish until we have replayed at least one record (#14282)
When starting up a controller for the first time (i.e., with an empty
log), it is possible for MetadataLoader to publish an empty MetadataImage
before the activation records of the controller have been written. While
this is not a bug, it could be confusing. This patch closes that gap by
waiting for at least one controller record to be committed before the
MetadataLoader starts publishing images.
Reviewers: Colin P. McCabe <[email protected]>
---
.../kafka/image/loader/MetadataBatchLoader.java | 15 +++++-
.../apache/kafka/image/loader/MetadataLoader.java | 8 +++-
.../kafka/image/loader/MetadataLoaderTest.java | 53 +++++++++++++++++++++-
3 files changed, 72 insertions(+), 4 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java
index 33f4846ac70..b1d22364cc0 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java
@@ -67,6 +67,7 @@ public class MetadataBatchLoader {
private int numBatches;
private long totalBatchElapsedNs;
private TransactionState transactionState;
+ private boolean hasSeenRecord;
public MetadataBatchLoader(
LogContext logContext,
@@ -78,16 +79,27 @@ public class MetadataBatchLoader {
this.time = time;
this.faultHandler = faultHandler;
this.callback = callback;
+ this.resetToImage(MetadataImage.EMPTY);
+ this.hasSeenRecord = false;
+ }
+
+ /**
+ * @return True if this batch loader has seen at least one record.
+ */
+ public boolean hasSeenRecord() {
+ return hasSeenRecord;
}
/**
* Reset the state of this batch loader to the given image. Any un-flushed
state will be
- * discarded.
+ * discarded. This is called after applying a delta and passing it back to
MetadataLoader, or
+ * when MetadataLoader loads a snapshot.
*
* @param image Metadata image to reset this batch loader's state to.
*/
public void resetToImage(MetadataImage image) {
this.image = image;
+ this.hasSeenRecord = true;
this.delta = new MetadataDelta.Builder().setImage(image).build();
this.transactionState = TransactionState.NO_TRANSACTION;
this.lastOffset = image.provenance().lastContainedOffset();
@@ -241,6 +253,7 @@ public class MetadataBatchLoader {
default:
break;
}
+ hasSeenRecord = true;
delta.replay(record.message());
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index c5ba2af0b50..307fda8d218 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -212,7 +212,6 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
time,
faultHandler,
this::maybePublishMetadata);
- this.batchLoader.resetToImage(this.image);
this.eventQueue = new KafkaEventQueue(
Time.SYSTEM,
logContext,
@@ -241,6 +240,11 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
offset + ", but the high water mark is {}", where,
highWaterMark.getAsLong());
return true;
}
+ if (!batchLoader.hasSeenRecord()) {
+ log.info("{}: The loader is still catching up because we have not
loaded a controller record as of offset " +
+ offset + " and high water mark is {}", where,
highWaterMark.getAsLong());
+ return true;
+ }
log.info("{}: The loader finished catching up to the current high
water mark of {}",
where, highWaterMark.getAsLong());
catchingUp = false;
@@ -387,8 +391,8 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
image.provenance().lastContainedOffset(),
NANOSECONDS.toMicros(manifest.elapsedNs()));
MetadataImage image = delta.apply(manifest.provenance());
- maybePublishMetadata(delta, image, manifest);
batchLoader.resetToImage(image);
+ maybePublishMetadata(delta, image, manifest);
} catch (Throwable e) {
// This is a general catch-all block where we don't expect to
end up;
// failure-prone operations should have individual try/catch
blocks around them.
diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
index 199d907a447..62d974b8b5c 100644
--- a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
@@ -18,9 +18,11 @@
package org.apache.kafka.image.loader;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.metadata.AbortTransactionRecord;
import org.apache.kafka.common.metadata.BeginTransactionRecord;
+import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.EndTransactionRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
@@ -48,6 +50,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
@@ -338,8 +341,8 @@ public class MetadataLoaderTest {
setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
build()) {
loader.installPublishers(publishers).get();
- publishers.get(0).firstPublish.get(10, TimeUnit.SECONDS);
loadEmptySnapshot(loader, 200);
+ publishers.get(0).firstPublish.get(10, TimeUnit.SECONDS);
assertEquals(200L, loader.lastAppliedOffset());
loadEmptySnapshot(loader, 300);
assertEquals(300L, loader.lastAppliedOffset());
@@ -668,6 +671,7 @@ public class MetadataLoaderTest {
.setTopicId(Uuid.fromString("dMCqhcK4T5miGH5wEX7NsQ")), (short) 0)
)));
loader.waitForAllEventsToBeHandled();
+ publisher.firstPublish.get(30, TimeUnit.SECONDS);
assertNull(publisher.latestImage.topics().getTopic("foo"),
"Topic should not be visible since we started transaction");
@@ -732,6 +736,7 @@ public class MetadataLoaderTest {
// After MetadataLoader is fixed to handle arbitrary transactions,
we would expect "foo"
// to be visible at this point.
+ publisher.firstPublish.get(30, TimeUnit.SECONDS);
assertNotNull(publisher.latestImage.topics().getTopic("foo"));
}
faultHandler.maybeRethrowFirstException();
@@ -758,6 +763,7 @@ public class MetadataLoaderTest {
.setTopicId(Uuid.fromString("HQSM3ccPQISrHqYK_C8GpA")), (short) 0)
)));
loader.waitForAllEventsToBeHandled();
+ publisher.firstPublish.get(30, TimeUnit.SECONDS);
assertNull(publisher.latestImage.topics().getTopic("foo"));
// loading a snapshot discards any in-flight transaction
@@ -773,4 +779,49 @@ public class MetadataLoaderTest {
}
faultHandler.maybeRethrowFirstException();
}
+
+ @Test
+ public void testNoPublishEmptyImage() throws Exception {
+ MockFaultHandler faultHandler = new
MockFaultHandler("testNoPublishEmptyImage");
+ List<MetadataImage> capturedImages = new ArrayList<>();
+ CompletableFuture<Void> firstPublish = new CompletableFuture<>();
+ MetadataPublisher capturingPublisher = new MetadataPublisher() {
+ @Override
+ public String name() {
+ return "testNoPublishEmptyImage";
+ }
+
+ @Override
+ public void onMetadataUpdate(MetadataDelta delta, MetadataImage
newImage, LoaderManifest manifest) {
+ if (!firstPublish.isDone()) {
+ firstPublish.complete(null);
+ }
+ capturedImages.add(newImage);
+ }
+ };
+
+ try (MetadataLoader loader = new MetadataLoader.Builder().
+ setFaultHandler(faultHandler).
+ setHighWaterMarkAccessor(() -> OptionalLong.of(1)).
+ build()) {
+
loader.installPublishers(Collections.singletonList(capturingPublisher)).get();
+ loader.handleCommit(
+ MockBatchReader.newSingleBatchReader(0, 1,
Collections.singletonList(
+ // Any record will work here
+ new ApiMessageAndVersion(new ConfigRecord()
+ .setResourceType(ConfigResource.Type.BROKER.id())
+ .setResourceName("3000")
+ .setName("foo")
+ .setValue("bar"), (short) 0)
+ )));
+ firstPublish.get(30, TimeUnit.SECONDS);
+
+ assertFalse(capturedImages.isEmpty());
+ capturedImages.forEach(metadataImage -> {
+ assertFalse(metadataImage.isEmpty());
+ });
+
+ }
+ faultHandler.maybeRethrowFirstException();
+ }
}