This is an automated email from the ASF dual-hosted git repository.

dchen pushed a commit to branch dxichen/SAMZA-2784
in repository https://gitbox.apache.org/repos/asf/samza.git

commit 37b2abd5f92b362d9cd93614f4976c6234977cde
Author: Daniel Chen <[email protected]>
AuthorDate: Fri Jan 19 15:09:13 2024 -0800

    SAMZA-2784: Remove excessive commit logs
---
 .../apache/samza/storage/blobstore/BlobStoreBackupManager.java | 8 ++++----
 .../java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java | 4 ++--
 .../src/main/scala/org/apache/samza/container/TaskInstance.scala | 4 ++--
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
index 997c5e6ca..bf26eea6f 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
@@ -236,7 +236,7 @@ public class BlobStoreBackupManager implements TaskBackupManager {
       CompletionStage<String> snapshotIndexBlobIdFuture =
           snapshotIndexFuture
               .thenComposeAsync(si -> {
-                LOG.info("Uploading Snapshot index: {} for task: {} store: {}", si, taskName, storeName);
+                LOG.debug("Uploading Snapshot index: {} for task: {} store: {}", si, taskName, storeName);
                 return blobStoreUtil.putSnapshotIndex(si);
               }, executor);
 
@@ -296,7 +296,7 @@ public class BlobStoreBackupManager implements TaskBackupManager {
       // 1. remove TTL of index blob and all of its files and sub-dirs marked for retention
       CompletionStage<Void> removeTTLFuture =
           snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
-            LOG.info("Removing TTL for index blob: {} and all of its files and sub-dirs for task: {} store :{}",
+            LOG.debug("Removing TTL for index blob: {} and all of its files and sub-dirs for task: {} store :{}",
                 snapshotIndexBlobId, taskName, storeName);
             return blobStoreUtil.removeTTL(snapshotIndexBlobId, snapshotIndex, requestMetadata);
           }, executor);
@@ -305,7 +305,7 @@ public class BlobStoreBackupManager implements TaskBackupManager {
       // 2. delete the files/subdirs marked for deletion in the snapshot index.
       CompletionStage<Void> cleanupRemoteSnapshotFuture =
           snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
-            LOG.info("Deleting files and dirs to remove for current index blob: {} for task: {} store: {}",
+            LOG.debug("Deleting files and dirs to remove for current index blob: {} for task: {} store: {}",
                 snapshotIndexBlobId, taskName, storeName);
             return blobStoreUtil.cleanUpDir(snapshotIndex.getDirIndex(), requestMetadata);
           }, executor);
@@ -317,7 +317,7 @@ public class BlobStoreBackupManager implements TaskBackupManager {
           snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
             if (snapshotIndex.getPrevSnapshotIndexBlobId().isPresent()) {
               String blobId = snapshotIndex.getPrevSnapshotIndexBlobId().get();
-              LOG.info("Removing previous snapshot index blob: {} from blob store for task: {} store: {}.",
+              LOG.debug("Removing previous snapshot index blob: {} from blob store for task: {} store: {}.",
                   blobId, taskName, storeName);
               return blobStoreUtil.deleteSnapshotIndexBlob(blobId, requestMetadata);
             } else {
diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java
index b89f42a31..9b84ac756 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java
@@ -219,7 +219,7 @@ public class DirDiffUtil {
         if (!compareLargeFileChecksums && isLargeFile) {
           // Since RocksDB SST files are immutable after creation, we can skip the expensive checksum computations
           // which requires reading the entire file.
-          LOG.debug("Local file: {} and remote file: {} are same. " +
+          LOG.debug("Local file: {} and remote file: {} both present. " +
               "Skipping checksum calculation for large file of size: {}.",
               localFile.getAbsolutePath(), remoteFile.getFileName(), localFileAttrs.size());
           return true;
@@ -234,7 +234,7 @@ public class DirDiffUtil {
 
         boolean areSameChecksum = localFileChecksum == remoteFile.getChecksum();
         if (!areSameChecksum) {
-          LOG.warn("Local file: {} and remote file: {} are not same. " +
+          LOG.debug("Local file: {} and remote file: {} are not same. " +
               "Local checksum: {}. Remote checksum: {}",
               localFile.getAbsolutePath(), remoteFile.getFileName(), localFileChecksum, remoteFile.getChecksum());
         } else {
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 89738e2de..70d9ca380 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -186,7 +186,7 @@ class TaskInstance(
       // WARNING: cleanUp is NOT optional with blob stores since this is where we reset the TTL for
       // tracked blobs. if this TTL reset is skipped, some of the blobs retained by future commits may
       // be deleted in the background by the blob store, leading to data loss.
-      info("Cleaning up stale state from previous run for taskName: %s" format taskName)
+      debug("Cleaning up stale state from previous run for taskName: %s" format taskName)
       commitManager.cleanUp(checkpointV2.getCheckpointId, checkpointV2.getStateCheckpointMarkers)
     }
 
@@ -474,7 +474,7 @@ class TaskInstance(
       new Function[util.Map[String, util.Map[String, String]], CompletableFuture[Void]] {
         override def apply(uploadSCMs: util.Map[String, util.Map[String, String]]): CompletableFuture[Void] = {
           // Perform cleanup on unused checkpoints
-          info("Cleaning up old checkpoint state for taskName: %s checkpointId: %s" format(taskName, checkpointId))
+          debug("Cleaning up old checkpoint state for taskName: %s checkpointId: %s" format(taskName, checkpointId))
           val cleanUpStartTime = System.nanoTime()
           try {
             commitManager.cleanUp(checkpointId, uploadSCMs)
