This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 280767445c1 [FLINK-38288] Enhance BlobWriter logging with offloading time in millis.
280767445c1 is described below

commit 280767445c17db090bfc65e0e8d09374b9ea92bf
Author: Aleksandr Iushmanov <aiushma...@confluent.io>
AuthorDate: Wed Aug 27 17:31:52 2025 +0100

    [FLINK-38288] Enhance BlobWriter logging with offloading time in millis.
---
 .../org/apache/flink/runtime/blob/BlobWriter.java     | 19 +++++++++++++++++--
 1 file changed, 17 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
index 2d5292b42cb..d7b7c542363 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.clock.SystemClock;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,12 +112,26 @@ public interface BlobWriter {
         Preconditions.checkNotNull(serializedValue);
         Preconditions.checkNotNull(jobId);
         Preconditions.checkNotNull(blobWriter);
+        final SystemClock systemClock = SystemClock.getInstance();
+        final long offloadStartTs = systemClock.relativeTimeMillis();
         try {
             final PermanentBlobKey permanentBlobKey =
                     blobWriter.putPermanent(jobId, serializedValue.getByteArray());
-            return Either.Right(permanentBlobKey);
+            final Either<SerializedValue<T>, PermanentBlobKey> right =
+                    Either.Right(permanentBlobKey);
+            LOG.info(
+                    "BLOB for job {} with size: {} has been offloaded in {} millis",
+                    jobId,
+                    serializedValue.getByteArray().length,
+                    systemClock.relativeTimeMillis() - offloadStartTs);
+            return right;
         } catch (IOException e) {
-            LOG.warn("Failed to offload value for job {} to BLOB store.", jobId, e);
+            LOG.warn(
+                    "Failed to offload value for job {} to BLOB store with size: {} after {} millis",
+                    jobId,
+                    serializedValue.getByteArray().length,
+                    systemClock.relativeTimeMillis() - offloadStartTs,
+                    e);
             return Either.Left(serializedValue);
         }
     }

Reply via email to