Repository: reef
Updated Branches:
  refs/heads/master 73c28280c -> 5f4591083


[REEF-1657] Better thread names in REEF and Wake

This work is towards cleaner shutdown of REEF application required for "REEF as
a library" [REEF-1561](https://issues.apache.org/jira/browse/REEF-1561) project

There are no functionality changes in this PR.

Summary of changes:
  * More descriptive names for threads created in Wake
    `OrderedRemoteReceiverStage` and `DefaultThreadFactory`
  * Better thread names in REEF `ProcessContainer` and
  `EvaluatorMessageDispatcher`
  * Better logging in `REEFErrorHandler` and in many `.close()` methods in REEF
  * Minor refactoring in shutdown code; no functionality changes

JIRA:
  [REEF-1657](https://issues.apache.org/jira/browse/REEF-1657)

Pull Request:
  This closes #1172


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/5f459108
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/5f459108
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/5f459108

Branch: refs/heads/master
Commit: 5f4591083b3ee74eacc70770193913e6972812b2
Parents: 73c2828
Author: Sergiy Matusevych <[email protected]>
Authored: Fri Oct 28 00:35:38 2016 -0700
Committer: Markus Weimer <[email protected]>
Committed: Mon Oct 31 13:01:30 2016 -0700

----------------------------------------------------------------------
 .../evaluator/EvaluatorMessageDispatcher.java   |  20 ++--
 .../runtime/common/launch/REEFErrorHandler.java |  24 ++--
 .../runtime/common/utils/RemoteManager.java     |  11 +-
 .../runtime/local/driver/ProcessContainer.java  |  18 +--
 .../reef/wake/impl/DefaultThreadFactory.java    |  23 ++--
 .../DefaultRemoteManagerImplementation.java     | 109 ++++++++++---------
 .../remote/impl/OrderedRemoteReceiverStage.java |  69 +++++-------
 7 files changed, 148 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/5f459108/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
index fc3a83b..18e868a 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
@@ -45,6 +45,8 @@ public final class EvaluatorMessageDispatcher implements 
AutoCloseable {
 
   private static final Logger LOG = 
Logger.getLogger(EvaluatorMessageDispatcher.class.getName());
 
+  private final String evaluatorIdentifier;
+
   /**
    * Dispatcher used for application provided event handlers.
    */
@@ -55,7 +57,6 @@ public final class EvaluatorMessageDispatcher implements 
AutoCloseable {
    */
   private final DispatchingEStage serviceDispatcher;
 
-
   /**
    * Dispatcher used for application provided driver-restart specific event 
handlers.
    */
@@ -67,7 +68,7 @@ public final class EvaluatorMessageDispatcher implements 
AutoCloseable {
   private final DispatchingEStage driverRestartServiceDispatcher;
 
   @Inject
-  EvaluatorMessageDispatcher(
+  private EvaluatorMessageDispatcher(
       // Application-provided Context event handlers
       @Parameter(ContextActiveHandlers.class) final 
Set<EventHandler<ActiveContext>> contextActiveHandlers,
       @Parameter(ContextClosedHandlers.class) final 
Set<EventHandler<ClosedContext>> contextClosedHandlers,
@@ -129,10 +130,14 @@ public final class EvaluatorMessageDispatcher implements 
AutoCloseable {
       @Parameter(EvaluatorDispatcherThreads.class) final int numberOfThreads,
       @Parameter(EvaluatorManager.EvaluatorIdentifier.class) final String 
evaluatorIdentifier,
       final DriverExceptionHandler driverExceptionHandler,
-      final IdlenessCallbackEventHandlerFactory 
idlenessCallbackEventHandlerFactory
-  ) {
+      final IdlenessCallbackEventHandlerFactory 
idlenessCallbackEventHandlerFactory) {
+
+    LOG.log(Level.FINER, "Creating message dispatcher for {0}", 
evaluatorIdentifier);
+
+    this.evaluatorIdentifier = evaluatorIdentifier;
+    this.serviceDispatcher = new DispatchingEStage(
+        driverExceptionHandler, numberOfThreads, "EvaluatorMessageDispatcher:" 
+ evaluatorIdentifier);
 
-    this.serviceDispatcher = new DispatchingEStage(driverExceptionHandler, 
numberOfThreads, evaluatorIdentifier);
     this.applicationDispatcher = new DispatchingEStage(this.serviceDispatcher);
     this.driverRestartApplicationDispatcher = new 
DispatchingEStage(this.serviceDispatcher);
     this.driverRestartServiceDispatcher = new 
DispatchingEStage(this.serviceDispatcher);
@@ -282,9 +287,8 @@ public final class EvaluatorMessageDispatcher implements 
AutoCloseable {
 
   @Override
   public void close() throws Exception {
-    /**
-     * This effectively closes all dispatchers as they share the same stage.
-     */
+    LOG.log(Level.FINER, "Closing message dispatcher for {0}", 
this.evaluatorIdentifier);
+    // This effectively closes all dispatchers as they share the same stage.
     this.serviceDispatcher.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/5f459108/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java
index e32537c..0611919 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java
@@ -39,6 +39,7 @@ import java.util.logging.Logger;
 public final class REEFErrorHandler implements EventHandler<Throwable>, 
AutoCloseable {
 
   private static final Logger LOG = 
Logger.getLogger(REEFErrorHandler.class.getName());
+  private static final String CLASS_NAME = 
REEFErrorHandler.class.getCanonicalName();
 
   // This class is used as the ErrorHandler in the RemoteManager. Hence, we 
need an InjectionFuture here.
   private final InjectionFuture<RemoteManager> remoteManager;
@@ -47,10 +48,12 @@ public final class REEFErrorHandler implements 
EventHandler<Throwable>, AutoClos
   private final ExceptionCodec exceptionCodec;
 
   @Inject
-  REEFErrorHandler(final InjectionFuture<RemoteManager> remoteManager,
-                   @Parameter(ErrorHandlerRID.class) final String 
errorHandlerRID,
-                   @Parameter(LaunchID.class) final String launchID,
-                   final ExceptionCodec exceptionCodec) {
+  REEFErrorHandler(
+      @Parameter(ErrorHandlerRID.class) final String errorHandlerRID,
+      @Parameter(LaunchID.class) final String launchID,
+      final InjectionFuture<RemoteManager> remoteManager,
+      final ExceptionCodec exceptionCodec) {
+
     this.errorHandlerRID = errorHandlerRID;
     this.remoteManager = remoteManager;
     this.launchID = launchID;
@@ -91,19 +94,22 @@ public final class REEFErrorHandler implements 
EventHandler<Throwable>, AutoClos
 
   @SuppressWarnings("checkstyle:illegalcatch")
   public void close() {
+
+    LOG.entering(CLASS_NAME, "close");
+
     try {
       this.remoteManager.get().close();
     } catch (final Throwable ex) {
       LOG.log(Level.SEVERE, "Unable to close the remote manager", ex);
     }
+
+    LOG.exiting(CLASS_NAME, "close");
   }
 
   @Override
   public String toString() {
-    return "REEFErrorHandler{" +
-        "remoteManager=" + remoteManager +
-        ", launchID='" + launchID + '\'' +
-        ", errorHandlerRID='" + errorHandlerRID + '\'' +
-        '}';
+    return String.format(
+        "REEFErrorHandler: { remoteManager:{%s}, launchID:%s, 
errorHandlerRID:%s }",
+        this.remoteManager.get(), this.launchID, this.errorHandlerRID);
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/5f459108/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/RemoteManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/RemoteManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/RemoteManager.java
index 0eeff08..34fb065 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/RemoteManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/RemoteManager.java
@@ -32,6 +32,7 @@ import java.util.logging.Logger;
 public class RemoteManager {
 
   private static final Logger LOG = 
Logger.getLogger(RemoteManager.class.getName());
+  private static final String CLASS_NAME = 
RemoteManager.class.getCanonicalName();
 
   private final org.apache.reef.wake.remote.RemoteManager raw;
   private final RemoteIdentifierFactory factory;
@@ -41,7 +42,7 @@ public class RemoteManager {
                        final RemoteIdentifierFactory factory) {
     this.raw = raw;
     this.factory = factory;
-    LOG.log(Level.FINE, "Instantiated 'RemoteManager' with remoteId: {0}", 
this.getMyIdentifier());
+    LOG.log(Level.FINE, "Instantiated RemoteManager wrapper: {0}", this.raw);
   }
 
   public final org.apache.reef.wake.remote.RemoteManager raw() {
@@ -49,7 +50,9 @@ public class RemoteManager {
   }
 
   public void close() throws Exception {
+    LOG.entering(CLASS_NAME, "close");
     this.raw.close();
+    LOG.exiting(CLASS_NAME, "close");
   }
 
   public <T> EventHandler<T> getHandler(
@@ -71,5 +74,9 @@ public class RemoteManager {
   public String getMyIdentifier() {
     return this.raw.getMyIdentifier().toString();
   }
-}
 
+  @Override
+  public String toString() {
+    return "RemoteManager wrap: " + this.raw;
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/5f459108/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java
 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java
index 160de05..6805141 100644
--- 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java
+++ 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java
@@ -55,6 +55,7 @@ public final class ProcessContainer implements Container {
   private final File globalFolder;
   private final RunnableProcessObserver processObserver;
   private final ThreadGroup threadGroup;
+
   private Thread theThread;
   private RunnableProcess process;
 
@@ -134,13 +135,16 @@ public final class ProcessContainer implements Container {
 
   @Override
   public void run(final List<String> commandLine) {
-    this.process = new RunnableProcess(commandLine,
+
+    this.process = new RunnableProcess(
+        commandLine,
         this.containedID,
         this.folder,
         this.processObserver,
         this.fileNames.getEvaluatorStdoutFileName(),
         this.fileNames.getEvaluatorStderrFileName());
-    this.theThread = new Thread(this.threadGroup, this.process, 
this.containedID);
+
+    this.theThread = new Thread(this.threadGroup, this.process, 
"ProcessContainer:" + this.containedID);
     this.theThread.start();
   }
 
@@ -189,12 +193,8 @@ public final class ProcessContainer implements Container {
 
   @Override
   public String toString() {
-    return "ProcessContainer{" +
-        "containedID='" + containedID + '\'' +
-        ", nodeID='" + nodeID + '\'' +
-        ", errorHandlerRID='" + errorHandlerRID + '\'' +
-        ", folder=" + folder + '\'' +
-        ", rack=" + rackName +
-        '}';
+    return String.format(
+        "ProcessContainer{containedID=%s, nodeID=%s, errorHandlerRID=%s, 
folder=%s, rack=%s}",
+        this.containedID, this.nodeID, this.errorHandlerRID, this.folder, 
this.rackName);
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/5f459108/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultThreadFactory.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultThreadFactory.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultThreadFactory.java
index 53c6efb..1612c39 100644
--- 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultThreadFactory.java
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultThreadFactory.java
@@ -25,10 +25,13 @@ import java.util.concurrent.atomic.AtomicInteger;
  * A default thread factory implementation that names created threads.
  */
 public final class DefaultThreadFactory implements ThreadFactory {
+
   private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
+
   private final ThreadGroup group;
   private final AtomicInteger threadNumber = new AtomicInteger(1);
   private final String prefix;
+
   private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
 
   /**
@@ -37,10 +40,7 @@ public final class DefaultThreadFactory implements 
ThreadFactory {
    * @param prefix the name prefix of the created thread
    */
   public DefaultThreadFactory(final String prefix) {
-    final SecurityManager s = System.getSecurityManager();
-    this.group = (s != null) ? s.getThreadGroup() : 
Thread.currentThread().getThreadGroup();
-    this.prefix = prefix + "-pool-" + POOL_NUMBER.getAndIncrement() + 
"-thread-";
-    this.uncaughtExceptionHandler = null;
+    this(prefix, null);
   }
 
   /**
@@ -52,7 +52,7 @@ public final class DefaultThreadFactory implements 
ThreadFactory {
   public DefaultThreadFactory(final String prefix, final 
Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
     final SecurityManager s = System.getSecurityManager();
     this.group = (s != null) ? s.getThreadGroup() : 
Thread.currentThread().getThreadGroup();
-    this.prefix = prefix + "-pool-" + POOL_NUMBER.getAndIncrement() + 
"-thread-";
+    this.prefix = String.format("%s:pool-%02d", prefix, 
POOL_NUMBER.getAndIncrement());
     this.uncaughtExceptionHandler = uncaughtExceptionHandler;
   }
 
@@ -72,17 +72,22 @@ public final class DefaultThreadFactory implements 
ThreadFactory {
    */
   @Override
   public Thread newThread(final Runnable r) {
-    final Thread t = new Thread(group, r, prefix + 
threadNumber.getAndIncrement(), 0);
+
+    final Thread t = new Thread(this.group, r,
+        String.format("%s:thread-%03d", this.prefix, 
this.threadNumber.getAndIncrement()), 0);
+
     if (t.isDaemon()) {
       t.setDaemon(false);
     }
+
     if (t.getPriority() != Thread.NORM_PRIORITY) {
       t.setPriority(Thread.NORM_PRIORITY);
     }
-    if (uncaughtExceptionHandler != null) {
-      t.setUncaughtExceptionHandler(uncaughtExceptionHandler);
+
+    if (this.uncaughtExceptionHandler != null) {
+      t.setUncaughtExceptionHandler(this.uncaughtExceptionHandler);
     }
+
     return t;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/5f459108/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
index 6012ba1..243fa31 100644
--- 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
@@ -167,64 +167,75 @@ public final class DefaultRemoteManagerImplementation 
implements RemoteManager {
   @Override
   public void close() {
 
-    if (this.closed.compareAndSet(false, true)) {
-
-      LOG.log(Level.FINE, "RemoteManager: {0} Closing remote manager id: {1}",
-          new Object[] {this.name, this.myIdentifier});
-
-      final Runnable closeRunnable = new Runnable() {
-        @Override
-        public void run() {
-          try {
-            LOG.log(Level.FINE, "Closing sender stage {0}", myIdentifier);
-            reSendStage.close();
-            LOG.log(Level.FINE, "Closed the remote sender stage");
-          } catch (final Exception e) {
-            LOG.log(Level.SEVERE, "Unable to close the remote sender stage", 
e);
-          }
-
-          try {
-            LOG.log(Level.FINE, "Closing transport {0}", myIdentifier);
-            transport.close();
-            LOG.log(Level.FINE, "Closed the transport");
-          } catch (final Exception e) {
-            LOG.log(Level.SEVERE, "Unable to close the transport.", e);
-          }
-
-          try {
-            LOG.log(Level.FINE, "Closing receiver stage {0}", myIdentifier);
-            reRecvStage.close();
-            LOG.log(Level.FINE, "Closed the remote receiver stage");
-          } catch (final Exception e) {
-            LOG.log(Level.SEVERE, "Unable to close the remote receiver stage", 
e);
-          }
-        }
-      };
+    LOG.log(Level.FINE, "RemoteManager: {0} Closing remote manager id: {1}",
+        new Object[] {this.name, this.myIdentifier});
 
-      final ExecutorService closeExecutor = 
Executors.newSingleThreadExecutor();
+    if (!this.closed.compareAndSet(false, true)) {
+      LOG.log(Level.FINE, "RemoteManager: {0} already closed", this.name);
+      return;
+    }
 
-      closeExecutor.submit(closeRunnable);
-      closeExecutor.shutdown();
+    final Runnable closeRunnable = new Runnable() {
+      @Override
+      public void run() {
 
-      if (!closeExecutor.isShutdown()) {
-        LOG.log(Level.SEVERE, "close executor did not shutdown properly.");
-      }
+        
Thread.currentThread().setName(String.format("CLOSE:RemoteManager:%s:%s", name, 
myIdentifier));
+
+        try {
+          LOG.log(Level.FINE, "Closing sender stage {0}", myIdentifier);
+          reSendStage.close();
+          LOG.log(Level.FINE, "Closed the remote sender stage");
+        } catch (final Exception e) {
+          LOG.log(Level.SEVERE, "Unable to close the remote sender stage", e);
+        }
+
+        try {
+          LOG.log(Level.FINE, "Closing transport {0}", myIdentifier);
+          transport.close();
+          LOG.log(Level.FINE, "Closed the transport");
+        } catch (final Exception e) {
+          LOG.log(Level.SEVERE, "Unable to close the transport.", e);
+        }
 
-      final long endTime = System.currentTimeMillis() + CLOSE_EXECUTOR_TIMEOUT;
-      while (!closeExecutor.isTerminated()) {
         try {
-          final long waitTime = endTime - System.currentTimeMillis();
-          closeExecutor.awaitTermination(waitTime, TimeUnit.MILLISECONDS);
-        } catch (final InterruptedException e) {
-          LOG.log(Level.FINE, "Interrupted", e);
+          LOG.log(Level.FINE, "Closing receiver stage {0}", myIdentifier);
+          reRecvStage.close();
+          LOG.log(Level.FINE, "Closed the remote receiver stage");
+        } catch (final Exception e) {
+          LOG.log(Level.SEVERE, "Unable to close the remote receiver stage", 
e);
         }
       }
+    };
 
-      if (closeExecutor.isTerminated()) {
-        LOG.log(Level.FINE, "Close executor terminated properly.");
-      } else {
-        LOG.log(Level.SEVERE, "Close executor did not terminate properly.");
+    final ExecutorService closeExecutor = Executors.newSingleThreadExecutor();
+
+    closeExecutor.submit(closeRunnable);
+    closeExecutor.shutdown();
+
+    if (!closeExecutor.isShutdown()) {
+      LOG.log(Level.SEVERE, "close executor did not shutdown properly.");
+    }
+
+    final long endTime = System.currentTimeMillis() + CLOSE_EXECUTOR_TIMEOUT;
+    while (!closeExecutor.isTerminated()) {
+      try {
+        final long waitTime = endTime - System.currentTimeMillis();
+        closeExecutor.awaitTermination(waitTime, TimeUnit.MILLISECONDS);
+      } catch (final InterruptedException e) {
+        LOG.log(Level.FINE, "Interrupted", e);
       }
     }
+
+    if (closeExecutor.isTerminated()) {
+      LOG.log(Level.FINE, "Close executor terminated properly.");
+    } else {
+      LOG.log(Level.SEVERE, "Close executor did not terminate properly.");
+    }
+  }
+
+  @Override
+  public String toString() {
+    return String.format("RemoteManager: { class:%s, name:%s, id:%s }",
+        this.getClass().getCanonicalName(), this.name, this.myIdentifier);
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/5f459108/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.java
index 1d2d895..19a8162 100644
--- 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.java
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.java
@@ -37,14 +37,15 @@ import java.util.logging.Logger;
 public class OrderedRemoteReceiverStage implements EStage<TransportEvent> {
 
   private static final Logger LOG = 
Logger.getLogger(OrderedRemoteReceiverStage.class.getName());
-  private final long shutdownTimeout = 
WakeParameters.REMOTE_EXECUTOR_SHUTDOWN_TIMEOUT;
 
-  private final ConcurrentMap<SocketAddress, OrderedEventStream> streamMap;
+  private static final String CLASS_NAME = 
OrderedRemoteReceiverStage.class.getSimpleName();
+
+  private static final long SHUTDOWN_TIMEOUT = 
WakeParameters.REMOTE_EXECUTOR_SHUTDOWN_TIMEOUT;
+
   private final ExecutorService pushExecutor;
   private final ExecutorService pullExecutor;
 
   private final ThreadPoolStage<TransportEvent> pushStage;
-  private final ThreadPoolStage<OrderedEventStream> pullStage;
 
   /**
    * Constructs an ordered remote receiver stage.
@@ -54,58 +55,46 @@ public class OrderedRemoteReceiverStage implements 
EStage<TransportEvent> {
    */
   public OrderedRemoteReceiverStage(
       final EventHandler<RemoteEvent<byte[]>> handler, final 
EventHandler<Throwable> errorHandler) {
-    this.streamMap = new ConcurrentHashMap<SocketAddress, 
OrderedEventStream>();
 
-    this.pushExecutor = Executors.newCachedThreadPool(
-        new DefaultThreadFactory(OrderedRemoteReceiverStage.class.getName() + 
"_Push"));
-    this.pullExecutor = Executors.newCachedThreadPool(
-        new DefaultThreadFactory(OrderedRemoteReceiverStage.class.getName() + 
"_Pull"));
+    this.pushExecutor = Executors.newCachedThreadPool(new 
DefaultThreadFactory(CLASS_NAME + ":Push"));
+    this.pullExecutor = Executors.newCachedThreadPool(new 
DefaultThreadFactory(CLASS_NAME + ":Pull"));
+
+    final ConcurrentMap<SocketAddress, OrderedEventStream> streamMap = new 
ConcurrentHashMap<>();
 
-    this.pullStage = new ThreadPoolStage<OrderedEventStream>(
+    final ThreadPoolStage<OrderedEventStream> pullStage = new 
ThreadPoolStage<>(
         new OrderedPullEventHandler(handler), this.pullExecutor, errorHandler);
-    this.pushStage = new ThreadPoolStage<TransportEvent>(
+
+    this.pushStage = new ThreadPoolStage<>(
         new OrderedPushEventHandler(streamMap, pullStage), this.pushExecutor, 
errorHandler); // for decoupling
   }
 
   @Override
   public void onNext(final TransportEvent value) {
-    LOG.log(Level.FINEST, "{0}", value);
-    pushStage.onNext(value);
+    LOG.log(Level.FINEST, "Push: {0}", value);
+    this.pushStage.onNext(value);
   }
 
   @Override
   public void close() throws Exception {
-    LOG.log(Level.FINE, "close");
-
-    if (pushExecutor != null) {
-      pushExecutor.shutdown();
-      try {
-        // wait for threads to finish for timeout
-        if (!pushExecutor.awaitTermination(shutdownTimeout, 
TimeUnit.MILLISECONDS)) {
-          LOG.log(Level.WARNING, "Executor did not terminate in " + 
shutdownTimeout + "ms.");
-          final List<Runnable> droppedRunnables = pushExecutor.shutdownNow();
-          LOG.log(Level.WARNING, "Executor dropped " + droppedRunnables.size() 
+ " tasks.");
-        }
-      } catch (final InterruptedException e) {
-        LOG.log(Level.WARNING, "Close interrupted");
-        throw new RemoteRuntimeException(e);
-      }
-    }
+    close("PushExecutor", this.pushExecutor);
+    close("PullExecutor", this.pullExecutor);
+  }
 
-    if (pullExecutor != null) {
-      pullExecutor.shutdown();
-      try {
-        // wait for threads to finish for timeout
-        if (!pullExecutor.awaitTermination(shutdownTimeout, 
TimeUnit.MILLISECONDS)) {
-          LOG.log(Level.WARNING, "Executor did not terminate in " + 
shutdownTimeout + "ms.");
-          final List<Runnable> droppedRunnables = pullExecutor.shutdownNow();
-          LOG.log(Level.WARNING, "Executor dropped " + droppedRunnables.size() 
+ " tasks.");
-        }
-      } catch (final InterruptedException e) {
-        LOG.log(Level.WARNING, "Close interrupted");
-        throw new RemoteRuntimeException(e);
+  private static void close(final String name, final ExecutorService executor) 
{
+    LOG.log(Level.FINE, "Close {0} begin", name);
+    executor.shutdown();
+    try {
+      // wait for threads to finish for timeout
+      if (!executor.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS)) 
{
+        LOG.log(Level.WARNING, "{0}: Executor did not terminate in {1} ms.", 
new Object[] {name, SHUTDOWN_TIMEOUT});
+        final List<Runnable> droppedRunnables = executor.shutdownNow();
+        LOG.log(Level.WARNING, "{0}: Executor dropped {1} tasks.", new 
Object[] {name, droppedRunnables.size()});
       }
+    } catch (final InterruptedException e) {
+      LOG.log(Level.WARNING, "Close interrupted");
+      throw new RemoteRuntimeException(e);
     }
+    LOG.log(Level.FINE, "Close {0} end", name);
   }
 }
 

Reply via email to