This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push:
new 6c89a3f3655 KAFKA-14894: MetadataLoader must call finishSnapshot after
loading a snapshot (#13541)
6c89a3f3655 is described below
commit 6c89a3f3655cbf310cefc7c5aff5f330f9cde69d
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Tue Apr 11 15:02:33 2023 -0700
KAFKA-14894: MetadataLoader must call finishSnapshot after loading a
snapshot (#13541)
The MetadataLoader must call finishSnapshot after loading a snapshot. This function
removes whatever was in the old snapshot that is not in the new snapshot that was just
loaded. While this is not significant when the old snapshot was the empty snapshot, it
is important to do when we are loading a snapshot on top of an existing non-empty image.
In initializeNewPublishers, the newly installed publishers should be given a
MetadataDelta based on MetadataImage.EMPTY, reflecting the fact that they are seeing
everything for the first time.
Reviewers: David Arthur <[email protected]>
---
.../apache/kafka/image/loader/MetadataLoader.java | 6 ++-
.../apache/kafka/image/writer/ImageReWriter.java | 1 +
.../kafka/image/loader/MetadataLoaderTest.java | 48 ++++++++++++++++++++++
.../kafka/image/writer/ImageReWriterTest.java | 30 ++++++++++++++
4 files changed, 84 insertions(+), 1 deletion(-)
diff --git
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index 4b820fbcdfb..4b8d564971a 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -274,13 +274,16 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
log.debug("InitializeNewPublishers: setting up snapshot image for new
publisher(s): {}",
uninitializedPublisherNames());
long startNs = time.nanoseconds();
+ // We base this delta off of the empty image, reflecting the fact that these publishers
+ // haven't seen anything previously.
MetadataDelta delta = new MetadataDelta.Builder().
- setImage(image).
+ setImage(MetadataImage.EMPTY).
build();
ImageReWriter writer = new ImageReWriter(delta);
image.write(writer, new ImageWriterOptions.Builder().
setMetadataVersion(image.features().metadataVersion()).
build());
+ // ImageReWriter#close invokes finishSnapshot, so we don't need to invoke it here.
SnapshotManifest manifest = new SnapshotManifest(
image.provenance(),
time.nanoseconds() - startNs);
@@ -477,6 +480,7 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
snapshotIndex++;
}
}
+ delta.finishSnapshot();
MetadataProvenance provenance = new
MetadataProvenance(reader.lastContainedLogOffset(),
reader.lastContainedLogEpoch(),
reader.lastContainedLogTimestamp());
return new SnapshotManifest(provenance,
diff --git
a/metadata/src/main/java/org/apache/kafka/image/writer/ImageReWriter.java
b/metadata/src/main/java/org/apache/kafka/image/writer/ImageReWriter.java
index 42a0aaa93a1..3a1245000e8 100644
--- a/metadata/src/main/java/org/apache/kafka/image/writer/ImageReWriter.java
+++ b/metadata/src/main/java/org/apache/kafka/image/writer/ImageReWriter.java
@@ -49,6 +49,7 @@ public class ImageReWriter implements ImageWriter {
if (closed) return;
closed = true;
if (complete) {
+ delta.finishSnapshot();
image = delta.apply(delta.image().provenance());
}
}
diff --git
a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
index 489a9bca80e..8e9d2ed12db 100644
---
a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
+++
b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
@@ -523,4 +523,52 @@ public class MetadataLoaderTest {
)));
loader.waitForAllEventsToBeHandled();
}
+
+    /**
+     * Hands the loader a second test snapshot and waits for it to be handled.
+     *
+     * The snapshot's provenance is (offset, epoch 100, timestamp 4000) and it holds two
+     * single-record batches: a FeatureLevelRecord for MetadataVersion.FEATURE_NAME at
+     * IBP_3_3_IV2, and a TopicRecord for a topic named "bar".
+     *
+     * @param loader    the loader that receives the snapshot
+     * @param offset    the offset recorded in the snapshot's provenance
+     */
+    private void loadTestSnapshot2(
+        MetadataLoader loader,
+        long offset
+    ) throws Exception {
+        MetadataProvenance provenance = new MetadataProvenance(offset, 100, 4000);
+        ApiMessageAndVersion featureLevel = new ApiMessageAndVersion(
+            new FeatureLevelRecord().
+                setName(MetadataVersion.FEATURE_NAME).
+                setFeatureLevel(IBP_3_3_IV2.featureLevel()),
+            (short) 0);
+        ApiMessageAndVersion barTopic = new ApiMessageAndVersion(
+            new TopicRecord().
+                setName("bar").
+                setTopicId(Uuid.fromString("VcL2Mw-cT4aL6XV9VujzoQ")),
+            (short) 0);
+        // One record per batch, in the same order as before: feature level first, then topic.
+        List<List<ApiMessageAndVersion>> batches = asList(asList(featureLevel), asList(barTopic));
+        loader.handleSnapshot(MockSnapshotReader.fromRecordLists(provenance, batches));
+        loader.waitForAllEventsToBeHandled();
+    }
+
+    /**
+     * Test that loading a snapshot clears the previous state.
+     *
+     * The first test snapshot is loaded at offsets 100 and 200, then a different snapshot
+     * (containing only topic "bar") is loaded at offset 400. The image seen by the publisher
+     * afterwards must contain "bar" and must no longer contain "foo".
+     */
+    @Test
+    public void testReloadSnapshot() throws Exception {
+        // Label the fault handler with this test's own name so that any recorded fault is
+        // attributed to the right test.
+        MockFaultHandler faultHandler = new MockFaultHandler("testReloadSnapshot");
+        List<MockPublisher> publishers = asList(new MockPublisher("a"));
+        try (MetadataLoader loader = new MetadataLoader.Builder().
+                setFaultHandler(faultHandler).
+                setHighWaterMarkAccessor(() -> OptionalLong.of(0)).
+                build()) {
+            loadTestSnapshot(loader, 100);
+            loader.installPublishers(publishers).get();
+            loader.waitForAllEventsToBeHandled();
+            assertTrue(publishers.get(0).firstPublish.isDone());
+            // A newly installed publisher gets a delta whose base image is empty.
+            assertTrue(publishers.get(0).latestDelta.image().isEmpty());
+            assertEquals(100L, publishers.get(0).latestImage.provenance().lastContainedOffset());
+
+            // Reloading on top of existing state yields a delta with a non-empty base image.
+            loadTestSnapshot(loader, 200);
+            assertEquals(200L, loader.lastAppliedOffset());
+            assertFalse(publishers.get(0).latestDelta.image().isEmpty());
+
+            loadTestSnapshot2(loader, 400);
+            assertEquals(400L, loader.lastAppliedOffset());
+
+            // Make sure the topic in the initial snapshot was overwritten by loading the new snapshot.
+            assertFalse(publishers.get(0).latestImage.topics().topicsByName().containsKey("foo"));
+            assertTrue(publishers.get(0).latestImage.topics().topicsByName().containsKey("bar"));
+        }
+        faultHandler.maybeRethrowFirstException();
+    }
}
diff --git
a/metadata/src/test/java/org/apache/kafka/image/writer/ImageReWriterTest.java
b/metadata/src/test/java/org/apache/kafka/image/writer/ImageReWriterTest.java
index 640924fe076..fd81cc98dcc 100644
---
a/metadata/src/test/java/org/apache/kafka/image/writer/ImageReWriterTest.java
+++
b/metadata/src/test/java/org/apache/kafka/image/writer/ImageReWriterTest.java
@@ -18,11 +18,17 @@
package org.apache.kafka.image.writer;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import java.util.Collections;
+
+import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static org.apache.kafka.metadata.RecordTestUtils.testRecord;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -60,4 +66,28 @@ public class ImageReWriterTest {
setName("foo").
setTopicId(Uuid.fromString("3B134hrsQgKtz8Sp6QBIfg"))));
}
+
+    /**
+     * Checks that closing an ImageReWriter with complete=true finishes the snapshot.
+     *
+     * A first writer produces an image holding topic "foo". A second writer, whose delta is
+     * based on that image, writes only a broker config record. The image it produces must
+     * have no topics and must contain the config.
+     */
+    @Test
+    public void testCloseInvokesFinishSnapshot() {
+        // First pass: an image that contains only topic "foo".
+        TopicRecord fooTopic = new TopicRecord().
+            setName("foo").
+            setTopicId(Uuid.fromString("3B134hrsQgKtz8Sp6QBIfg"));
+        MetadataDelta firstDelta = new MetadataDelta.Builder().build();
+        ImageReWriter firstWriter = new ImageReWriter(firstDelta);
+        firstWriter.write(0, fooTopic);
+        firstWriter.close(true);
+
+        // Second pass: start from the first image, but write only a config record.
+        MetadataDelta secondDelta = new MetadataDelta.Builder().
+            setImage(firstWriter.image()).
+            build();
+        ImageReWriter secondWriter = new ImageReWriter(secondDelta);
+        ConfigRecord ioThreads = new ConfigRecord().
+            setResourceName("").
+            setResourceType(BROKER.id()).
+            setName("num.io.threads").
+            setValue("12");
+        secondWriter.write(0, ioThreads);
+        secondWriter.close(true);
+        MetadataImage rewritten = secondWriter.image();
+
+        // "foo" was not rewritten in the second pass, so it must be gone.
+        assertEquals(Collections.emptyMap(), rewritten.topics().topicsById());
+        assertEquals(Collections.singletonMap("num.io.threads", "12"),
+            rewritten.configs().configMapForResource(new ConfigResource(BROKER, "")));
+    }
}