Fix log messages from StreamProcessor(onJobModelExpired event).

Log messages published in the onJobModelExpired event have `processorId` as null.
`processorId` is cached as a final local variable in the
`createJobCoordinatorListener` method. The JLS for final fields/variables states
that they're initialized before the constructor. This sets the local final
variable copy to null (since it relies upon the value of the instance variable
being set in the constructor).
Changes
* Use processorId directly in the `createJobCoordinatorListener` method.
* Remove StreamProcessor.toString since it has no usages.

Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com>

Reviewers: Navina Ramesh <nav...@apache.org>

Closes #249 from shanthoosh/fix_logging_in_stream_processor


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/91b22fd7
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/91b22fd7
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/91b22fd7

Branch: refs/heads/0.14.0
Commit: 91b22fd773e9e24b77a1f29bde2ba4fa64e5a82a
Parents: 1c11393
Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com>
Authored: Tue Jul 25 10:35:31 2017 -0700
Committer: navina <nav...@apache.org>
Committed: Tue Jul 25 10:35:31 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/samza/processor/StreamProcessor.java  | 10 ++--------
 1 file changed, 2 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/91b22fd7/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java 
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 89edd16..590fa11 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -111,11 +111,6 @@ public class StreamProcessor {
     this(config, customMetricsReporters, (Object) streamTaskFactory, 
processorListener, null);
   }
 
-  @Override
-  public String toString() {
-    return "Processor:" + processorId;
-  }
-
   /* package private */
   JobCoordinator getJobCoordinator() {
     return Util.
@@ -210,7 +205,6 @@ public class StreamProcessor {
   }
 
   JobCoordinatorListener createJobCoordinatorListener() {
-    final String pid = this.toString();
     return new JobCoordinatorListener() {
 
       @Override
@@ -220,7 +214,7 @@ public class StreamProcessor {
           if (SamzaContainerStatus.NOT_STARTED.equals(status) || 
SamzaContainerStatus.STARTED.equals(status)) {
             boolean shutdownComplete = false;
             try {
-              LOGGER.info("Shutting down container in onJobModelExpired for 
processor:" + pid);
+              LOGGER.info("Shutting down container in onJobModelExpired for 
processor:" + processorId);
               container.pause();
               shutdownComplete = 
jcContainerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
               LOGGER.info("ShutdownComplete=" + shutdownComplete);
@@ -231,7 +225,7 @@ public class StreamProcessor {
             } catch (InterruptedException e) {
               LOGGER.warn("Container shutdown was interrupted!" + 
container.toString(), e);
             }
-            LOGGER.info("Shutting down container done for pid=" + pid + "; 
complete =" + shutdownComplete);
+            LOGGER.info("Shutting down container done for pid=" + processorId 
+ "; complete =" + shutdownComplete);
             if (!shutdownComplete) {
               LOGGER.warn("Container " + container.toString() + " may not have 
shutdown successfully. " +
                   "Stopping the processor.");

Reply via email to