This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new 843ede494f4 KAFKA-15389: Don't publish until we have replayed at least one record (#14282)
843ede494f4 is described below

commit 843ede494f4ea0d6869f8410622d71cc648c5358
Author: David Arthur <[email protected]>
AuthorDate: Fri Aug 25 13:41:43 2023 -0400

    KAFKA-15389: Don't publish until we have replayed at least one record (#14282)
    
    When starting up a controller for the first time (i.e., with an empty log),
    it is possible for MetadataLoader to publish an empty MetadataImage before
    the activation records of the controller have been written. While this is
    not a bug, it could be confusing. This patch closes that gap by waiting for
    at least one controller record to be committed before the MetadataLoader
    starts publishing images.
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../kafka/image/loader/MetadataBatchLoader.java    | 15 +++++-
 .../apache/kafka/image/loader/MetadataLoader.java  |  8 +++-
 .../kafka/image/loader/MetadataLoaderTest.java     | 53 +++++++++++++++++++++-
 3 files changed, 72 insertions(+), 4 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java 
b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java
index 33f4846ac70..b1d22364cc0 100644
--- 
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java
+++ 
b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java
@@ -67,6 +67,7 @@ public class MetadataBatchLoader {
     private int numBatches;
     private long totalBatchElapsedNs;
     private TransactionState transactionState;
+    private boolean hasSeenRecord;
 
     public MetadataBatchLoader(
         LogContext logContext,
@@ -78,16 +79,27 @@ public class MetadataBatchLoader {
         this.time = time;
         this.faultHandler = faultHandler;
         this.callback = callback;
+        this.resetToImage(MetadataImage.EMPTY);
+        this.hasSeenRecord = false;
+    }
+
+    /**
+     * @return True if this batch loader has seen at least one record.
+     */
+    public boolean hasSeenRecord() {
+        return hasSeenRecord;
     }
 
     /**
      * Reset the state of this batch loader to the given image. Any un-flushed 
state will be
-     * discarded.
+     * discarded. This is called after applying a delta and passing it back to 
MetadataLoader, or
+     * when MetadataLoader loads a snapshot.
      *
      * @param image     Metadata image to reset this batch loader's state to.
      */
     public void resetToImage(MetadataImage image) {
         this.image = image;
+        this.hasSeenRecord = true;
         this.delta = new MetadataDelta.Builder().setImage(image).build();
         this.transactionState = TransactionState.NO_TRANSACTION;
         this.lastOffset = image.provenance().lastContainedOffset();
@@ -241,6 +253,7 @@ public class MetadataBatchLoader {
                     default:
                         break;
                 }
+                hasSeenRecord = true;
                 delta.replay(record.message());
         }
     }
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java 
b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index c5ba2af0b50..307fda8d218 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -212,7 +212,6 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
             time,
             faultHandler,
             this::maybePublishMetadata);
-        this.batchLoader.resetToImage(this.image);
         this.eventQueue = new KafkaEventQueue(
             Time.SYSTEM,
             logContext,
@@ -241,6 +240,11 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
                     offset + ", but the high water mark is {}", where, 
highWaterMark.getAsLong());
             return true;
         }
+        if (!batchLoader.hasSeenRecord()) {
+            log.info("{}: The loader is still catching up because we have not 
loaded a controller record as of offset " +
+                    offset + " and high water mark is {}", where, 
highWaterMark.getAsLong());
+            return true;
+        }
         log.info("{}: The loader finished catching up to the current high 
water mark of {}",
                 where, highWaterMark.getAsLong());
         catchingUp = false;
@@ -387,8 +391,8 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
                         image.provenance().lastContainedOffset(),
                         NANOSECONDS.toMicros(manifest.elapsedNs()));
                 MetadataImage image = delta.apply(manifest.provenance());
-                maybePublishMetadata(delta, image, manifest);
                 batchLoader.resetToImage(image);
