dengziming commented on code in PR #13462: URL: https://github.com/apache/kafka/pull/13462#discussion_r1150126555
########## metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java: ########## @@ -212,38 +216,67 @@ private MetadataLoader( this.uninitializedPublishers = new LinkedHashMap<>(); this.publishers = new LinkedHashMap<>(); this.image = MetadataImage.EMPTY; - this.eventQueue = new KafkaEventQueue(time, logContext, + this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, Review Comment: Is this change related to a test? ########## metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java: ########## @@ -256,23 +289,26 @@ private void maybeInitializeNewPublishers() { image.provenance(), time.nanoseconds() - startNs); for (Iterator<MetadataPublisher> iter = uninitializedPublishers.values().iterator(); - iter.hasNext(); ) { + iter.hasNext(); ) { MetadataPublisher publisher = iter.next(); iter.remove(); try { - log.info("Publishing initial snapshot at offset {} to {}", - image.highestOffsetAndEpoch().offset(), publisher.name()); + log.info("InitializeNewPublishers: initializing {} with a snapshot at offset {}", + publisher.name(), image.highestOffsetAndEpoch().offset()); publisher.onMetadataUpdate(delta, image, manifest); publisher.onControllerChange(currentLeaderAndEpoch); publishers.put(publisher.name(), publisher); } catch (Throwable e) { - faultHandler.handleFault("Unhandled error publishing the initial metadata " + - "image from snapshot at offset " + image.highestOffsetAndEpoch().offset() + - " with publisher " + publisher.name(), e); + faultHandler.handleFault("Unhandled error initializing " + publisher.name() + + " with a snapshot at offset " + image.highestOffsetAndEpoch().offset(), e); } } } + String uninitializedPublisherNames() { Review Comment: nit: we could make this method private. 
########## metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java: ########## @@ -212,38 +216,67 @@ private MetadataLoader( this.uninitializedPublishers = new LinkedHashMap<>(); this.publishers = new LinkedHashMap<>(); this.image = MetadataImage.EMPTY; - this.eventQueue = new KafkaEventQueue(time, logContext, + this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, threadNamePrefix + "metadata-loader-", new ShutdownEvent()); } - private boolean stillNeedToCatchUp(long offset) { + private boolean stillNeedToCatchUp(String where, long offset) { if (!catchingUp) { - log.trace("We are not in the initial catching up state."); + log.trace("{}: we are not in the initial catching up state.", where); return false; } OptionalLong highWaterMark = highWaterMarkAccessor.get(); if (!highWaterMark.isPresent()) { - log.info("The loader is still catching up because we still don't know the high " + - "water mark yet."); + log.info("{}: the loader is still catching up because we still don't know the high " + + "water mark yet.", where); return true; } if (highWaterMark.getAsLong() > offset) { - log.info("The loader is still catching up because we have loaded up to offset " + - offset + ", but the high water mark is " + highWaterMark.getAsLong()); + log.info("{}: The loader is still catching up because we have loaded up to offset " + + offset + ", but the high water mark is {}", where, highWaterMark.getAsLong()); return true; } - log.info("The loader finished catch up to the current high water mark of " + - highWaterMark.getAsLong()); + log.info("{}: The loader finished catching up to the current high water mark of {}", + where, highWaterMark.getAsLong()); catchingUp = true; Review Comment: Should we change this to `catchingUp = false`? The log line just above reports that the loader finished catching up, so setting the flag to true here looks inverted. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org