This is an automated email from the ASF dual-hosted git repository.
dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 93b982840 SAMZA-2784: Remove excessive commit logs (#1695)
93b982840 is described below
commit 93b982840a6beba8ba8a48c5c7b4645385349b07
Author: Daniel Chen <[email protected]>
AuthorDate: Fri Jan 19 18:46:09 2024 -0800
SAMZA-2784: Remove excessive commit logs (#1695)
---
.../apache/samza/storage/blobstore/BlobStoreBackupManager.java | 8 ++++----
.../java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java | 4 ++--
.../src/main/scala/org/apache/samza/container/TaskInstance.scala | 4 ++--
3 files changed, 8 insertions(+), 8 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
index 997c5e6ca..bf26eea6f 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
@@ -236,7 +236,7 @@ public class BlobStoreBackupManager implements TaskBackupManager {
CompletionStage<String> snapshotIndexBlobIdFuture =
snapshotIndexFuture
.thenComposeAsync(si -> {
- LOG.info("Uploading Snapshot index: {} for task: {} store:
{}", si, taskName, storeName);
+ LOG.debug("Uploading Snapshot index: {} for task: {} store:
{}", si, taskName, storeName);
return blobStoreUtil.putSnapshotIndex(si);
}, executor);
@@ -296,7 +296,7 @@ public class BlobStoreBackupManager implements TaskBackupManager {
// 1. remove TTL of index blob and all of its files and sub-dirs
marked for retention
CompletionStage<Void> removeTTLFuture =
snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
- LOG.info("Removing TTL for index blob: {} and all of its files
and sub-dirs for task: {} store :{}",
+ LOG.debug("Removing TTL for index blob: {} and all of its files
and sub-dirs for task: {} store :{}",
snapshotIndexBlobId, taskName, storeName);
return blobStoreUtil.removeTTL(snapshotIndexBlobId,
snapshotIndex, requestMetadata);
}, executor);
@@ -305,7 +305,7 @@ public class BlobStoreBackupManager implements TaskBackupManager {
// 2. delete the files/subdirs marked for deletion in the snapshot
index.
CompletionStage<Void> cleanupRemoteSnapshotFuture =
snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
- LOG.info("Deleting files and dirs to remove for current index
blob: {} for task: {} store: {}",
+ LOG.debug("Deleting files and dirs to remove for current index
blob: {} for task: {} store: {}",
snapshotIndexBlobId, taskName, storeName);
return blobStoreUtil.cleanUpDir(snapshotIndex.getDirIndex(),
requestMetadata);
}, executor);
@@ -317,7 +317,7 @@ public class BlobStoreBackupManager implements TaskBackupManager {
snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
if (snapshotIndex.getPrevSnapshotIndexBlobId().isPresent()) {
String blobId =
snapshotIndex.getPrevSnapshotIndexBlobId().get();
- LOG.info("Removing previous snapshot index blob: {} from blob
store for task: {} store: {}.",
+ LOG.debug("Removing previous snapshot index blob: {} from blob
store for task: {} store: {}.",
blobId, taskName, storeName);
return blobStoreUtil.deleteSnapshotIndexBlob(blobId,
requestMetadata);
} else {
diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java
index b89f42a31..9b84ac756 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java
@@ -219,7 +219,7 @@ public class DirDiffUtil {
if (!compareLargeFileChecksums && isLargeFile) {
// Since RocksDB SST files are immutable after creation, we can skip
the expensive checksum computations
// which requires reading the entire file.
- LOG.debug("Local file: {} and remote file: {} are same. " +
+ LOG.debug("Local file: {} and remote file: {} both present. " +
"Skipping checksum calculation for large file of size: {}.",
localFile.getAbsolutePath(), remoteFile.getFileName(),
localFileAttrs.size());
return true;
@@ -234,7 +234,7 @@ public class DirDiffUtil {
boolean areSameChecksum = localFileChecksum ==
remoteFile.getChecksum();
if (!areSameChecksum) {
- LOG.warn("Local file: {} and remote file: {} are not same. " +
+ LOG.debug("Local file: {} and remote file: {} are not same. " +
"Local checksum: {}. Remote checksum: {}",
localFile.getAbsolutePath(), remoteFile.getFileName(),
localFileChecksum, remoteFile.getChecksum());
} else {
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 89738e2de..70d9ca380 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -186,7 +186,7 @@ class TaskInstance(
// WARNING: cleanUp is NOT optional with blob stores since this is where
we reset the TTL for
// tracked blobs. if this TTL reset is skipped, some of the blobs
retained by future commits may
// be deleted in the background by the blob store, leading to data loss.
- info("Cleaning up stale state from previous run for taskName: %s" format
taskName)
+ debug("Cleaning up stale state from previous run for taskName: %s"
format taskName)
commitManager.cleanUp(checkpointV2.getCheckpointId,
checkpointV2.getStateCheckpointMarkers)
}
@@ -474,7 +474,7 @@ class TaskInstance(
new Function[util.Map[String, util.Map[String, String]],
CompletableFuture[Void]] {
override def apply(uploadSCMs: util.Map[String, util.Map[String,
String]]): CompletableFuture[Void] = {
// Perform cleanup on unused checkpoints
- info("Cleaning up old checkpoint state for taskName: %s checkpointId:
%s" format(taskName, checkpointId))
+ debug("Cleaning up old checkpoint state for taskName: %s checkpointId:
%s" format(taskName, checkpointId))
val cleanUpStartTime = System.nanoTime()
try {
commitManager.cleanUp(checkpointId, uploadSCMs)