+                maybePublishMetadata(delta, image, manifest);
             } catch (Throwable e) {
                 // This is a general catch-all block where we don't expect to 
end up;
                 // failure-prone operations should have individual try/catch 
blocks around them.
diff --git 
a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java 
b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
index 199d907a447..62d974b8b5c 100644
--- 
a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
@@ -18,9 +18,11 @@
 package org.apache.kafka.image.loader;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.message.SnapshotHeaderRecord;
 import org.apache.kafka.common.metadata.AbortTransactionRecord;
 import org.apache.kafka.common.metadata.BeginTransactionRecord;
+import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.EndTransactionRecord;
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
@@ -48,6 +50,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -338,8 +341,8 @@ public class MetadataLoaderTest {
                 setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
                 build()) {
             loader.installPublishers(publishers).get();
-            publishers.get(0).firstPublish.get(10, TimeUnit.SECONDS);
             loadEmptySnapshot(loader, 200);
+            publishers.get(0).firstPublish.get(10, TimeUnit.SECONDS);
             assertEquals(200L, loader.lastAppliedOffset());
             loadEmptySnapshot(loader, 300);
             assertEquals(300L, loader.lastAppliedOffset());
@@ -668,6 +671,7 @@ public class MetadataLoaderTest {
                         
.setTopicId(Uuid.fromString("dMCqhcK4T5miGH5wEX7NsQ")), (short) 0)
             )));
             loader.waitForAllEventsToBeHandled();
+            publisher.firstPublish.get(30, TimeUnit.SECONDS);
             assertNull(publisher.latestImage.topics().getTopic("foo"),
                 "Topic should not be visible since we started transaction");
 
@@ -732,6 +736,7 @@ public class MetadataLoaderTest {
 
             // After MetadataLoader is fixed to handle arbitrary transactions, 
we would expect "foo"
             // to be visible at this point.
+            publisher.firstPublish.get(30, TimeUnit.SECONDS);
             assertNotNull(publisher.latestImage.topics().getTopic("foo"));
         }
         faultHandler.maybeRethrowFirstException();
@@ -758,6 +763,7 @@ public class MetadataLoaderTest {
                         
.setTopicId(Uuid.fromString("HQSM3ccPQISrHqYK_C8GpA")), (short) 0)
                 )));
             loader.waitForAllEventsToBeHandled();
+            publisher.firstPublish.get(30, TimeUnit.SECONDS);
             assertNull(publisher.latestImage.topics().getTopic("foo"));
 
             // loading a snapshot discards any in-flight transaction
@@ -773,4 +779,49 @@ public class MetadataLoaderTest {
         }
         faultHandler.maybeRethrowFirstException();
     }
+
+    @Test
+    public void testNoPublishEmptyImage() throws Exception {
+        MockFaultHandler faultHandler = new 
MockFaultHandler("testNoPublishEmptyImage");
+        List<MetadataImage> capturedImages = new ArrayList<>();
+        CompletableFuture<Void> firstPublish = new CompletableFuture<>();
+        MetadataPublisher capturingPublisher = new MetadataPublisher() {
+            @Override
+            public String name() {
+                return "testNoPublishEmptyImage";
+            }
+
+            @Override
+            public void onMetadataUpdate(MetadataDelta delta, MetadataImage 
newImage, LoaderManifest manifest) {
+                if (!firstPublish.isDone()) {
+                    firstPublish.complete(null);
+                }
+                capturedImages.add(newImage);
+            }
+        };
+
+        try (MetadataLoader loader = new MetadataLoader.Builder().
+                setFaultHandler(faultHandler).
+                setHighWaterMarkAccessor(() -> OptionalLong.of(1)).
+                build()) {
+            
loader.installPublishers(Collections.singletonList(capturingPublisher)).get();
+            loader.handleCommit(
+                MockBatchReader.newSingleBatchReader(0, 1, 
Collections.singletonList(
+                    // Any record will work here
+                    new ApiMessageAndVersion(new ConfigRecord()
+                        .setResourceType(ConfigResource.Type.BROKER.id())
+                        .setResourceName("3000")
+                        .setName("foo")
+                        .setValue("bar"), (short) 0)
+                )));
+            firstPublish.get(30, TimeUnit.SECONDS);
+
+            assertFalse(capturedImages.isEmpty());
+            capturedImages.forEach(metadataImage -> {
+                assertFalse(metadataImage.isEmpty());
+            });
+
+        }
+        faultHandler.maybeRethrowFirstException();
+    }
 }

Reply via email to