Repository: samza
Updated Branches:
  refs/heads/master d1653aad0 -> e827d150f


SAMZA-1335: Improve logging for LocalStoreMonitor

Author: Jacob Maes <[email protected]>

Reviewers: Prateek Maheshwari <[email protected]>

Closes #226 from jmakes/samza-1335


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e827d150
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e827d150
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e827d150

Branch: refs/heads/master
Commit: e827d150fc12c81bb64788d28e07488c42a93687
Parents: d1653aa
Author: Jacob Maes <[email protected]>
Authored: Thu Jun 15 13:40:43 2017 -0700
Committer: Jacob Maes <[email protected]>
Committed: Thu Jun 15 13:40:43 2017 -0700

----------------------------------------------------------------------
 .../apache/samza/monitor/LocalStoreMonitor.java | 20 +++++++++++++-------
 .../java/org/apache/samza/rest/model/Task.java  |  5 +++++
 2 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e827d150/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java 
b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
index 8195491..8b25636 100644
--- a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
@@ -83,9 +83,10 @@ public class LocalStoreMonitor implements Monitor {
                              String.format("%s-%s", jobInstance.getJobName(), 
jobInstance.getJobId()));
       try {
         JobStatus jobStatus = jobsClient.getJobStatus(jobInstance);
+        LOG.info("Job: {} has the status: {}.", jobInstance, jobStatus);
         for (Task task : jobsClient.getTasks(jobInstance)) {
+          LOG.info("Evaluating stores for task: {}", task);
           for (String storeName : jobDir.list(DirectoryFileFilter.DIRECTORY)) {
-            LOG.info("Job: {} has the running status: {} with preferred host: 
{}.", jobInstance, jobStatus, task.getPreferredHost());
             /**
              *  A task store is active if all of the following conditions are 
true:
              *  a) If the store is amongst the active stores of the task.
@@ -95,9 +96,9 @@ public class LocalStoreMonitor implements Monitor {
             if (jobStatus.hasBeenStarted()
                 && task.getStoreNames().contains(storeName)
                 && task.getPreferredHost().equals(localHostName)) {
-              LOG.info(String.format("Store %s is actively used by the task: 
%s.", storeName, task.getTaskName()));
+              LOG.info(String.format("Local store: %s is actively used by the 
task: %s.", storeName, task.getTaskName()));
             } else {
-              LOG.info(String.format("Store %s not used by the task: %s.", 
storeName, task.getTaskName()));
+              LOG.info(String.format("Local store: %s not used by the task: 
%s.", storeName, task.getTaskName()));
               
markSweepTaskStore(TaskStorageManager.getStorePartitionDir(jobDir, storeName, 
new TaskName(task.getTaskName())));
             }
           }
@@ -135,7 +136,7 @@ public class LocalStoreMonitor implements Monitor {
    * Role of this method is to garbage collect(mark-sweep) the task store.
    * @param taskStoreDir store directory of the task to perform garbage 
collection.
    *
-   *  This method cleans up each of the task store directory in two phases.
+   * This method cleans up each of the task store directory in two phases.
    *
    *  Phase 1:
    *  Delete the offset file in the task store if (curTime - 
lastModifiedTimeOfOffsetFile) > offsetTTL.
@@ -143,8 +144,13 @@ public class LocalStoreMonitor implements Monitor {
    *  Phase 2:
    *  Delete the task store directory if the offsetFile does not exist in task 
store directory.
    *
-   *  Time interval between the two phases is controlled by this monitor 
scheduling
-   *  interval in milli seconds.
+   * The separate phases are a safety precaution to prevent deleting a store 
that is currently being used.
+   * A running task will recreate the deleted offset file on the next commit. 
If a task is not running or
+   * running on a different host and gets moved to this host, it will not use 
a store without the offset file.
+   *
+   * Time interval between the two phases is controlled by this monitor 
scheduling
+   * interval in milli seconds.
+   *
    * @throws IOException if there is an exception during the clean up of the 
task store files.
    */
   private void markSweepTaskStore(File taskStoreDir) throws IOException {
@@ -158,7 +164,7 @@ public class LocalStoreMonitor implements Monitor {
       localStoreMonitorMetrics.noOfDeletedTaskPartitionStores.inc();
     } else if ((CLOCK.currentTimeMillis() - offsetFile.lastModified()) >= 
config.getOffsetFileTTL()) {
       LOG.info("Deleting the offset file from the store: {}, since the last 
modified timestamp: {} "
-                   + "of the offset file is older than config file ttl: {}.",
+                   + "is older than the configured ttl: {}.",
                   taskStorePath, offsetFile.lastModified(), 
config.getOffsetFileTTL());
       offsetFile.delete();
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/e827d150/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java 
b/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java
index 94e8370..bb3c46c 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java
@@ -134,4 +134,9 @@ public class Task {
     result = 31 * result + storeNames.hashCode();
     return result;
   }
+
+  @Override
+  public String toString() {
+    return String.format("taskName:%s container:%s preferredHost:%s 
stores:%s", taskName, containerId, preferredHost, storeNames.toString());
+  }
 }

Reply via email to