Repository: reef
Updated Branches:
  refs/heads/master f7f08b9f2 -> af63bf639


[REEF-1545] Refactor HandlerContainer to improve safety, readability, and 
performance

  * Use typed subscription handlers via the new Unsubscriber mechanism instead
    of type erasure
  * Use Map.put() instead of the combination of .putIfAbsent() and .replace()
    in handler registration
  * Deprecate the Subscription class and HandlerContainer.unsubscribe() method
    in favor of the new typed subscriptions.
  * Fix the code layout and add Javadocs to classes and methods.

JIRA:
  [REEF-1545](https://issues.apache.org/jira/browse/REEF-1545)

Pull Request:
  This closes #1108


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/af63bf63
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/af63bf63
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/af63bf63

Branch: refs/heads/master
Commit: af63bf639755bbd17ca8e0439b18471e19f6e171
Parents: f7f08b9
Author: Sergiy Matusevych <[email protected]>
Authored: Fri Aug 12 14:07:04 2016 -0700
Committer: Markus Weimer <[email protected]>
Committed: Fri Aug 26 17:15:47 2016 -0700

----------------------------------------------------------------------
 .../reef/wake/remote/impl/HandlerContainer.java | 94 +++++++++++++++-----
 .../reef/wake/remote/impl/Subscription.java     |  2 +
 .../wake/remote/impl/SubscriptionHandler.java   | 60 +++++++++++++
 3 files changed, 134 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/af63bf63/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java
index 46c943d..d972d03 100644
--- 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java
@@ -33,6 +33,8 @@ import java.util.logging.Logger;
 
 /**
  * Main logic to dispatch messages.
+ * An event handler that receives a remote message with a binary payload,
+ * decodes a message from the blob, and dispatches that message to the 
appropriate handler.
  */
 final class HandlerContainer<T> implements EventHandler<RemoteEvent<byte[]>> {
 
@@ -41,10 +43,12 @@ final class HandlerContainer<T> implements 
EventHandler<RemoteEvent<byte[]>> {
   private final ConcurrentMap<Class<? extends T>,
       EventHandler<RemoteMessage<? extends T>>> msgTypeToHandlerMap = new 
ConcurrentHashMap<>();
 
-  private final ConcurrentMap<Tuple2<RemoteIdentifier,
-      Class<? extends T>>, EventHandler<? extends T>> tupleToHandlerMap = new 
ConcurrentHashMap<>();
+  private final ConcurrentMap<Tuple2<RemoteIdentifier, Class<? extends T>>,
+      EventHandler<? extends T>> tupleToHandlerMap = new ConcurrentHashMap<>();
+
   private final Codec<T> codec;
   private final String name;
+
   private Transport transport;
 
   HandlerContainer(final String name, final Codec<T> codec) {
@@ -56,51 +60,66 @@ final class HandlerContainer<T> implements 
EventHandler<RemoteEvent<byte[]>> {
     this.transport = transport;
   }
 
+  /**
+   * Subscribe to events from a given source and of a given message type.
+   * @param sourceIdentifier An identifier of an event source.
+   * @param messageType Java class of messages to dispatch.
+   * @param theHandler Message handler.
+   * @return A new subscription object that will cancel its subscription on 
.close()
+   */
   @SuppressWarnings("checkstyle:diamondoperatorforvariabledefinition")
-  public AutoCloseable registerHandler(final RemoteIdentifier sourceIdentifier,
-                                       final Class<? extends T> messageType,
-                                       final EventHandler<? extends T> 
theHandler) {
-
+  public AutoCloseable registerHandler(
+      final RemoteIdentifier sourceIdentifier,
+      final Class<? extends T> messageType,
+      final EventHandler<? extends T> theHandler) {
 
     final Tuple2<RemoteIdentifier, Class<? extends T>> tuple =
         new Tuple2<RemoteIdentifier, Class<? extends T>>(sourceIdentifier, 
messageType);
 
-    final EventHandler<? extends T> prevHandler =
-        this.tupleToHandlerMap.putIfAbsent(tuple, theHandler);
+    this.tupleToHandlerMap.put(tuple, theHandler);
 
-    if (prevHandler != null) {
-      this.tupleToHandlerMap.replace(tuple, theHandler);
-    }
+    LOG.log(Level.FINER,
+        "Add handler for tuple: {0},{1}",
+        new Object[] {tuple.getT1(), tuple.getT2().getName()});
 
-    LOG.log(Level.FINER, "{0}", tuple);
-    return new Subscription(tuple, this);
+    return new SubscriptionHandler<>(tuple, this.unsubscribeTuple);
   }
 
+  /**
+   * Subscribe to events of a given message type.
+   * @param messageType Java class of messages to dispatch.
+   * @param theHandler Message handler.
+   * @return A new subscription object that will cancel its subscription on 
.close()
+   */
   public AutoCloseable registerHandler(
       final Class<? extends T> messageType,
       final EventHandler<RemoteMessage<? extends T>> theHandler) {
 
-    final EventHandler<RemoteMessage<? extends T>> prevHandler =
-        this.msgTypeToHandlerMap.putIfAbsent(messageType, theHandler);
+    this.msgTypeToHandlerMap.put(messageType, theHandler);
 
-    if (prevHandler != null) {
-      this.msgTypeToHandlerMap.replace(messageType, theHandler);
-    }
+    LOG.log(Level.FINER, "Add handler for class: {0}", messageType.getName());
 
-    LOG.log(Level.FINER, "{0}", messageType);
-    return new Subscription(messageType, this);
+    return new SubscriptionHandler<>(messageType, this.unsubscribeClass);
   }
 
+  /**
+   * Specify handler for error messages.
+   * @param theHandler Error handler.
+   * @return A new subscription object that will cancel its subscription on 
.close()
+   */
   public AutoCloseable registerErrorHandler(final EventHandler<Exception> 
theHandler) {
     this.transport.registerErrorHandler(theHandler);
-    return new Subscription(new Exception("Token for finding the error handler 
subscription"), this);
+    return new SubscriptionHandler<>(
+        new Exception("Token for finding the error handler subscription"), 
this.unsubscribeException);
   }
 
   /**
    * Unsubscribes a handler.
    *
    * @param subscription
-   * @throws org.apache.reef.wake.remote.exception.RemoteRuntimeException if 
the Subscription type is unknown
+   * @throws org.apache.reef.wake.remote.exception.RemoteRuntimeException if 
the Subscription type is unknown.
+   * @deprecated [REEF-1544] Prefer using SubscriptionHandler and the 
corresponding methods
+   * instead of the old Subscription class. Remove method after release 0.16.
    */
   public void unsubscribe(final Subscription<T> subscription) {
     final T token = subscription.getToken();
@@ -117,6 +136,37 @@ final class HandlerContainer<T> implements 
EventHandler<RemoteEvent<byte[]>> {
     }
   }
 
+  /** Unsubscribe from messages of a given class. */
+  private final SubscriptionHandler.Unsubscriber<Class<? extends T>>
+      unsubscribeClass = new SubscriptionHandler.Unsubscriber<Class<? extends 
T>>() {
+        @Override
+        public void unsubscribe(final Class<? extends T> token) {
+          LOG.log(Level.FINER, "Unsubscribe: {0} class {1}", new Object[] 
{name, token.getName()});
+          msgTypeToHandlerMap.remove(token);
+        }
+      };
+
+  /** Unsubscribe from events from a certain source and message type. */
+  private final SubscriptionHandler.Unsubscriber<Tuple2<RemoteIdentifier, 
Class<? extends T>>>
+      unsubscribeTuple = new 
SubscriptionHandler.Unsubscriber<Tuple2<RemoteIdentifier, Class<? extends 
T>>>() {
+        @Override
+        public void unsubscribe(final Tuple2<RemoteIdentifier, Class<? extends 
T>> token) {
+          LOG.log(Level.FINER, "Unsubscribe: {0} tuple {1},{2}",
+              new Object[] {name, token.getT1(), token.getT2().getName()});
+          tupleToHandlerMap.remove(token);
+        }
+      };
+
+  /** Unsubscribe from error messages. */
+  private final SubscriptionHandler.Unsubscriber<Exception>
+      unsubscribeException = new SubscriptionHandler.Unsubscriber<Exception>() 
{
+        @Override
+        public void unsubscribe(final Exception token) {
+          LOG.log(Level.FINER, "Unsubscribe: {0} exception {1}", new Object[] 
{name, token});
+          transport.registerErrorHandler(null);
+        }
+      };
+
   /**
    * Dispatches a message.
    *

http://git-wip-us.apache.org/repos/asf/reef/blob/af63bf63/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Subscription.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Subscription.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Subscription.java
index 5ae498a..0448408 100644
--- 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Subscription.java
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Subscription.java
@@ -22,6 +22,8 @@ package org.apache.reef.wake.remote.impl;
  * Subscription of a handler.
  *
  * @param <T> type
+ * @deprecated [REEF-1544] Prefer using SubscriptionHandler and the 
corresponding handlers
+ * instead of the Subscription class. Remove class after release 0.16.
  */
 public class Subscription<T> implements AutoCloseable {
 

http://git-wip-us.apache.org/repos/asf/reef/blob/af63bf63/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/SubscriptionHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/SubscriptionHandler.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/SubscriptionHandler.java
new file mode 100644
index 0000000..8f2eecf
--- /dev/null
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/SubscriptionHandler.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.remote.impl;
+
+/**
+ * Closeable subscription that passes its token to an Unsubscriber on close().
+ * @param <T> type of the token that is used to identify the subscription.
+ */
+class SubscriptionHandler<T> implements AutoCloseable {
+
+  interface Unsubscriber<T> {
+    void unsubscribe(final T token);
+  }
+
+  private final T token;
+  private final Unsubscriber<T> unsubscriber;
+
+  /**
+   * Constructs a subscription.
+   * @param token Token for finding the subscription; passed to the unsubscriber on close().
+   * @param unsubscriber unsubscribe method of the container that manages
handlers.
+   */
+  SubscriptionHandler(final T token, final Unsubscriber<T> unsubscriber) {
+    this.token = token;
+    this.unsubscriber = unsubscriber;
+  }
+
+  /**
+   * Gets the token of this subscription.
+   * @return the token that was supplied to the constructor.
+   */
+  public T getToken() {
+    return this.token;
+  }
+
+  /**
+   * Cancels this subscription by invoking the unsubscriber with the token.
+   * @throws Exception if the subscription cannot be cancelled.
+   */
+  @Override
+  public void close() throws Exception {
+    this.unsubscriber.unsubscribe(this.token);
+  }
+}

Reply via email to