mumrah commented on code in PR #13462:
URL: https://github.com/apache/kafka/pull/13462#discussion_r1150744092


##########
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java:
##########
@@ -212,38 +216,67 @@ private MetadataLoader(
         this.uninitializedPublishers = new LinkedHashMap<>();
         this.publishers = new LinkedHashMap<>();
         this.image = MetadataImage.EMPTY;
-        this.eventQueue = new KafkaEventQueue(time, logContext,
+        this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext,
                 threadNamePrefix + "metadata-loader-",
                 new ShutdownEvent());
     }
 
-    private boolean stillNeedToCatchUp(long offset) {
+    private boolean stillNeedToCatchUp(String where, long offset) {
         if (!catchingUp) {
-            log.trace("We are not in the initial catching up state.");
+            log.trace("{}: we are not in the initial catching up state.", where);
             return false;
         }
         OptionalLong highWaterMark = highWaterMarkAccessor.get();
         if (!highWaterMark.isPresent()) {
-            log.info("The loader is still catching up because we still don't know the high " +
-                    "water mark yet.");
+            log.info("{}: the loader is still catching up because we still don't know the high " +
+                    "water mark yet.", where);
             return true;
         }
         if (highWaterMark.getAsLong() > offset) {
-            log.info("The loader is still catching up because we have loaded up to offset " +
-                    offset + ", but the high water mark is " + highWaterMark.getAsLong());
+            log.info("{}: The loader is still catching up because we have loaded up to offset " +
+                    offset + ", but the high water mark is {}", where, highWaterMark.getAsLong());
             return true;
         }
-        log.info("The loader finished catch up to the current high water mark of " +
-                highWaterMark.getAsLong());
+        log.info("{}: The loader finished catching up to the current high water mark of {}",
+                where, highWaterMark.getAsLong());
         catchingUp = false;
         return false;
     }
 
-    private void maybeInitializeNewPublishers() {
+    /**
+     * Schedule an event to initialize the new publishers that are present in the system.
+     *
+     * @param delayNs   The minimum time in nanoseconds we should wait. If there is already an
+     *                  initialization event scheduled, we will either move its deadline forward
+     *                  in time or leave it unchanged.
+     */
+    void scheduleInitializeNewPublishers(long delayNs) {
+        eventQueue.scheduleDeferred(INITIALIZE_NEW_PUBLISHERS,
+            new EventQueue.EarliestDeadlineFunction(Time.SYSTEM.nanoseconds() + delayNs),
+            () -> {
+                try {
+                    initializeNewPublishers();
+                } catch (Throwable e) {
+                    faultHandler.handleFault("Unhandled error initializing new publishers", e);
+                }
+            });
+    }
+
+    void initializeNewPublishers() {
         if (uninitializedPublishers.isEmpty()) {
-            log.trace("There are no uninitialized publishers to initialize.");
+            log.debug("InitializeNewPublishers: nothing to do.");
+            return;
+        }
+        if (stillNeedToCatchUp("initializeNewPublishers", image.highestOffsetAndEpoch().offset())) {
+            // Reschedule the initialization for later.
+            log.debug("InitializeNewPublishers: unable to initialize new publisher(s) {} " +
+                            "because we are still catching up with quorum metadata. Rescheduling.",
+                    uninitializedPublisherNames());
+            scheduleInitializeNewPublishers(TimeUnit.MILLISECONDS.toNanos(1));

Review Comment:
   The high-watermark accessor is reading the HW from RaftClient. Is there a 
case where we would be waiting on fetches from RaftClient before we had 
sufficient logs to be caught up? I'm wondering if 1ms is too tight for cases 
where we're waiting on some network requests.
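   If the fixed 1 ms retry does turn out to be too tight while we wait on RaftClient fetches, one option is a capped exponential backoff on the reschedule delay. The sketch below is purely illustrative and not part of this PR; `rescheduleDelayNs`, the constants, and the attempt counter are hypothetical names:

```java
import java.util.concurrent.TimeUnit;

public class RescheduleBackoff {
    // Hypothetical bounds: start at the PR's 1 ms, cap at 500 ms.
    private static final long BASE_NS = TimeUnit.MILLISECONDS.toNanos(1);
    private static final long MAX_NS = TimeUnit.MILLISECONDS.toNanos(500);

    // Doubles the delay on each consecutive catch-up failure, capped at MAX_NS.
    static long rescheduleDelayNs(int failedAttempts) {
        long shift = Math.min(failedAttempts, 30); // keep the shift from overflowing
        return Math.min(BASE_NS << shift, MAX_NS);
    }

    public static void main(String[] args) {
        System.out.println(rescheduleDelayNs(0)); // 1 ms, expressed in ns
        System.out.println(rescheduleDelayNs(9)); // already hits the 500 ms cap
    }
}
```

   The caller would pass `rescheduleDelayNs(attempts)` to `scheduleInitializeNewPublishers` instead of a constant, resetting the counter once `stillNeedToCatchUp` returns false.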



##########
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java:
##########
@@ -256,23 +289,26 @@ private void maybeInitializeNewPublishers() {
                 image.provenance(),
                 time.nanoseconds() - startNs);
         for (Iterator<MetadataPublisher> iter = uninitializedPublishers.values().iterator();
-                iter.hasNext(); ) {
+                 iter.hasNext(); ) {

Review Comment:
   nit: is this the right indentation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
