This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 280767445c1 [FLINK-38288] Enhance BlobWriter logging with offloading time in millis.
280767445c1 is described below

commit 280767445c17db090bfc65e0e8d09374b9ea92bf
Author: Aleksandr Iushmanov <aiushma...@confluent.io>
AuthorDate: Wed Aug 27 17:31:52 2025 +0100

    [FLINK-38288] Enhance BlobWriter logging with offloading time in millis.
---
 .../org/apache/flink/runtime/blob/BlobWriter.java | 19 +++++++++++++++++--
 1 file changed, 17 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
index 2d5292b42cb..d7b7c542363 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.clock.SystemClock;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,12 +112,26 @@ public interface BlobWriter {
         Preconditions.checkNotNull(serializedValue);
         Preconditions.checkNotNull(jobId);
         Preconditions.checkNotNull(blobWriter);
+        final SystemClock systemClock = SystemClock.getInstance();
+        final long offloadStartTs = systemClock.relativeTimeMillis();
         try {
             final PermanentBlobKey permanentBlobKey =
                     blobWriter.putPermanent(jobId, serializedValue.getByteArray());
-            return Either.Right(permanentBlobKey);
+            final Either<SerializedValue<T>, PermanentBlobKey> right =
+                    Either.Right(permanentBlobKey);
+            LOG.info(
+                    "BLOB for job {} with size: {} has been offloaded in {} millis",
+                    jobId,
+                    serializedValue.getByteArray().length,
+                    systemClock.relativeTimeMillis() - offloadStartTs);
+            return right;
         } catch (IOException e) {
-            LOG.warn("Failed to offload value for job {} to BLOB store.", jobId, e);
+            LOG.warn(
+                    "Failed to offload value for job {} to BLOB store with size: {} after {} millis",
+                    jobId,
+                    serializedValue.getByteArray().length,
+                    systemClock.relativeTimeMillis() - offloadStartTs,
+                    e);
             return Either.Left(serializedValue);
         }
     }