cmccabe commented on code in PR #13462: URL: https://github.com/apache/kafka/pull/13462#discussion_r1150905109
########## metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java: ########## @@ -212,38 +216,67 @@ private MetadataLoader( this.uninitializedPublishers = new LinkedHashMap<>(); this.publishers = new LinkedHashMap<>(); this.image = MetadataImage.EMPTY; - this.eventQueue = new KafkaEventQueue(time, logContext, + this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, threadNamePrefix + "metadata-loader-", new ShutdownEvent()); } - private boolean stillNeedToCatchUp(long offset) { + private boolean stillNeedToCatchUp(String where, long offset) { if (!catchingUp) { - log.trace("We are not in the initial catching up state."); + log.trace("{}: we are not in the initial catching up state.", where); return false; } OptionalLong highWaterMark = highWaterMarkAccessor.get(); if (!highWaterMark.isPresent()) { - log.info("The loader is still catching up because we still don't know the high " + - "water mark yet."); + log.info("{}: the loader is still catching up because we still don't know the high " + + "water mark yet.", where); return true; } if (highWaterMark.getAsLong() > offset) { - log.info("The loader is still catching up because we have loaded up to offset " + - offset + ", but the high water mark is " + highWaterMark.getAsLong()); + log.info("{}: The loader is still catching up because we have loaded up to offset " + + offset + ", but the high water mark is {}", where, highWaterMark.getAsLong()); return true; } - log.info("The loader finished catch up to the current high water mark of " + - highWaterMark.getAsLong()); + log.info("{}: The loader finished catching up to the current high water mark of {}", + where, highWaterMark.getAsLong()); catchingUp = true; return false; } - private void maybeInitializeNewPublishers() { + /** + * Schedule an event to initialize the new publishers that are present in the system. + * + * @param delayNs The minimum time in nanoseconds we should wait. 
If there is already an + * initialization event scheduled, we will either move its deadline forward + * in time or leave it unchanged. + */ + void scheduleInitializeNewPublishers(long delayNs) { + eventQueue.scheduleDeferred(INITIALIZE_NEW_PUBLISHERS, + new EventQueue.EarliestDeadlineFunction(Time.SYSTEM.nanoseconds() + delayNs), + () -> { + try { + initializeNewPublishers(); + } catch (Throwable e) { + faultHandler.handleFault("Unhandled error initializing new publishers", e); + } + }); + } + + void initializeNewPublishers() { if (uninitializedPublishers.isEmpty()) { - log.trace("There are no uninitialized publishers to initialize."); + log.debug("InitializeNewPublishers: nothing to do."); + return; + } + if (stillNeedToCatchUp("initializeNewPublishers", image.highestOffsetAndEpoch().offset())) { + // Reschedule the initialization for later. + log.debug("InitializeNewPublishers: unable to initialize new publisher(s) {} " + + "because we are still catching up with quorum metadata. Rescheduling.", + uninitializedPublisherNames()); + scheduleInitializeNewPublishers(TimeUnit.MILLISECONDS.toNanos(1)); Review Comment: Fair point. I'll make it 100 ms, but also schedule it immediately after handleCommit / handleSnapshot when needed, to avoid bloating JUnit test time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org