This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e69a3110687 KAFKA-19076 replace `String` by `Supplier<String>` for UnifiedLog#maybeHandleIOException (#19392)
e69a3110687 is described below

commit e69a31106879dc5eaf8e80f1fe0c05422488df49
Author: Nick Guo <[email protected]>
AuthorDate: Mon Apr 7 00:43:44 2025 +0800

    KAFKA-19076 replace `String` by `Supplier<String>` for UnifiedLog#maybeHandleIOException (#19392)
    
    jira: https://issues.apache.org/jira/browse/KAFKA-19076
    
    the message is used only when the function encounters an error, so the
    error message should be created lazily.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
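
    For illustration, here is a minimal standalone sketch of the pattern this
    commit adopts (hypothetical demo code, not the Kafka implementation). A
    plain String argument is concatenated on every call, whereas a
    Supplier<String> defers the concatenation to the failure path; as the
    final hunk below shows, LocalLog.maybeHandleIOException already accepted
    a supplier, but the old wrapper built the message eagerly and wrapped it
    in `() -> msg`.

        import java.util.function.Supplier;

        public class LazyMessageDemo {

            // Eager variant: the caller builds the message up front,
            // even though it is only needed when the action fails.
            static <T> T runEager(String msg, Supplier<T> action) {
                try {
                    return action.get();
                } catch (RuntimeException e) {
                    throw new RuntimeException(msg, e);
                }
            }

            // Lazy variant: msg.get() runs only on the failure path,
            // so the happy path pays no string-building cost.
            static <T> T runLazy(Supplier<String> msg, Supplier<T> action) {
                try {
                    return action.get();
                } catch (RuntimeException e) {
                    throw new RuntimeException(msg.get(), e);
                }
            }

            public static void main(String[] args) {
                // No concatenation happens here unless the lambda throws.
                int sum = runLazy(() -> "Error while adding " + 1 + " and " + 2,
                        () -> 1 + 2);
                System.out.println(sum); // prints 3
            }
        }
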
---
 .../kafka/storage/internals/log/UnifiedLog.java    | 27 +++++++++++-----------
 1 file changed, 14 insertions(+), 13 deletions(-)

diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
index b9ea2dfd856..dcef6929d19 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
@@ -83,6 +83,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -922,7 +923,7 @@ public class UnifiedLog implements AutoCloseable {
             localLog.checkIfMemoryMappedBufferClosed();
             producerExpireCheck.cancel(true);
             maybeHandleIOException(
-                    "Error while renaming dir for " + topicPartition() + " in dir " + dir().getParent(),
+                    () -> "Error while renaming dir for " + topicPartition() + " in dir " + dir().getParent(),
                     () -> {
                        // We take a snapshot at the last written offset to hopefully avoid the need to scan the log
                        // after restarting and to ensure that we cannot inadvertently hit the upgrade optimization
@@ -945,7 +946,7 @@ public class UnifiedLog implements AutoCloseable {
     public void renameDir(String name, boolean shouldReinitialize) {
         synchronized (lock) {
             maybeHandleIOException(
-                    "Error while renaming dir for " + topicPartition() + " in log dir " + dir().getParent(),
+                    () -> "Error while renaming dir for " + topicPartition() + " in log dir " + dir().getParent(),
                     () -> {
                        // Flush partitionMetadata file before initializing again
                         maybeFlushMetadataFile();
@@ -1087,7 +1088,7 @@ public class UnifiedLog implements AutoCloseable {
             // they are valid, insert them in the log
             synchronized (lock)  {
                 return maybeHandleIOException(
-                        "Error while appending records to " + topicPartition() + " in dir " + dir().getParent(),
+                        () -> "Error while appending records to " + topicPartition() + " in dir " + dir().getParent(),
                         () -> {
                             MemoryRecords validRecords = trimmedRecords;
                             localLog.checkIfMemoryMappedBufferClosed();
@@ -1300,7 +1301,7 @@ public class UnifiedLog implements AutoCloseable {
        // The deleteRecordsOffset may be lost only if all in-sync replicas of this broker are shutdown
        // in an unclean manner within log.flush.start.offset.checkpoint.interval.ms. The chance of this happening is low.
         return maybeHandleIOException(
-                "Exception while increasing log start offset for " + topicPartition() + " to " + newLogStartOffset + " in dir " + dir().getParent(),
+                () -> "Exception while increasing log start offset for " + topicPartition() + " to " + newLogStartOffset + " in dir " + dir().getParent(),
                 () -> {
                     synchronized (lock)  {
                         if (newLogStartOffset > highWatermark()) {
@@ -1613,7 +1614,7 @@ public class UnifiedLog implements AutoCloseable {
      */
     public OffsetResultHolder fetchOffsetByTimestamp(long targetTimestamp, Optional<AsyncOffsetReader> remoteOffsetReader) {
         return maybeHandleIOException(
-                "Error while fetching offset by timestamp for " + topicPartition() + " in dir " + dir().getParent(),
+                () -> "Error while fetching offset by timestamp for " + topicPartition() + " in dir " + dir().getParent(),
                 () -> {
                     logger.debug("Searching offset for timestamp {}.", 
targetTimestamp);
 
@@ -1831,7 +1832,7 @@ public class UnifiedLog implements AutoCloseable {
     }
 
     private int deleteSegments(List<LogSegment> deletable, SegmentDeletionReason reason) {
-        return maybeHandleIOException("Error while deleting segments for " + topicPartition() + " in dir " + dir().getParent(),
+        return maybeHandleIOException(() -> "Error while deleting segments for " + topicPartition() + " in dir " + dir().getParent(),
                 () -> {
                     int numToDelete = deletable.size();
                     if (numToDelete > 0) {
@@ -2138,7 +2139,7 @@ public class UnifiedLog implements AutoCloseable {
         long flushOffset = includingOffset ? offset + 1 : offset;
         String includingOffsetStr = includingOffset ? "inclusive" : "exclusive";
         maybeHandleIOException(
-                "Error while flushing log for " + topicPartition() + " in dir " + dir().getParent() + " with offset " + offset +
+                () -> "Error while flushing log for " + topicPartition() + " in dir " + dir().getParent() + " with offset " + offset +
                 " (" + includingOffsetStr + ") and recovery point " + offset,
                 () -> {
                     if (flushOffset > localLog.recoveryPoint()) {
@@ -2158,7 +2159,7 @@ public class UnifiedLog implements AutoCloseable {
      */
     public void delete() {
         maybeHandleIOException(
-            "Error while deleting log for " + topicPartition() + " in dir " + dir().getParent(),
+            () -> "Error while deleting log for " + topicPartition() + " in dir " + dir().getParent(),
             () -> {
                 synchronized (lock) {
                     localLog.checkIfMemoryMappedBufferClosed();
@@ -2204,7 +2205,7 @@ public class UnifiedLog implements AutoCloseable {
     // visible for testing
     public void flushProducerStateSnapshot(Path snapshot) {
         maybeHandleIOException(
-                "Error while deleting producer state snapshot " + snapshot + " for " + topicPartition() + " in dir " + dir().getParent(),
+                () -> "Error while deleting producer state snapshot " + snapshot + " for " + topicPartition() + " in dir " + dir().getParent(),
                 () -> {
                     Utils.flushFileIfExists(snapshot);
                     return null;
@@ -2219,7 +2220,7 @@ public class UnifiedLog implements AutoCloseable {
      */
     public boolean truncateTo(long targetOffset) {
         return maybeHandleIOException(
-                "Error while truncating log to offset " + targetOffset + " for " + topicPartition() + " in dir " + dir().getParent(),
+                () -> "Error while truncating log to offset " + targetOffset + " for " + topicPartition() + " in dir " + dir().getParent(),
                 () -> {
                     if (targetOffset < 0) {
                        throw new IllegalArgumentException("Cannot truncate partition " + topicPartition() + " to a negative offset (" + targetOffset + ").");
@@ -2263,7 +2264,7 @@ public class UnifiedLog implements AutoCloseable {
      */
     public void truncateFullyAndStartAt(long newOffset, Optional<Long> logStartOffsetOpt) {
         maybeHandleIOException(
-                "Error while truncating the entire log for " + topicPartition() + " in dir " + dir().getParent(),
+                () -> "Error while truncating the entire log for " + topicPartition() + " in dir " + dir().getParent(),
                 () -> {
                     logger.debug("Truncate and start at offset {}, 
logStartOffset: {}", newOffset, logStartOffsetOpt.orElse(newOffset));
                     synchronized (lock)  {
@@ -2370,8 +2371,8 @@ public class UnifiedLog implements AutoCloseable {
         metricNames.clear();
     }
 
-    private <T> T maybeHandleIOException(String msg, StorageAction<T, IOException> fun) throws KafkaStorageException {
-        return LocalLog.maybeHandleIOException(logDirFailureChannel(), parentDir(), () -> msg, fun);
+    private <T> T maybeHandleIOException(Supplier<String> msg, StorageAction<T, IOException> fun) throws KafkaStorageException {
+        return LocalLog.maybeHandleIOException(logDirFailureChannel(), parentDir(), msg, fun);
     }
 
     public List<LogSegment> splitOverflowedSegment(LogSegment segment) throws IOException {
