Repository: samza Updated Branches: refs/heads/master d1653aad0 -> e827d150f
SAMZA-1335: Improve logging for LocalStoreMonitor Author: Jacob Maes <[email protected]> Reviewers: Prateek Maheshwari <[email protected]> Closes #226 from jmakes/samza-1335 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e827d150 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e827d150 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e827d150 Branch: refs/heads/master Commit: e827d150fc12c81bb64788d28e07488c42a93687 Parents: d1653aa Author: Jacob Maes <[email protected]> Authored: Thu Jun 15 13:40:43 2017 -0700 Committer: Jacob Maes <[email protected]> Committed: Thu Jun 15 13:40:43 2017 -0700 ---------------------------------------------------------------------- .../apache/samza/monitor/LocalStoreMonitor.java | 20 +++++++++++++------- .../java/org/apache/samza/rest/model/Task.java | 5 +++++ 2 files changed, 18 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/e827d150/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java index 8195491..8b25636 100644 --- a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java +++ b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java @@ -83,9 +83,10 @@ public class LocalStoreMonitor implements Monitor { String.format("%s-%s", jobInstance.getJobName(), jobInstance.getJobId())); try { JobStatus jobStatus = jobsClient.getJobStatus(jobInstance); + LOG.info("Job: {} has the status: {}.", jobInstance, jobStatus); for (Task task : jobsClient.getTasks(jobInstance)) { + LOG.info("Evaluating stores for task: {}", task); for (String storeName : jobDir.list(DirectoryFileFilter.DIRECTORY)) { - LOG.info("Job: {} has the running status: {} with preferred host: {}.", jobInstance, jobStatus, task.getPreferredHost()); /** * A task store is active if all of the following conditions are true: * a) If the store is amongst the active stores of the task. @@ -95,9 +96,9 @@ public class LocalStoreMonitor implements Monitor { if (jobStatus.hasBeenStarted() && task.getStoreNames().contains(storeName) && task.getPreferredHost().equals(localHostName)) { - LOG.info(String.format("Store %s is actively used by the task: %s.", storeName, task.getTaskName())); + LOG.info(String.format("Local store: %s is actively used by the task: %s.", storeName, task.getTaskName())); } else { - LOG.info(String.format("Store %s not used by the task: %s.", storeName, task.getTaskName())); + LOG.info(String.format("Local store: %s not used by the task: %s.", storeName, task.getTaskName())); markSweepTaskStore(TaskStorageManager.getStorePartitionDir(jobDir, storeName, new TaskName(task.getTaskName()))); } } @@ -135,7 +136,7 @@ public class LocalStoreMonitor implements Monitor { * Role of this method is to garbage collect(mark-sweep) the task store. * @param taskStoreDir store directory of the task to perform garbage collection. * - * This method cleans up each of the task store directory in two phases. + * This method cleans up each of the task store directory in two phases. * * Phase 1: * Delete the offset file in the task store if (curTime - lastModifiedTimeOfOffsetFile) > offsetTTL. @@ -143,8 +144,13 @@ public class LocalStoreMonitor implements Monitor { * Phase 2: * Delete the task store directory if the offsetFile does not exist in task store directory. * - * Time interval between the two phases is controlled by this monitor scheduling - * interval in milli seconds. + * The separate phases are a safety precaution to prevent deleting a store that is currently being used. + * A running task will recreate the deleted offset file on the next commit. If a task is not running or + * running on a different host and gets moved to this host, it will not use a store without the offset file. + * + * Time interval between the two phases is controlled by this monitor scheduling + * interval in milli seconds. + * * @throws IOException if there is an exception during the clean up of the task store files. */ private void markSweepTaskStore(File taskStoreDir) throws IOException { @@ -158,7 +164,7 @@ public class LocalStoreMonitor implements Monitor { localStoreMonitorMetrics.noOfDeletedTaskPartitionStores.inc(); } else if ((CLOCK.currentTimeMillis() - offsetFile.lastModified()) >= config.getOffsetFileTTL()) { LOG.info("Deleting the offset file from the store: {}, since the last modified timestamp: {} " - + "of the offset file is older than config file ttl: {}.", + + "is older than the configured ttl: {}.", taskStorePath, offsetFile.lastModified(), config.getOffsetFileTTL()); offsetFile.delete(); } http://git-wip-us.apache.org/repos/asf/samza/blob/e827d150/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java b/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java index 94e8370..bb3c46c 100644 --- a/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java +++ b/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java @@ -134,4 +134,9 @@ public class Task { result = 31 * result + storeNames.hashCode(); return result; } + + @Override + public String toString() { + return String.format("taskName:%s container:%s preferredHost:%s stores:%s", taskName, containerId, preferredHost, storeNames.toString()); + } }
