This is an automated email from the ASF dual-hosted git repository.

dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 93b982840 SAMZA-2784: Remove excessive commit logs (#1695)
93b982840 is described below

commit 93b982840a6beba8ba8a48c5c7b4645385349b07
Author: Daniel Chen <[email protected]>
AuthorDate: Fri Jan 19 18:46:09 2024 -0800

    SAMZA-2784: Remove excessive commit logs (#1695)
---
 .../apache/samza/storage/blobstore/BlobStoreBackupManager.java    | 8 ++++----
 .../java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java | 4 ++--
 .../src/main/scala/org/apache/samza/container/TaskInstance.scala  | 4 ++--
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
index 997c5e6ca..bf26eea6f 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
@@ -236,7 +236,7 @@ public class BlobStoreBackupManager implements TaskBackupManager {
         CompletionStage<String> snapshotIndexBlobIdFuture =
             snapshotIndexFuture
                 .thenComposeAsync(si -> {
-                  LOG.info("Uploading Snapshot index: {} for task: {} store: {}", si, taskName, storeName);
+                  LOG.debug("Uploading Snapshot index: {} for task: {} store: {}", si, taskName, storeName);
                   return blobStoreUtil.putSnapshotIndex(si);
                 }, executor);
 
@@ -296,7 +296,7 @@ public class BlobStoreBackupManager implements TaskBackupManager {
         // 1. remove TTL of index blob and all of its files and sub-dirs marked for retention
         CompletionStage<Void> removeTTLFuture =
             snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
-              LOG.info("Removing TTL for index blob: {} and all of its files and sub-dirs for task: {} store :{}",
+              LOG.debug("Removing TTL for index blob: {} and all of its files and sub-dirs for task: {} store :{}",
                   snapshotIndexBlobId, taskName, storeName);
               return blobStoreUtil.removeTTL(snapshotIndexBlobId, snapshotIndex, requestMetadata);
             }, executor);
@@ -305,7 +305,7 @@ public class BlobStoreBackupManager implements TaskBackupManager {
         // 2. delete the files/subdirs marked for deletion in the snapshot index.
         CompletionStage<Void> cleanupRemoteSnapshotFuture =
             snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
-              LOG.info("Deleting files and dirs to remove for current index blob: {} for task: {} store: {}",
+              LOG.debug("Deleting files and dirs to remove for current index blob: {} for task: {} store: {}",
                   snapshotIndexBlobId, taskName, storeName);
               return blobStoreUtil.cleanUpDir(snapshotIndex.getDirIndex(), requestMetadata);
             }, executor);
@@ -317,7 +317,7 @@ public class BlobStoreBackupManager implements TaskBackupManager {
             snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
               if (snapshotIndex.getPrevSnapshotIndexBlobId().isPresent()) {
                 String blobId = snapshotIndex.getPrevSnapshotIndexBlobId().get();
-                LOG.info("Removing previous snapshot index blob: {} from blob store for task: {} store: {}.",
+                LOG.debug("Removing previous snapshot index blob: {} from blob store for task: {} store: {}.",
                     blobId, taskName, storeName);
                 return blobStoreUtil.deleteSnapshotIndexBlob(blobId, requestMetadata);
               } else {
diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java
index b89f42a31..9b84ac756 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java
@@ -219,7 +219,7 @@ public class DirDiffUtil {
         if (!compareLargeFileChecksums && isLargeFile) {
           // Since RocksDB SST files are immutable after creation, we can skip the expensive checksum computations
           // which requires reading the entire file.
-          LOG.debug("Local file: {} and remote file: {} are same. " +
+          LOG.debug("Local file: {} and remote file: {} both present. " +
                   "Skipping checksum calculation for large file of size: {}.",
               localFile.getAbsolutePath(), remoteFile.getFileName(), localFileAttrs.size());
           return true;
@@ -234,7 +234,7 @@ public class DirDiffUtil {
 
             boolean areSameChecksum = localFileChecksum == remoteFile.getChecksum();
             if (!areSameChecksum) {
-              LOG.warn("Local file: {} and remote file: {} are not same. " +
+              LOG.debug("Local file: {} and remote file: {} are not same. " +
                       "Local checksum: {}. Remote checksum: {}",
                   localFile.getAbsolutePath(), remoteFile.getFileName(), localFileChecksum, remoteFile.getChecksum());
             } else {
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 89738e2de..70d9ca380 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -186,7 +186,7 @@ class TaskInstance(
       // WARNING: cleanUp is NOT optional with blob stores since this is where we reset the TTL for
       // tracked blobs. if this TTL reset is skipped, some of the blobs retained by future commits may
       // be deleted in the background by the blob store, leading to data loss.
-      info("Cleaning up stale state from previous run for taskName: %s" format taskName)
+      debug("Cleaning up stale state from previous run for taskName: %s" format taskName)
       commitManager.cleanUp(checkpointV2.getCheckpointId, checkpointV2.getStateCheckpointMarkers)
     }
 
@@ -474,7 +474,7 @@ class TaskInstance(
     new Function[util.Map[String, util.Map[String, String]], CompletableFuture[Void]] {
       override def apply(uploadSCMs: util.Map[String, util.Map[String, String]]): CompletableFuture[Void] = {
         // Perform cleanup on unused checkpoints
-        info("Cleaning up old checkpoint state for taskName: %s checkpointId: %s" format(taskName, checkpointId))
+        debug("Cleaning up old checkpoint state for taskName: %s checkpointId: %s" format(taskName, checkpointId))
         val cleanUpStartTime = System.nanoTime()
         try {
           commitManager.cleanUp(checkpointId, uploadSCMs)

Reply via email to