Fix log messages from StreamProcessor (onJobModelExpired event). Log messages published in the onJobModelExpired event have `processorId` as null. `processorId` is cached in a final local variable in the `createJobCoordinatorListener` method. The JLS for final fields/variables states that they are initialized before the constructor. This sets the local final variable copy to null (since it relies on the value of the instance variable, which is set in the constructor). Changes: * Use `processorId` directly in the `createJobCoordinatorListener` method. * Remove `StreamProcessor.toString` since it has no usages.
Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Reviewers: Navina Ramesh <nav...@apache.org> Closes #249 from shanthoosh/fix_logging_in_stream_processor Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/91b22fd7 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/91b22fd7 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/91b22fd7 Branch: refs/heads/0.14.0 Commit: 91b22fd773e9e24b77a1f29bde2ba4fa64e5a82a Parents: 1c11393 Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Authored: Tue Jul 25 10:35:31 2017 -0700 Committer: navina <nav...@apache.org> Committed: Tue Jul 25 10:35:31 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/samza/processor/StreamProcessor.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/91b22fd7/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 89edd16..590fa11 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -111,11 +111,6 @@ public class StreamProcessor { this(config, customMetricsReporters, (Object) streamTaskFactory, processorListener, null); } - @Override - public String toString() { - return "Processor:" + processorId; - } - /* package private */ JobCoordinator getJobCoordinator() { return Util. 
@@ -210,7 +205,6 @@ public class StreamProcessor { } JobCoordinatorListener createJobCoordinatorListener() { - final String pid = this.toString(); return new JobCoordinatorListener() { @Override @@ -220,7 +214,7 @@ public class StreamProcessor { if (SamzaContainerStatus.NOT_STARTED.equals(status) || SamzaContainerStatus.STARTED.equals(status)) { boolean shutdownComplete = false; try { - LOGGER.info("Shutting down container in onJobModelExpired for processor:" + pid); + LOGGER.info("Shutting down container in onJobModelExpired for processor:" + processorId); container.pause(); shutdownComplete = jcContainerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS); LOGGER.info("ShutdownComplete=" + shutdownComplete); @@ -231,7 +225,7 @@ public class StreamProcessor { } catch (InterruptedException e) { LOGGER.warn("Container shutdown was interrupted!" + container.toString(), e); } - LOGGER.info("Shutting down container done for pid=" + pid + "; complete =" + shutdownComplete); + LOGGER.info("Shutting down container done for pid=" + processorId + "; complete =" + shutdownComplete); if (!shutdownComplete) { LOGGER.warn("Container " + container.toString() + " may not have shutdown successfully. " + "Stopping the processor.");