Repository: reef
Updated Branches:
refs/heads/master f7f08b9f2 -> af63bf639
[REEF-1545] Refactor HandlerContainer to improve safety, readability, and
performance
* Use typed subscription handlers via the new Unsubscriber mechanism instead
of type erasure
* Use Map.put() instead of the combination of .putIfAbsent() and .replace()
in handler registration
* Deprecate the Subscription class and HandlerContainer.unsubscribe() method
in favor of the new typed subscriptions.
* Fix the code layout and add Javadocs to classes and methods.
JIRA:
[REEF-1545](https://issues.apache.org/jira/browse/REEF-1545)
Pull Request:
This closes #1108
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/af63bf63
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/af63bf63
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/af63bf63
Branch: refs/heads/master
Commit: af63bf639755bbd17ca8e0439b18471e19f6e171
Parents: f7f08b9
Author: Sergiy Matusevych <[email protected]>
Authored: Fri Aug 12 14:07:04 2016 -0700
Committer: Markus Weimer <[email protected]>
Committed: Fri Aug 26 17:15:47 2016 -0700
----------------------------------------------------------------------
.../reef/wake/remote/impl/HandlerContainer.java | 94 +++++++++++++++-----
.../reef/wake/remote/impl/Subscription.java | 2 +
.../wake/remote/impl/SubscriptionHandler.java | 60 +++++++++++++
3 files changed, 134 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/af63bf63/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java
index 46c943d..d972d03 100644
---
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java
+++
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java
@@ -33,6 +33,8 @@ import java.util.logging.Logger;
/**
* Main logic to dispatch messages.
+ * An event handler that receives a remote message with a binary payload,
+ * decodes a message from the blob, and dispatches that message to a proper
handler.
*/
final class HandlerContainer<T> implements EventHandler<RemoteEvent<byte[]>> {
@@ -41,10 +43,12 @@ final class HandlerContainer<T> implements
EventHandler<RemoteEvent<byte[]>> {
private final ConcurrentMap<Class<? extends T>,
EventHandler<RemoteMessage<? extends T>>> msgTypeToHandlerMap = new
ConcurrentHashMap<>();
- private final ConcurrentMap<Tuple2<RemoteIdentifier,
- Class<? extends T>>, EventHandler<? extends T>> tupleToHandlerMap = new
ConcurrentHashMap<>();
+ private final ConcurrentMap<Tuple2<RemoteIdentifier, Class<? extends T>>,
+ EventHandler<? extends T>> tupleToHandlerMap = new ConcurrentHashMap<>();
+
private final Codec<T> codec;
private final String name;
+
private Transport transport;
HandlerContainer(final String name, final Codec<T> codec) {
@@ -56,51 +60,66 @@ final class HandlerContainer<T> implements
EventHandler<RemoteEvent<byte[]>> {
this.transport = transport;
}
+ /**
+ * Subscribe for events from a given source and message type.
+ * @param sourceIdentifier An identifier of an event source.
+ * @param messageType Java class of messages to dispatch.
+ * @param theHandler Message handler.
+ * @return A new subscription object that will cancel its subscription on
.close()
+ */
@SuppressWarnings("checkstyle:diamondoperatorforvariabledefinition")
- public AutoCloseable registerHandler(final RemoteIdentifier sourceIdentifier,
- final Class<? extends T> messageType,
- final EventHandler<? extends T>
theHandler) {
-
+ public AutoCloseable registerHandler(
+ final RemoteIdentifier sourceIdentifier,
+ final Class<? extends T> messageType,
+ final EventHandler<? extends T> theHandler) {
final Tuple2<RemoteIdentifier, Class<? extends T>> tuple =
new Tuple2<RemoteIdentifier, Class<? extends T>>(sourceIdentifier,
messageType);
- final EventHandler<? extends T> prevHandler =
- this.tupleToHandlerMap.putIfAbsent(tuple, theHandler);
+ this.tupleToHandlerMap.put(tuple, theHandler);
- if (prevHandler != null) {
- this.tupleToHandlerMap.replace(tuple, theHandler);
- }
+ LOG.log(Level.FINER,
+ "Add handler for tuple: {0},{1}",
+ new Object[] {tuple.getT1(), tuple.getT2().getName()});
- LOG.log(Level.FINER, "{0}", tuple);
- return new Subscription(tuple, this);
+ return new SubscriptionHandler<>(tuple, this.unsubscribeTuple);
}
+ /**
+ * Subscribe for events of a given message type.
+ * @param messageType Java class of messages to dispatch.
+ * @param theHandler Message handler.
+ * @return A new subscription object that will cancel its subscription on
.close()
+ */
public AutoCloseable registerHandler(
final Class<? extends T> messageType,
final EventHandler<RemoteMessage<? extends T>> theHandler) {
- final EventHandler<RemoteMessage<? extends T>> prevHandler =
- this.msgTypeToHandlerMap.putIfAbsent(messageType, theHandler);
+ this.msgTypeToHandlerMap.put(messageType, theHandler);
- if (prevHandler != null) {
- this.msgTypeToHandlerMap.replace(messageType, theHandler);
- }
+ LOG.log(Level.FINER, "Add handler for class: {0}", messageType.getName());
- LOG.log(Level.FINER, "{0}", messageType);
- return new Subscription(messageType, this);
+ return new SubscriptionHandler<>(messageType, this.unsubscribeClass);
}
+ /**
+ * Specify handler for error messages.
+ * @param theHandler Error handler.
+ * @return A new subscription object that will cancel its subscription on
.close()
+ */
public AutoCloseable registerErrorHandler(final EventHandler<Exception>
theHandler) {
this.transport.registerErrorHandler(theHandler);
- return new Subscription(new Exception("Token for finding the error handler
subscription"), this);
+ return new SubscriptionHandler<>(
+ new Exception("Token for finding the error handler subscription"),
this.unsubscribeException);
}
/**
* Unsubscribes a handler.
*
* @param subscription
- * @throws org.apache.reef.wake.remote.exception.RemoteRuntimeException if
the Subscription type is unknown
+ * @throws org.apache.reef.wake.remote.exception.RemoteRuntimeException if
the Subscription type is unknown.
+ * @deprecated [REEF-1544] Prefer using SubscriptionHandler and the
corresponding methods
+ * instead of the old Subscription class. Remove method after release 0.16.
*/
public void unsubscribe(final Subscription<T> subscription) {
final T token = subscription.getToken();
@@ -117,6 +136,37 @@ final class HandlerContainer<T> implements
EventHandler<RemoteEvent<byte[]>> {
}
}
+ /** Unsubscribe from messages of a given class. */
+ private final SubscriptionHandler.Unsubscriber<Class<? extends T>>
+ unsubscribeClass = new SubscriptionHandler.Unsubscriber<Class<? extends
T>>() {
+ @Override
+ public void unsubscribe(final Class<? extends T> token) {
+ LOG.log(Level.FINER, "Unsubscribe: {0} class {1}", new Object[]
{name, token.getName()});
+ msgTypeToHandlerMap.remove(token);
+ }
+ };
+
+ /** Unsubscribe from events from a certain source and message type. */
+ private final SubscriptionHandler.Unsubscriber<Tuple2<RemoteIdentifier,
Class<? extends T>>>
+ unsubscribeTuple = new
SubscriptionHandler.Unsubscriber<Tuple2<RemoteIdentifier, Class<? extends
T>>>() {
+ @Override
+ public void unsubscribe(final Tuple2<RemoteIdentifier, Class<? extends
T>> token) {
+ LOG.log(Level.FINER, "Unsubscribe: {0} tuple {1},{2}",
+ new Object[] {name, token.getT1(), token.getT2().getName()});
+ tupleToHandlerMap.remove(token);
+ }
+ };
+
+ /** Unsubscribe from error messages. */
+ private final SubscriptionHandler.Unsubscriber<Exception>
+ unsubscribeException = new SubscriptionHandler.Unsubscriber<Exception>()
{
+ @Override
+ public void unsubscribe(final Exception token) {
+ LOG.log(Level.FINER, "Unsubscribe: {0} exception {1}", new Object[]
{name, token});
+ transport.registerErrorHandler(null);
+ }
+ };
+
/**
* Dispatches a message.
*
http://git-wip-us.apache.org/repos/asf/reef/blob/af63bf63/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Subscription.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Subscription.java
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Subscription.java
index 5ae498a..0448408 100644
---
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Subscription.java
+++
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Subscription.java
@@ -22,6 +22,8 @@ package org.apache.reef.wake.remote.impl;
* Subscription of a handler.
*
* @param <T> type
+ * @deprecated [REEF-1544] Prefer using SubscriptionHandler and the
corresponding handlers
+ * instead of the Subscription class. Remove class after release 0.16.
*/
public class Subscription<T> implements AutoCloseable {
http://git-wip-us.apache.org/repos/asf/reef/blob/af63bf63/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/SubscriptionHandler.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/SubscriptionHandler.java
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/SubscriptionHandler.java
new file mode 100644
index 0000000..8f2eecf
--- /dev/null
+++
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/SubscriptionHandler.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.remote.impl;
+
+/**
+ * Subscription of a handler.
+ * @param <T> type of the token that is used to identify the subscription.
+ */
+class SubscriptionHandler<T> implements AutoCloseable {
+
+ interface Unsubscriber<T> {
+ void unsubscribe(final T token);
+ }
+
+ private final T token;
+ private final Unsubscriber<T> unsubscriber;
+
+ /**
+ * Constructs a subscription.
+ * @param token Token for finding the subscription.
+ * @param unsubscriber unsubscribe method of the container that manages
handlers.
+ */
+ SubscriptionHandler(final T token, final Unsubscriber<T> unsubscriber) {
+ this.token = token;
+ this.unsubscriber = unsubscriber;
+ }
+
+ /**
+ * Gets the token of this subscription.
+ * @return the token of this subscription.
+ */
+ public T getToken() {
+ return this.token;
+ }
+
+ /**
+ * Cancels this subscription.
+ * @throws Exception if the subscription cannot be cancelled.
+ */
+ @Override
+ public void close() throws Exception {
+ this.unsubscriber.unsubscribe(this.token);
+ }
+}