Repository: samza
Updated Branches:
  refs/heads/master 3895a9070 -> 76de840c7


SAMZA-1584: Improve logging in StreamProcessor.

Add the processorID to the log lines wherever necessary (since we support 
running multiple stream applications in a JVM) and improve logging in general 
in StreamProcessor.

Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com>
Author: Shanthoosh Venkataraman <svenk...@lm-lsnscdw5132.linkedin.biz>

Reviewers: Prateek Maheshwari <pmaheshw...@apache.org>

Closes #441 from shanthoosh/SAMZA-1584


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/76de840c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/76de840c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/76de840c

Branch: refs/heads/master
Commit: 76de840c734fe2b7987af22d1ba6133437c25a5e
Parents: 3895a90
Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com>
Authored: Thu Apr 12 14:34:45 2018 -0700
Committer: Prateek Maheshwari <pmahe...@linkedin.com>
Committed: Thu Apr 12 14:34:45 2018 -0700

----------------------------------------------------------------------
 .../apache/samza/processor/StreamProcessor.java | 54 ++++++++++----------
 1 file changed, 27 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/76de840c/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java 
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index b548200..8dacc6c 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import org.apache.samza.SamzaContainerStatus;
 import org.apache.samza.annotation.InterfaceStability;
@@ -55,6 +56,7 @@ import org.slf4j.LoggerFactory;
 @InterfaceStability.Evolving
 public class StreamProcessor {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(StreamProcessor.class);
+  private static final String CONTAINER_THREAD_NAME_FORMAT = "Samza 
StreamProcessor Container Thread-%d";
 
   private final JobCoordinator jobCoordinator;
   private final StreamProcessorLifecycleListener processorListener;
@@ -71,7 +73,7 @@ public class StreamProcessor {
 
   // Latch used to synchronize between the JobCoordinator thread and the 
container thread, when the container is
   // stopped due to re-balancing
-  /* package private */volatile CountDownLatch jcContainerShutdownLatch;
+  volatile CountDownLatch jcContainerShutdownLatch;
   private volatile boolean processorOnStartCalled = false;
 
   @VisibleForTesting
@@ -179,11 +181,12 @@ public class StreamProcessor {
     boolean containerShutdownInvoked = false;
     if (container != null) {
       try {
-        LOGGER.info("Shutting down container " + container.toString() + " from 
StreamProcessor");
+        LOGGER.info("Shutting down the container: {} of stream processor: 
{}.", container, processorId);
         container.shutdown();
+        LOGGER.info("Waiting {} milliseconds for the container: {} to 
shutdown.", taskShutdownMs, container);
         containerShutdownInvoked = true;
-      } catch (IllegalContainerStateException icse) {
-        LOGGER.info("Container was not running", icse);
+      } catch (Exception exception) {
+        LOGGER.error(String.format("Ignoring the exception during the shutdown 
of container: %s.", container), exception);
       }
     }
 
@@ -191,7 +194,6 @@ public class StreamProcessor {
       LOGGER.info("Shutting down JobCoordinator from StreamProcessor");
       jobCoordinator.stop();
     }
-
   }
 
   SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) {
@@ -199,7 +201,7 @@ public class StreamProcessor {
         processorId,
         jobModel,
         config,
-        Util.<String, MetricsReporter>javaMapAsScalaMap(customMetricsReporter),
+        Util.javaMapAsScalaMap(customMetricsReporter),
         taskFactory);
   }
 
@@ -213,32 +215,30 @@ public class StreamProcessor {
           if (SamzaContainerStatus.NOT_STARTED.equals(status) || 
SamzaContainerStatus.STARTED.equals(status)) {
             boolean shutdownComplete = false;
             try {
-              LOGGER.info("Shutting down container in onJobModelExpired for 
processor:" + processorId);
+              LOGGER.info("Job model expired. Shutting down the container: {} 
of stream processor: {}.", container, processorId);
               container.pause();
               shutdownComplete = 
jcContainerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
-              LOGGER.info("ShutdownComplete=" + shutdownComplete);
+              LOGGER.info(String.format("Shutdown status of container: %s for 
stream processor: %s is: %s.", container, processorId, shutdownComplete));
             } catch (IllegalContainerStateException icse) {
               // Ignored since container is not running
-              LOGGER.info("Container was not running.", icse);
+              LOGGER.info(String.format("Cannot shutdown container: %s for 
stream processor: %s. Container is not running.", container, processorId), 
icse);
               shutdownComplete = true;
             } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
-              LOGGER.warn("Container shutdown was interrupted!" + 
container.toString(), e);
+              LOGGER.warn(String.format("Shutdown of container: %s for stream 
processor: %s was interrupted", container, processorId), e);
             }
-            LOGGER.info("Shutting down container done for pid=" + processorId 
+ "; complete =" + shutdownComplete);
             if (!shutdownComplete) {
-              LOGGER.warn("Container " + container.toString() + " may not have 
shutdown successfully. " +
-                  "Stopping the processor.");
+              LOGGER.warn("Container: {} shutdown was unsuccessful. Stopping 
the stream processor: {}.", container, processorId);
               container = null;
               stop();
             } else {
-              LOGGER.debug("Container " + container.toString() + " shutdown 
successfully");
+              LOGGER.info("Container: {} shutdown completed for stream 
processor: {}.", container, processorId);
             }
           } else {
-            LOGGER.debug("Container " + container.toString() + " is not 
running.");
+            LOGGER.info("Container: {} of the stream processor: {} is not 
running.", container, processorId);
           }
         } else {
-          LOGGER.debug("Container is not instantiated yet.");
+          LOGGER.info("Container is not instantiated for stream processor: 
{}.", processorId);
         }
       }
 
@@ -257,19 +257,19 @@ public class StreamProcessor {
                 processorListener.onStart();
               }
             } else {
-              LOGGER.debug("StreamProcessorListener was notified of container 
start previously. Hence, skipping this time.");
+              LOGGER.warn("Received duplicate container start notification for 
container: {} in stream processor: {}.", container, processorId);
             }
           }
 
           @Override
           public void onContainerStop(boolean pauseByJm) {
             if (pauseByJm) {
-              LOGGER.info("Container " + container.toString() + " stopped due 
to a request from JobCoordinator.");
+              LOGGER.info("Container: {} of the stream processor: {} was 
stopped by the JobCoordinator.", container, processorId);
               if (jcContainerShutdownLatch != null) {
                 jcContainerShutdownLatch.countDown();
               }
             } else {  // sp.stop was called or container stopped by itself
-              LOGGER.info("Container " + container.toString() + " stopped.");
+              LOGGER.info("Container: {} stopped. Stopping the stream 
processor: {}.", container, processorId);
               container = null; // this guarantees that stop() doesn't try to 
stop container again
               stop();
             }
@@ -283,7 +283,7 @@ public class StreamProcessor {
               LOGGER.warn("JobCoordinatorLatch was null. It is possible for 
some component to be waiting.");
             }
             containerException = t;
-            LOGGER.error("Container failed. Stopping the processor.", 
containerException);
+            LOGGER.error(String.format("Container: %s failed with an 
exception. Stopping the stream processor: %s. Original exception:", container, 
processorId), containerException);
             container = null;
             stop();
           }
@@ -291,16 +291,16 @@ public class StreamProcessor {
 
         container = createSamzaContainer(processorId, jobModel);
         container.setContainerListener(containerListener);
-        LOGGER.info("Starting container " + container.toString());
-        executorService = Executors.newSingleThreadExecutor(new 
ThreadFactoryBuilder()
-            .setNameFormat("p-" + processorId + 
"-container-thread-%d").build());
+        LOGGER.info("Starting the container: {} for the stream processor: 
{}.", container, processorId);
+        ThreadFactory threadFactory = new 
ThreadFactoryBuilder().setNameFormat(CONTAINER_THREAD_NAME_FORMAT).build();
+        executorService = Executors.newSingleThreadExecutor(threadFactory);
         executorService.submit(container::run);
       }
 
       @Override
       public void onCoordinatorStop() {
         if (executorService != null) {
-          LOGGER.info("Shutting down the executor service.");
+          LOGGER.info("Shutting down the executor service of the stream 
processor: {}.", processorId);
           executorService.shutdownNow();
         }
         if (processorListener != null) {
@@ -312,11 +312,11 @@ public class StreamProcessor {
       }
 
       @Override
-      public void onCoordinatorFailure(Throwable e) {
-        LOGGER.info("Coordinator Failed. Stopping the processor.");
+      public void onCoordinatorFailure(Throwable throwable) {
+        LOGGER.info(String.format("Coordinator: %s failed with an exception. 
Stopping the stream processor: %s. Original exception:", jobCoordinator, 
processorId), throwable);
         stop();
         if (processorListener != null) {
-          processorListener.onFailure(e);
+          processorListener.onFailure(throwable);
         }
       }
     };

Reply via email to