Repository: incubator-reef Updated Branches: refs/heads/master 84ff5021c -> 41ff14187
[REEF-184] Allow NetworkService to deal with multiple message handlers This adds the NetworkConnectionService interface and its implementation. The interface provides a way to register multiple ConnectionFactory's, each of which is used to create connections for a message type. JIRA: [REEF-184] http://issues.apache.org/jira/browse/REEF-184 Pull Request: Closes #255 Author: Taegeon Um [email protected] GeonWoo Kim [email protected] Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/41ff1418 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/41ff1418 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/41ff1418 Branch: refs/heads/master Commit: 41ff1418781f885e4847e1c6386c40c9eeb30db8 Parents: 84ff502 Author: taegeonum <[email protected]> Authored: Fri Jun 26 20:45:09 2015 +0900 Committer: Byung-Gon Chun <[email protected]> Committed: Sat Jul 11 01:29:04 2015 +0900 ---------------------------------------------------------------------- .../org/apache/reef/io/network/Connection.java | 16 +- .../io/network/NetworkConnectionService.java | 98 +++++ .../BindNetworkConnectionServiceToTask.java | 51 +++ .../reef/io/network/impl/NSConnection.java | 14 +- .../reef/io/network/impl/NetworkConnection.java | 94 +++++ .../network/impl/NetworkConnectionFactory.java | 103 +++++ ...etworkConnectionServiceExceptionHandler.java | 42 ++ .../impl/NetworkConnectionServiceImpl.java | 239 ++++++++++++ .../NetworkConnectionServiceLinkListener.java | 52 +++ .../impl/NetworkConnectionServiceMessage.java | 115 ++++++ .../NetworkConnectionServiceMessageCodec.java | 146 +++++++ .../NetworkConnectionServiceReceiveHandler.java | 51 +++ .../network/impl/NetworkServiceParameters.java | 2 +- .../UnbindNetworkConnectionServiceFromTask.java | 50 +++ .../NetworkConnectionServiceIdFactory.java | 29 ++ .../config/NetworkConnectionServicePort.java | 26 ++ 
.../io/network/impl/config/package-info.java | 22 ++ .../network/NetworkConnectionServiceTest.java | 385 +++++++++++++++++++ .../services/network/NetworkServiceTest.java | 8 +- .../network/util/NetworkMessagingTest.java | 139 +++++++ .../network/util/StreamingIntegerCodec.java | 57 +++ .../network/util/StreamingStringCodec.java | 56 +++ 22 files changed, 1780 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Connection.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Connection.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Connection.java index 66f2840..17fc5a1 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Connection.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Connection.java @@ -20,6 +20,8 @@ package org.apache.reef.io.network; import org.apache.reef.exception.evaluator.NetworkException; +import java.util.List; + /** * Connection between two end-points named by identifiers. * @@ -35,12 +37,18 @@ public interface Connection<T> extends AutoCloseable { void open() throws NetworkException; /** - * Writes an object to the connection. + * Writes a message to the connection. * - * @param obj - * @throws NetworkException + * @param message + */ + void write(T message); + + /** + * Writes a list of messages to the connection. + * + * @param messages */ - void write(T obj) throws NetworkException; + void write(List<T> messages); /** * Closes the connection. 
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/NetworkConnectionService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/NetworkConnectionService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/NetworkConnectionService.java new file mode 100644 index 0000000..472e619 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/NetworkConnectionService.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network; + +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.io.network.impl.NetworkConnectionServiceImpl; +import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.Identifier; +import org.apache.reef.wake.remote.Codec; +import org.apache.reef.wake.remote.transport.LinkListener; + +/** + * NetworkConnectionService. + * + * NetworkConnectionService is a service which is designed for communicating messages with each other. 
+ * It creates multiple ConnectionFactories, which create multiple connections. + * + * Flow of message transfer: + * [Downstream]: connection.write(message) -> ConnectionFactory + * -> src NetworkConnectionService (encode) -> dest NetworkConnectionService. + * [Upstream]: message -> dest NetworkConnectionService (decode) -> ConnectionFactory -> EventHandler. + * + * Users can register a ConnectionFactory by registering their Codec, EventHandler and LinkListener. + * When users send messages via connections created by the ConnectionFactory, + * + * NetworkConnectionService encodes the messages according to the Codec + * registered in the ConnectionFactory and sends them to the destination NetworkConnectionService. + * + * Also, it receives the messages by decoding the messages and forwarding them + * to the corresponding EventHandler registered in the ConnectionFactory. + */ +@DefaultImplementation(NetworkConnectionServiceImpl.class) +public interface NetworkConnectionService extends AutoCloseable { + + /** + * Registers an instance of ConnectionFactory corresponding to the connectionFactoryId. + * Binds Codec, EventHandler and LinkListener to the ConnectionFactory. + * ConnectionFactory can create multiple connections between other NetworkConnectionServices. + * + * @param connectionFactoryId a connection factory id + * @param codec a codec for type <T> + * @param eventHandler an event handler for type <T> + * @param linkListener a link listener + * @throws NetworkException throws a NetworkException when multiple connectionFactoryIds exist. + */ + <T> void registerConnectionFactory(final Identifier connectionFactoryId, + final Codec<T> codec, + final EventHandler<Message<T>> eventHandler, + final LinkListener<Message<T>> linkListener) throws NetworkException; + + /** + * Unregisters a connectionFactory corresponding to the connectionFactoryId. 
+ * @param connectionFactoryId a connection factory id + */ + void unregisterConnectionFactory(final Identifier connectionFactoryId); + + /** + * Gets an instance of ConnectionFactory which is registered by the registerConnectionFactory method. + * @param connectionFactoryId a connection factory id + */ + <T> ConnectionFactory<T> getConnectionFactory(final Identifier connectionFactoryId); + + /** + * Registers a network connection service identifier. + * This identifier can be used as a destination identifier. + * @param ncsId network connection service identifier + */ + void registerId(final Identifier ncsId); + + /** + * Unregisters a network connection service identifier. + * @param ncsId network connection service identifier + */ + void unregisterId(final Identifier ncsId); + + /** + * Gets the network connection service identifier, which is equal to the registered id. + */ + Identifier getNetworkConnectionServiceId(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNetworkConnectionServiceToTask.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNetworkConnectionServiceToTask.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNetworkConnectionServiceToTask.java new file mode 100644 index 0000000..69aafcf --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNetworkConnectionServiceToTask.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.impl; + +import org.apache.reef.io.network.NetworkConnectionService; +import org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.task.events.TaskStart; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.IdentifierFactory; + +import javax.inject.Inject; + +/** + * TaskStart event handler for registering NetworkConnectionService. + * Users have to bind this handler into ServiceConfiguration.ON_TASK_STARTED. + */ +public final class BindNetworkConnectionServiceToTask implements EventHandler<TaskStart> { + + private final NetworkConnectionService ncs; + private final IdentifierFactory idFac; + + @Inject + public BindNetworkConnectionServiceToTask( + final NetworkConnectionService ncs, + @Parameter(NetworkConnectionServiceIdFactory.class) final IdentifierFactory idFac) { + this.ncs = ncs; + this.idFac = idFac; + } + + @Override + public void onNext(final TaskStart task) { + this.ncs.registerId(this.idFac.getNewInstance(task.getId())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java index b5fa9e3..4bfd0b7 100644 --- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java @@ -28,6 +28,7 @@ import org.apache.reef.wake.remote.transport.LinkListener; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; @@ -94,14 +95,19 @@ class NSConnection<T> implements Connection<T> { } /** - * Writes an object to the connection. + * Writes a message to the connection. * - * @param obj an object of type T + * @param message a message of type T * @throws a network exception */ @Override - public void write(final T obj) throws NetworkException { - this.link.write(new NSMessage<T>(this.srcId, this.destId, obj)); + public void write(final T message) { + this.link.write(new NSMessage<T>(this.srcId, this.destId, message)); + } + + @Override + public void write(List<T> messages) { + this.link.write(new NSMessage<T>(this.srcId, this.destId, messages)); } /** http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnection.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnection.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnection.java new file mode 100644 index 0000000..d2175e7 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnection.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.impl; + +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.io.network.Connection; +import org.apache.reef.wake.Identifier; +import org.apache.reef.wake.remote.transport.Link; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +final class NetworkConnection<T> implements Connection<T> { + + private Link<NetworkConnectionServiceMessage<T>> link; + + private final Identifier destId; + private final AtomicBoolean closed; + private final NetworkConnectionFactory connFactory; + + /** + * Constructs a connection for destination identifier of NetworkConnectionService. + * @param connFactory a connection factory of this connection. + * @param destId a destination identifier of NetworkConnectionService. 
+ */ + NetworkConnection( + final NetworkConnectionFactory connFactory, + final Identifier destId) { + this.connFactory = connFactory; + this.destId = destId; + this.closed = new AtomicBoolean(); + } + + @Override + public void open() throws NetworkException { + link = connFactory.openLink(destId); + } + + @Override + public void write(final List<T> messageList) { + final NetworkConnectionServiceMessage<T> nsMessage = new NetworkConnectionServiceMessage<>( + connFactory.getConnectionFactoryId(), + connFactory.getSrcId(), + destId, + messageList); + link.write(nsMessage); + } + + @Override + public void write(final T message) { + final List<T> messageList = new ArrayList<>(1); + messageList.add(message); + write(messageList); + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + connFactory.removeConnection(this.destId); + link = null; + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Connection from") + .append(connFactory.getSrcId()) + .append(":") + .append(connFactory.getConnectionFactoryId()) + .append(" to ") + .append(destId) + .append(":") + .append(connFactory.getConnectionFactoryId()); + return sb.toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionFactory.java new file mode 100644 index 0000000..d49988d --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionFactory.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.impl; + +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.io.network.Connection; +import org.apache.reef.io.network.ConnectionFactory; +import org.apache.reef.io.network.Message; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.Identifier; +import org.apache.reef.wake.remote.Codec; +import org.apache.reef.wake.remote.transport.Link; +import org.apache.reef.wake.remote.transport.LinkListener; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * A connection factory which is created by NetworkConnectionService. 
+ */ +final class NetworkConnectionFactory<T> implements ConnectionFactory<T> { + + private final ConcurrentMap<Identifier, Connection<T>> connectionMap; + private final String connFactoryId; + private final Codec<T> eventCodec; + private final EventHandler<Message<T>> eventHandler; + private final LinkListener<Message<T>> eventListener; + private final NetworkConnectionServiceImpl networkService; + + NetworkConnectionFactory( + final NetworkConnectionServiceImpl networkService, + final String connFactoryId, + final Codec<T> eventCodec, + final EventHandler<Message<T>> eventHandler, + final LinkListener<Message<T>> eventListener) { + this.networkService = networkService; + this.connectionMap = new ConcurrentHashMap<>(); + this.connFactoryId = connFactoryId; + this.eventCodec = eventCodec; + this.eventHandler = eventHandler; + this.eventListener = eventListener; + } + + /** + * Creates a new connection. + * @param destId a destination identifier of NetworkConnectionService. + */ + @Override + public Connection<T> newConnection(final Identifier destId) { + final Connection<T> connection = connectionMap.get(destId); + + if (connection == null) { + final Connection<T> newConnection = new NetworkConnection<>(this, destId); + connectionMap.putIfAbsent(destId, newConnection); + return connectionMap.get(destId); + } + return connection; + } + + <T> Link<NetworkConnectionServiceMessage<T>> openLink(final Identifier remoteId) throws NetworkException { + return networkService.openLink(remoteId); + } + + String getConnectionFactoryId() { + return this.connFactoryId; + } + + Identifier getSrcId() { + return this.networkService.getNetworkConnectionServiceId(); + } + + EventHandler<Message<T>> getEventHandler() { + return eventHandler; + } + + LinkListener<Message<T>> getLinkListener() { + return eventListener; + } + + public void removeConnection(final Identifier remoteId) { + connectionMap.remove(remoteId); + } + + Codec<T> getCodec() { + return eventCodec; + } +} \ No newline 
at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceExceptionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceExceptionHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceExceptionHandler.java new file mode 100644 index 0000000..2e4bd43 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceExceptionHandler.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.impl; + +import org.apache.reef.wake.EventHandler; + +import javax.inject.Inject; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Default exception handler. 
+ */ +public final class NetworkConnectionServiceExceptionHandler implements EventHandler<Exception> { + + private static final Logger LOG = Logger.getLogger(NetworkConnectionServiceExceptionHandler.class.getName()); + + @Inject + public NetworkConnectionServiceExceptionHandler() { + } + + @Override + public void onNext(final Exception value) { + LOG.log(Level.WARNING, "An exception occurred in transport of NetworkConnectionService: {0}", value); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceImpl.java new file mode 100644 index 0000000..f5e1f83 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceImpl.java @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.io.network.impl; + +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.io.Tuple; +import org.apache.reef.io.network.ConnectionFactory; +import org.apache.reef.io.network.Message; +import org.apache.reef.io.network.NetworkConnectionService; +import org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory; +import org.apache.reef.io.network.impl.config.NetworkConnectionServicePort; +import org.apache.reef.io.network.naming.NameResolver; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.wake.EStage; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.Identifier; +import org.apache.reef.wake.IdentifierFactory; +import org.apache.reef.wake.impl.SingleThreadStage; +import org.apache.reef.wake.remote.Codec; +import org.apache.reef.wake.remote.impl.TransportEvent; +import org.apache.reef.wake.remote.transport.Link; +import org.apache.reef.wake.remote.transport.LinkListener; +import org.apache.reef.wake.remote.transport.Transport; +import org.apache.reef.wake.remote.transport.TransportFactory; + +import javax.inject.Inject; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Default Network connection service implementation. + */ +public final class NetworkConnectionServiceImpl implements NetworkConnectionService { + + private static final Logger LOG = Logger.getLogger(NetworkConnectionServiceImpl.class.getName()); + + /** + * An identifier factory registering network connection service id. + */ + private final IdentifierFactory idFactory; + /** + * A name resolver looking up nameserver. + */ + private final NameResolver nameResolver; + /** + * A messaging transport. 
+ */ + private final Transport transport; + /** + * A map of (id of connection factory, a connection factory instance). + */ + private final ConcurrentMap<String, NetworkConnectionFactory> connFactoryMap; + /** + * A network connection service identifier. + */ + private Identifier myId; + /** + * A network connection service message codec. + */ + private final Codec<NetworkConnectionServiceMessage> nsCodec; + /** + * A network connection service link listener. + */ + private final LinkListener<NetworkConnectionServiceMessage> nsLinkListener; + /** + * A stage registering identifiers to nameServer. + */ + private final EStage<Tuple<Identifier, InetSocketAddress>> nameServiceRegisteringStage; + /** + * A stage unregistering identifiers from nameServer. + */ + private final EStage<Identifier> nameServiceUnregisteringStage; + + @Inject + private NetworkConnectionServiceImpl( + @Parameter(NetworkConnectionServiceIdFactory.class) final IdentifierFactory idFactory, + @Parameter(NetworkConnectionServicePort.class) final int nsPort, + final TransportFactory transportFactory, + final NameResolver nameResolver) { + this.idFactory = idFactory; + this.connFactoryMap = new ConcurrentHashMap<>(); + this.nsCodec = new NetworkConnectionServiceMessageCodec(idFactory, connFactoryMap); + this.nsLinkListener = new NetworkConnectionServiceLinkListener(connFactoryMap); + final EventHandler<TransportEvent> recvHandler = + new NetworkConnectionServiceReceiveHandler(connFactoryMap, nsCodec); + this.nameResolver = nameResolver; + this.transport = transportFactory.newInstance(nsPort, recvHandler, recvHandler, + new NetworkConnectionServiceExceptionHandler()); + + this.nameServiceRegisteringStage = new SingleThreadStage<>( + "NameServiceRegisterer", new EventHandler<Tuple<Identifier, InetSocketAddress>>() { + @Override + public void onNext(final Tuple<Identifier, InetSocketAddress> tuple) { + try { + nameResolver.register(tuple.getKey(), tuple.getValue()); + LOG.log(Level.FINEST, "Registered 
{0} with nameservice", tuple.getKey()); + } catch (final Exception ex) { + final String msg = "Unable to register " + tuple.getKey() + " with name service"; + LOG.log(Level.WARNING, msg, ex); + throw new RuntimeException(msg, ex); + } + } + }, 5); + + this.nameServiceUnregisteringStage = new SingleThreadStage<>( + "NameServiceRegisterer", new EventHandler<Identifier>() { + @Override + public void onNext(final Identifier id) { + try { + nameResolver.unregister(id); + LOG.log(Level.FINEST, "Unregistered {0} with nameservice", id); + } catch (final Exception ex) { + final String msg = "Unable to unregister " + id + " with name service"; + LOG.log(Level.WARNING, msg, ex); + throw new RuntimeException(msg, ex); + } + } + }, 5); + } + + @Override + public <T> void registerConnectionFactory(final Identifier connFactoryId, + final Codec<T> codec, + final EventHandler<Message<T>> eventHandler, + final LinkListener<Message<T>> linkListener) throws NetworkException { + String id = connFactoryId.toString(); + if (connFactoryMap.get(id) != null) { + throw new NetworkException("ConnectionFactory " + connFactoryId + " was already registered."); + } + final ConnectionFactory connFactory = connFactoryMap.putIfAbsent(id, + new NetworkConnectionFactory<>(this, id, codec, eventHandler, linkListener)); + + if (connFactory != null) { + throw new NetworkException("ConnectionFactory " + connFactoryId + " was already registered."); + } + } + + @Override + public void unregisterConnectionFactory(final Identifier connFactoryId) { + final String id = connFactoryId.toString(); + final ConnectionFactory connFactory = connFactoryMap.get(id); + if (connFactory != null) { + final ConnectionFactory cf = connFactoryMap.remove(id); + if (cf == null) { + LOG.log(Level.WARNING, "ConnectionFactory of {0} is null", id); + } + } else { + LOG.log(Level.WARNING, "ConnectionFactory of {0} is null", id); + } + } + + /** + * Registers a source identifier of NetworkConnectionService. 
+ * @param ncsId the identifier to register; it is paired with the local transport address + * and the pair is handed to the name-service registering stage + */ + @Override + public void registerId(final Identifier ncsId) { + LOG.log(Level.INFO, "Registering NetworkConnectionService " + ncsId); + this.myId = ncsId; + final Tuple<Identifier, InetSocketAddress> tuple = + new Tuple<>(ncsId, (InetSocketAddress) this.transport.getLocalAddress()); + LOG.log(Level.FINEST, "Binding {0} to NetworkConnectionService@({1})", + new Object[]{tuple.getKey(), tuple.getValue()}); + this.nameServiceRegisteringStage.onNext(tuple); + } + + /** + * Opens a link to the destination identifier of NetworkConnectionService. + * @param destId the destination identifier, resolved to an address through the name resolver + * @throws NetworkException if the lookup returns null or any exception occurs during lookup or open + */ + <T> Link<NetworkConnectionServiceMessage<T>> openLink(final Identifier destId) throws NetworkException { + try { + final SocketAddress address = nameResolver.lookup(destId); + if (address == null) { + throw new NetworkException("Lookup " + destId + " is null"); + } + return transport.open(address, nsCodec, nsLinkListener); + } catch(Exception e) { + e.printStackTrace(); + throw new NetworkException(e); + } + } + + /** + * Gets a ConnectionFactory.
+ * @param connFactoryId the identifier of the ConnectionFactory + */ + @Override + public <T> ConnectionFactory<T> getConnectionFactory(final Identifier connFactoryId) { + final ConnectionFactory<T> connFactory = connFactoryMap.get(connFactoryId.toString()); + if (connFactory == null) { + throw new RuntimeException("Cannot find ConnectionFactory of " + connFactoryId + "."); + } + return connFactory; + } + + @Override + public void unregisterId(final Identifier ncsId) { + LOG.log(Level.FINEST, "Unbinding {0} to NetworkConnectionService@({1})", + new Object[]{ncsId, this.transport.getLocalAddress()}); + this.myId = null; + this.nameServiceUnregisteringStage.onNext(ncsId); + } + + @Override + public Identifier getNetworkConnectionServiceId() { + return this.myId; + } + + @Override + public void close() throws Exception { + LOG.log(Level.FINE, "Shutting down"); + this.nameServiceRegisteringStage.close(); + this.nameServiceUnregisteringStage.close(); + this.transport.close(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceLinkListener.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceLinkListener.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceLinkListener.java new file mode 100644 index 0000000..2c9528c --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceLinkListener.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.impl; + +import org.apache.reef.wake.remote.transport.LinkListener; + +import java.net.SocketAddress; +import java.util.Map; + +final class NetworkConnectionServiceLinkListener implements LinkListener<NetworkConnectionServiceMessage> { + + private final Map<String, NetworkConnectionFactory> connFactoryMap; + + NetworkConnectionServiceLinkListener( + final Map<String, NetworkConnectionFactory> connFactoryMap) { + this.connFactoryMap = connFactoryMap; + } + + @Override + public void onSuccess(final NetworkConnectionServiceMessage message) { + final LinkListener listener = connFactoryMap.get(message.getConnectionFactoryId()).getLinkListener(); + if (listener != null) { + listener.onSuccess(message); + } + + } + + @Override + public void onException(final Throwable cause, final SocketAddress remoteAddress, + final NetworkConnectionServiceMessage message) { + final LinkListener listener = connFactoryMap.get(message.getConnectionFactoryId()).getLinkListener(); + if (listener != null) { + listener.onException(cause, remoteAddress, message); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessage.java ---------------------------------------------------------------------- diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessage.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessage.java new file mode 100644 index 0000000..9ce10c1 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessage.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.impl; + +import org.apache.reef.io.network.Message; +import org.apache.reef.wake.Identifier; + +import java.net.SocketAddress; +import java.util.List; + + +/** + * NetworkConnectionServiceMessage implementation. + * This is a wrapper message of message type <T>. + */ +final class NetworkConnectionServiceMessage<T> implements Message<T> { + + private final List<T> messages; + private SocketAddress remoteAddr; + private final String connFactoryId; + private final Identifier srcId; + private final Identifier destId; + + /** + * Constructs a network connection service message. 
+ * + * @param connFactoryId the connection factory identifier + * @param srcId the source identifier of NetworkConnectionService + * @param destId the destination identifier of NetworkConnectionService + * @param messages the list of messages + */ + public NetworkConnectionServiceMessage( + final String connFactoryId, + final Identifier srcId, + final Identifier destId, + final List<T> messages) { + this.connFactoryId = connFactoryId; + this.srcId = srcId; + this.destId = destId; + this.messages = messages; + } + + void setRemoteAddress(final SocketAddress remoteAddress) { + this.remoteAddr = remoteAddress; + } + + /** + * Gets a destination identifier. + * + * @return a remote id + */ + @Override + public Identifier getDestId() { + return destId; + } + + /** + * Gets a connection factory identifier. + * + * @return a connection factory id + */ + public String getConnectionFactoryId() { + return connFactoryId; + } + + + /** + * Gets a source identifier of NetworkConnectionService. + * + * @return a source id + */ + @Override + public Identifier getSrcId() { + return srcId; + } + + @Override + public List<T> getData() { + return messages; + } + + /** + * Returns a string representation of this object. 
+ * + * @return a string representation of this object + */ + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("NSMessage"); + builder.append(" remoteID="); + builder.append(destId); + builder.append(" message=[| "); + for (T message : messages) { + builder.append(message + " |"); + } + builder.append("]"); + return builder.toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessageCodec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessageCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessageCodec.java new file mode 100644 index 0000000..bdccff2 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessageCodec.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.io.network.impl; + +import org.apache.reef.wake.Identifier; +import org.apache.reef.wake.IdentifierFactory; +import org.apache.reef.wake.remote.Codec; + +import java.io.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Codec for NetworkConnectionServiceMessage. + * The wrapped messages are encoded/decoded with the codec of the connection factory named in the message. + */ +final class NetworkConnectionServiceMessageCodec implements Codec<NetworkConnectionServiceMessage> { + + private final IdentifierFactory factory; + /** + * Contains entries of (id of connection factory, instance of connection factory). + */ + private final Map<String, NetworkConnectionFactory> connFactoryMap; + /** + * Caches, per codec instance, whether that codec is a StreamingCodec. + */ + private final ConcurrentMap<Codec, Boolean> isStreamingCodecMap; + + /** + * Constructs a codec from an identifier factory and the map of registered connection factories. + */ + NetworkConnectionServiceMessageCodec( + final IdentifierFactory factory, + final Map<String, NetworkConnectionFactory> connFactoryMap) { + this.factory = factory; + this.connFactoryMap = connFactoryMap; + this.isStreamingCodecMap = new ConcurrentHashMap<>(); + } + + /** + * Encodes a network connection service message to bytes. 
+ * @param obj a message + * @return bytes + */ + @Override + public byte[] encode(final NetworkConnectionServiceMessage obj) { + final Codec codec = connFactoryMap.get(obj.getConnectionFactoryId()).getCodec(); + Boolean isStreamingCodec = isStreamingCodecMap.get(codec); + if (isStreamingCodec == null) { + isStreamingCodec = codec instanceof StreamingCodec; + isStreamingCodecMap.putIfAbsent(codec, isStreamingCodec); + } + + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + try (final DataOutputStream daos = new DataOutputStream(baos)) { + daos.writeUTF(obj.getConnectionFactoryId()); + daos.writeUTF(obj.getSrcId().toString()); + daos.writeUTF(obj.getDestId().toString()); + daos.writeInt(obj.getData().size()); + + if (isStreamingCodec) { + for (final Object rec : obj.getData()) { + ((StreamingCodec) codec).encodeToStream(rec, daos); + } + } else { + final Iterable dataList = obj.getData(); + for (final Object message : dataList) { + final byte[] bytes = codec.encode(message); + daos.writeInt(bytes.length); + daos.write(bytes); + } + } + return baos.toByteArray(); + } + } catch (final IOException e) { + throw new RuntimeException("IOException", e); + } + } + + /** + * Decodes a network connection service message from bytes. 
+ * + * @param data the bytes produced by encode: connection factory id, source id, destination id, message count, then the messages + * @return the decoded message; a RuntimeException wrapping the IOException is thrown if the bytes cannot be read + */ + @Override + public NetworkConnectionServiceMessage decode(final byte[] data) { + try (final ByteArrayInputStream bais = new ByteArrayInputStream(data)) { + try (final DataInputStream dais = new DataInputStream(bais)) { + final String connFactoryId = dais.readUTF(); + final Identifier srcId = factory.getNewInstance(dais.readUTF()); + final Identifier destId = factory.getNewInstance(dais.readUTF()); + final int size = dais.readInt(); + final List list = new ArrayList(size); + final Codec codec = connFactoryMap.get(connFactoryId).getCodec(); + Boolean isStreamingCodec = isStreamingCodecMap.get(codec); + if (isStreamingCodec == null) { + isStreamingCodec = codec instanceof StreamingCodec; + isStreamingCodecMap.putIfAbsent(codec, isStreamingCodec); + } + + if (isStreamingCodec) { + for (int i = 0; i < size; i++) { + list.add(((StreamingCodec) codec).decodeFromStream(dais)); + } + } else { + for (int i = 0; i < size; i++) { + final int byteSize = dais.readInt(); + final byte[] bytes = new byte[byteSize]; + dais.readFully(bytes); // read() may fill fewer than byteSize bytes; readFully fails with EOFException instead + list.add(codec.decode(bytes)); + } + } + + return new NetworkConnectionServiceMessage( + connFactoryId, + srcId, + destId, + list + ); + } + } catch (final IOException e) { + throw new RuntimeException("IOException", e); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceReceiveHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceReceiveHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceReceiveHandler.java new file mode 100644 index 0000000..1875e79 --- /dev/null +++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceReceiveHandler.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.impl; + +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.remote.Codec; +import org.apache.reef.wake.remote.impl.TransportEvent; + +import java.util.Map; + +/** + * NetworkConnectionService event handler. + * It dispatches events to the corresponding eventHandler. 
+ */ +final class NetworkConnectionServiceReceiveHandler implements EventHandler<TransportEvent> { + + private final Map<String, NetworkConnectionFactory> connFactoryMap; + private final Codec<NetworkConnectionServiceMessage> codec; + + NetworkConnectionServiceReceiveHandler( + final Map<String, NetworkConnectionFactory> connFactoryMap, + final Codec<NetworkConnectionServiceMessage> codec) { + this.connFactoryMap = connFactoryMap; + this.codec = codec; + } + + @Override + public void onNext(final TransportEvent transportEvent) { + final NetworkConnectionServiceMessage nsMessage = codec.decode(transportEvent.getData()); + nsMessage.setRemoteAddress(transportEvent.getRemoteAddress()); + final NetworkConnectionFactory connFactory = connFactoryMap.get(nsMessage.getConnectionFactoryId()); + final EventHandler eventHandler = connFactory.getEventHandler(); + eventHandler.onNext(nsMessage); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java index c79f88a..a2f5f2a 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java @@ -39,7 +39,7 @@ public class NetworkServiceParameters { public static class NetworkServiceIdentifierFactory implements Name<IdentifierFactory> { } - @NamedParameter(doc = "port for the network service", short_name = "nsport", default_value = "7070") + @NamedParameter(doc = "port for the network service", short_name = "nsport", default_value = "0") public static class NetworkServicePort implements 
Name<Integer> { } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNetworkConnectionServiceFromTask.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNetworkConnectionServiceFromTask.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNetworkConnectionServiceFromTask.java new file mode 100644 index 0000000..540fe80 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNetworkConnectionServiceFromTask.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.impl; + +import org.apache.reef.io.network.NetworkConnectionService; +import org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.task.events.TaskStop; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.IdentifierFactory; + +import javax.inject.Inject; +/** + * TaskStop event handler for unregistering NetworkConnectionService. 
+ * Users have to bind this handler into ServiceConfiguration.ON_TASK_STOP. + */ +public final class UnbindNetworkConnectionServiceFromTask implements EventHandler<TaskStop> { + + private final NetworkConnectionService ncs; + private final IdentifierFactory idFac; + + @Inject + public UnbindNetworkConnectionServiceFromTask( + final NetworkConnectionService ncs, + @Parameter(NetworkConnectionServiceIdFactory.class) final IdentifierFactory idFac) { + this.ncs = ncs; + this.idFac = idFac; + } + + @Override + public void onNext(final TaskStop task) { + this.ncs.unregisterId(this.idFac.getNewInstance(task.getId())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/NetworkConnectionServiceIdFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/NetworkConnectionServiceIdFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/NetworkConnectionServiceIdFactory.java new file mode 100644 index 0000000..67c3e96 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/NetworkConnectionServiceIdFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.impl.config; + +import org.apache.reef.io.network.util.StringIdentifierFactory; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; +import org.apache.reef.wake.IdentifierFactory; + +@NamedParameter(doc = "identifier factory for the service", short_name = "ncsfactory", + default_class = StringIdentifierFactory.class) +public final class NetworkConnectionServiceIdFactory implements Name<IdentifierFactory> { +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/NetworkConnectionServicePort.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/NetworkConnectionServicePort.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/NetworkConnectionServicePort.java new file mode 100644 index 0000000..abaf840 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/NetworkConnectionServicePort.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.impl.config; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +@NamedParameter(doc = "port for the network connection service", short_name = "ncsport", default_value = "0") +public final class NetworkConnectionServicePort implements Name<Integer> { +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/package-info.java new file mode 100644 index 0000000..e54151a --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/config/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * TODO: Document. + */ +package org.apache.reef.io.network.impl.config; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java new file mode 100644 index 0000000..ddf4bfd --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.services.network; + +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.io.network.Connection; +import org.apache.reef.io.network.util.StringCodec; +import org.apache.reef.io.network.util.StringIdentifierFactory; +import org.apache.reef.services.network.util.*; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.wake.Identifier; +import org.apache.reef.wake.IdentifierFactory; +import org.apache.reef.wake.remote.Codec; +import org.apache.reef.wake.remote.address.LocalAddressProvider; +import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; +import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import java.util.concurrent.*; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Default Network connection service test. + */ +public class NetworkConnectionServiceTest { + private static final Logger LOG = Logger.getLogger(NetworkConnectionServiceTest.class.getName()); + + private final LocalAddressProvider localAddressProvider; + private final String localAddress; + private final Identifier groupCommClientId; + private final Identifier shuffleClientId; + + public NetworkConnectionServiceTest() throws InjectionException { + localAddressProvider = LocalAddressProviderFactory.getInstance(); + localAddress = localAddressProvider.getLocalAddress(); + + final IdentifierFactory idFac = new StringIdentifierFactory(); + this.groupCommClientId = idFac.getNewInstance("groupComm"); + this.shuffleClientId = idFac.getNewInstance("shuffle"); + } + + @Rule + public TestName name = new TestName(); + + private void runMessagingNetworkConnectionService(Codec<String> codec) throws Exception { + final int numMessages = 2000; + final Monitor monitor = new Monitor(); + final NetworkMessagingTest messagingTest = new NetworkMessagingTest(localAddress); + 
messagingTest.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec); + + final Connection<String> conn = messagingTest.getConnectionFromSenderToReceiver(groupCommClientId); + try { + conn.open(); + for (int count = 0; count < numMessages; ++count) { + // send messages to the receiver. + conn.write("hello" + count); + } + monitor.mwait(); + } catch (NetworkException e) { + e.printStackTrace(); + } + + conn.close(); + messagingTest.close(); + } + + /** + * NetworkConnectionService messaging test. + */ + @Test + public void testMessagingNetworkConnectionService() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + runMessagingNetworkConnectionService(new StringCodec()); + } + + /** + * NetworkConnectionService streaming messaging test. + */ + @Test + public void testStreamingMessagingNetworkConnectionService() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + runMessagingNetworkConnectionService(new StreamingStringCodec()); + } + + public void runNetworkConnServiceWithMultipleConnFactories(Codec<String> stringCodec, Codec<Integer> integerCodec) throws Exception { + final ExecutorService executor = Executors.newFixedThreadPool(5); + + final int groupcommMessages = 1000; + final Monitor monitor = new Monitor(); + final NetworkMessagingTest messagingTest = new NetworkMessagingTest(localAddress); + + messagingTest.registerTestConnectionFactory(groupCommClientId, groupcommMessages, monitor, stringCodec); + + final int shuffleMessges = 2000; + final Monitor monitor2 = new Monitor(); + messagingTest.registerTestConnectionFactory(shuffleClientId, shuffleMessges, monitor2, integerCodec); + + executor.submit(new Runnable() { + @Override + public void run() { + final Connection<String> conn = messagingTest.getConnectionFromSenderToReceiver(groupCommClientId); + try { + conn.open(); + for (int count = 0; count < groupcommMessages; ++count) { + // send messages to the receiver. 
+ conn.write("hello" + count); + } + monitor.mwait(); + conn.close(); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + }); + + executor.submit(new Runnable() { + @Override + public void run() { + final Connection<Integer> conn = messagingTest.getConnectionFromSenderToReceiver(shuffleClientId); + try { + conn.open(); + for (int count = 0; count < shuffleMessges; ++count) { + // send messages to the receiver. + conn.write(count); + } + monitor2.mwait(); + conn.close(); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + }); + + monitor.mwait(); + monitor2.mwait(); + executor.shutdown(); + messagingTest.close(); + } + + /** + * Test NetworkService registering multiple connection factories. + */ + @Test + public void testMultipleConnectionFactoriesTest() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + runNetworkConnServiceWithMultipleConnFactories(new StringCodec(), new ObjectSerializableCodec<Integer>()); + } + + /** + * Test NetworkService registering multiple connection factories with Streamingcodec. + */ + @Test + public void testMultipleConnectionFactoriesStreamingTest() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + runNetworkConnServiceWithMultipleConnFactories(new StreamingStringCodec(), new StreamingIntegerCodec()); + } + + /** + * NetworkService messaging rate benchmark. 
+ */ + @Test + public void testMessagingNetworkConnServiceRate() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024}; + + for (int size : messageSizes) { + final int numMessages = 300000 / (Math.max(1, size / 512)); + final Monitor monitor = new Monitor(); + final Codec<String> codec = new StringCodec(); + final NetworkMessagingTest messagingTest = new NetworkMessagingTest(localAddress); + messagingTest.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec); + final Connection<String> conn = messagingTest.getConnectionFromSenderToReceiver(groupCommClientId); + + // build the message + final StringBuilder msb = new StringBuilder(); + for (int i = 0; i < size; i++) { + msb.append("1"); + } + final String message = msb.toString(); + + long start = System.currentTimeMillis(); + try { + conn.open(); + for (int count = 0; count < numMessages; ++count) { + // send messages to the receiver. + conn.write(message); + } + monitor.mwait(); + } catch (NetworkException e) { + e.printStackTrace(); + } + final long end = System.currentTimeMillis(); + + final double runtime = ((double) end - start) / 1000; + LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numMessages / runtime + " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) / runtime);// x2 for unicode chars + messagingTest.close(); + } + } + + /** + * NetworkService messaging rate benchmark. 
+   */
+  @Test
+  public void testMessagingNetworkConnServiceRateDisjoint() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+
+    final int numThreads = 4;
+    final int size = 2000;
+    final int numMessages = 300000 / (Math.max(1, size / 512));
+    final int totalNumMessages = numMessages * numThreads;
+
+    final ExecutorService e = Executors.newCachedThreadPool();
+    // Keep every Future so that a failure inside a worker is re-thrown by this test.
+    final java.util.concurrent.Future<?>[] workers = new java.util.concurrent.Future<?>[numThreads];
+
+    // start and time: the clock starts before the first submit because workers run right away
+    final long start = System.currentTimeMillis();
+    for (int t = 0; t < numThreads; t++) {
+      workers[t] = e.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            final Monitor monitor = new Monitor();
+            final Codec<String> codec = new StringCodec();
+            final NetworkMessagingTest messagingTest = new NetworkMessagingTest(localAddress);
+            try {
+              messagingTest.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec);
+              final Connection<String> conn = messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
+
+              // build the message
+              final StringBuilder msb = new StringBuilder(size);
+              for (int i = 0; i < size; i++) {
+                msb.append("1");
+              }
+              final String message = msb.toString();
+
+              conn.open();
+              for (int count = 0; count < numMessages; ++count) {
+                // send messages to the receiver.
+                conn.write(message);
+              }
+              monitor.mwait();
+            } finally {
+              // Shut this worker's helper down even when sending failed.
+              messagingTest.close();
+            }
+          } catch (Exception ex) {
+            // Surfaces through Future.get() in the test thread.
+            throw new RuntimeException(ex);
+          }
+        }
+      });
+    }
+
+    e.shutdown();
+    if (!e.awaitTermination(100, TimeUnit.SECONDS)) {
+      e.shutdownNow();
+      throw new IllegalStateException("Senders did not finish within 100 seconds");
+    }
+    for (final java.util.concurrent.Future<?> worker : workers) {
+      // Every task has completed by now; get() only re-throws a worker failure.
+      worker.get();
+    }
+    final long end = System.currentTimeMillis();
+    final double runtime = ((double) end - start) / 1000;
+    LOG.log(Level.INFO, "size: " + size + "; messages/s: " + totalNumMessages / runtime + " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime);// x2 for unicode chars
+  }
+
+  /**
+   * Messaging rate benchmark in which several sender threads write to one shared connection.
+   */
+  @Test
+  public void testMultithreadedSharedConnMessagingNetworkConnServiceRate() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    // Other sizes that can be tried here: 1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024.
+    final int[] messageSizes = {2000};
+
+    for (final int size : messageSizes) {
+      final int numMessages = 300000 / (Math.max(1, size / 512));
+      final int numThreads = 2;
+      final int totalNumMessages = numMessages * numThreads;
+      final Monitor monitor = new Monitor();
+      final Codec<String> codec = new StringCodec();
+      final NetworkMessagingTest messagingTest = new NetworkMessagingTest(localAddress);
+      try {
+        // The handler is told to expect the messages of all sender threads together.
+        messagingTest.registerTestConnectionFactory(groupCommClientId, totalNumMessages, monitor, codec);
+
+        final ExecutorService e = Executors.newCachedThreadPool();
+
+        // build the message
+        final StringBuilder msb = new StringBuilder(size);
+        for (int i = 0; i < size; i++) {
+          msb.append("1");
+        }
+        final String message = msb.toString();
+        final Connection<String> conn = messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
+
+        // Keep every Future so that a failure inside a sender is re-thrown by this test.
+        final java.util.concurrent.Future<?>[] workers = new java.util.concurrent.Future<?>[numThreads];
+        final long start = System.currentTimeMillis();
+        for (int i = 0; i < numThreads; i++) {
+          workers[i] = e.submit(new Runnable() {
+            @Override
+            public void run() {
+              try {
+                conn.open();
+                for (int count = 0; count < numMessages; ++count) {
+                  // send messages to the receiver.
+                  conn.write(message);
+                }
+              } catch (Exception ex) {
+                throw new RuntimeException(ex);
+              }
+            }
+          });
+        }
+
+        e.shutdown();
+        e.awaitTermination(30, TimeUnit.SECONDS);
+        for (final java.util.concurrent.Future<?> worker : workers) {
+          // Blocks until the sender is done and re-throws its failure, so a broken sender
+          // fails the test here instead of leaving it waiting on the monitor.
+          worker.get();
+        }
+        monitor.mwait();
+        final long end = System.currentTimeMillis();
+        final double runtime = ((double) end - start) / 1000;
+        LOG.log(Level.INFO, "size: " + size + "; messages/s: " + totalNumMessages / runtime + " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime);// x2 for unicode chars
+        conn.close();
+      } finally {
+        messagingTest.close();
+      }
+    }
+  }
+
+  /**
+   * Messaging rate benchmark with batching: every write carries batchSize / size copies
+   * of the application message concatenated into one string.
+   */
+  @Test
+  public void testMessagingNetworkConnServiceBatchingRate() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+
+    final int batchSize = 1024 * 1024;
+    final int[] messageSizes = {32, 64, 512};
+
+    for (final int size : messageSizes) {
+      final int numMessages = 300 / (Math.max(1, size / 512));
+      final Monitor monitor = new Monitor();
+      final Codec<String> codec = new StringCodec();
+      final NetworkMessagingTest messagingTest = new NetworkMessagingTest(localAddress);
+      try {
+        messagingTest.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec);
+        final Connection<String> conn = messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
+
+        // build the message
+        final StringBuilder msb = new StringBuilder(size);
+        for (int i = 0; i < size; i++) {
+          msb.append("1");
+        }
+        final String message = msb.toString();
+
+        final long start = System.currentTimeMillis();
+        // Open once before sending, as the other benchmarks here do, not once per batch.
+        conn.open();
+        for (int i = 0; i < numMessages; i++) {
+          final StringBuilder sb = new StringBuilder(batchSize);
+          for (int j = 0; j < batchSize / size; j++) {
+            sb.append(message);
+          }
+          conn.write(sb.toString());
+        }
+        monitor.mwait();
+
+        final long end = System.currentTimeMillis();
+        final double runtime = ((double) end - start) / 1000;
+        // Widen before multiplying so the product cannot overflow int for larger settings.
+        final long numAppMessages = (long) numMessages * batchSize / size;
+        LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numAppMessages / runtime + " bandwidth(bytes/s): " + ((double) numAppMessages * 2 * size) / runtime);// x2 for unicode chars
+      } finally {
+        messagingTest.close();
+      }
+    }
+  }
+}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java index c1f76b8..2bb75ae 100644 --- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java +++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java @@ -387,12 +387,8 @@ public class NetworkServiceTest { @Override public void run() { - try { - for (int i = 0; i < numMessages; i++) { - conn.write(message); - } - } catch (NetworkException e) { - e.printStackTrace(); + for (int i = 0; i < numMessages; i++) { + conn.write(message); } } }); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTest.java new file mode 100644 index 0000000..ba0cfe4 --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.services.network.util; + +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.io.network.Connection; +import org.apache.reef.io.network.Message; +import org.apache.reef.io.network.NetworkConnectionService; +import org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory; +import org.apache.reef.io.network.naming.NameResolverConfiguration; +import org.apache.reef.io.network.naming.NameServer; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.Identifier; +import org.apache.reef.wake.IdentifierFactory; +import org.apache.reef.wake.remote.Codec; +import org.apache.reef.wake.remote.transport.LinkListener; + +import java.net.SocketAddress; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Helper class for NetworkConnectionService test. 
+ */
+public final class NetworkMessagingTest {
+  private static final Logger LOG = Logger.getLogger(NetworkMessagingTest.class.getName());
+
+  private final IdentifierFactory factory;
+  private final NetworkConnectionService receiverNetworkConnService;
+  private final NetworkConnectionService senderNetworkConnService;
+  private final String receiver;
+  private final String sender;
+  private final NameServer nameServer;
+
+  /**
+   * Creates a name server and two NetworkConnectionService instances that resolve names
+   * through it, registered under the ids "receiver" and "sender".
+   *
+   * @param localAddress host name under which the name server is looked up
+   * @throws InjectionException if Tang cannot instantiate one of the components
+   */
+  public NetworkMessagingTest(final String localAddress) throws InjectionException {
+    // name server
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    this.nameServer = injector.getInstance(NameServer.class);
+    final Configuration netConf = NameResolverConfiguration.CONF
+        .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
+        .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServer.getPort())
+        .build();
+
+    LOG.log(Level.FINEST, "=== Test network connection service receiver start");
+    // network service for receiver
+    this.receiver = "receiver";
+    final Injector injectorReceiver = injector.forkInjector(netConf);
+    this.receiverNetworkConnService = injectorReceiver.getInstance(NetworkConnectionService.class);
+    this.factory = injectorReceiver.getNamedInstance(NetworkConnectionServiceIdFactory.class);
+    this.receiverNetworkConnService.registerId(this.factory.getNewInstance(receiver));
+
+    // network service for sender
+    this.sender = "sender";
+    LOG.log(Level.FINEST, "=== Test network connection service sender start");
+    final Injector injectorSender = injector.forkInjector(netConf);
+    senderNetworkConnService = injectorSender.getInstance(NetworkConnectionService.class);
+    senderNetworkConnService.registerId(this.factory.getNewInstance(this.sender));
+  }
+
+  /**
+   * Registers a connection factory with the given id on both the receiver and the sender
+   * service. Each side gets its own MessageHandler and TestListener; both handlers share
+   * the monitor and the expected message count.
+   *
+   * @param connFactoryId id of the connection factory to register
+   * @param numMessages number of messages after which a handler notifies the monitor
+   * @param monitor monitor notified by the handlers
+   * @param codec codec for the message payload
+   * @throws NetworkException if the registration fails
+   */
+  public <T> void registerTestConnectionFactory(final Identifier connFactoryId,
+                                                final int numMessages, final Monitor monitor,
+                                                final Codec<T> codec) throws NetworkException {
+    receiverNetworkConnService.registerConnectionFactory(connFactoryId, codec, new MessageHandler<T>(monitor, numMessages), new TestListener<T>());
+    senderNetworkConnService.registerConnectionFactory(connFactoryId, codec, new MessageHandler<T>(monitor, numMessages), new TestListener<T>());
+  }
+
+  /**
+   * Creates a connection from the sender service to the receiver id, using the connection
+   * factory registered under the given id.
+   *
+   * @param connFactoryId id of a previously registered connection factory
+   * @return a new, not yet opened connection to the receiver
+   */
+  public <T> Connection<T> getConnectionFromSenderToReceiver(final Identifier connFactoryId) {
+    final Identifier destId = factory.getNewInstance(receiver);
+    return (Connection<T>)senderNetworkConnService.getConnectionFactory(connFactoryId).newConnection(destId);
+  }
+
+  /**
+   * Closes the sender service, the receiver service and the name server. Every close is
+   * attempted even if an earlier one throws.
+   *
+   * @throws Exception if closing one of the components fails
+   */
+  public void close() throws Exception {
+    try {
+      senderNetworkConnService.close();
+    } finally {
+      try {
+        receiverNetworkConnService.close();
+      } finally {
+        nameServer.close();
+      }
+    }
+  }
+
+  /**
+   * Counts received messages and notifies the monitor when the expected number is reached.
+   */
+  public static final class MessageHandler<T> implements EventHandler<Message<T>> {
+    private final int expected;
+    private final Monitor monitor;
+    private final AtomicInteger count = new AtomicInteger(0);
+
+    public MessageHandler(final Monitor monitor,
+                          final int expected) {
+      this.monitor = monitor;
+      this.expected = expected;
+    }
+
+    @Override
+    public void onNext(final Message<T> value) {
+      // Use the value returned by the increment: reading the counter again could observe
+      // another thread's increment and notify the monitor more than once.
+      final int received = count.incrementAndGet();
+      LOG.log(Level.FINE, "Count: {0}", received);
+      LOG.log(Level.FINE,
+          "OUT: received {0} from {1} to {2}",
+          new Object[]{value, value.getSrcId(), value.getDestId()});
+
+      for (final T obj : value.getData()) {
+        LOG.log(Level.FINE, "OUT: data: {0}", obj);
+      }
+
+      if (received == expected) {
+        monitor.mnotify();
+      }
+    }
+  }
+
+  /**
+   * Link listener that logs the outcome of a send and turns a failure into a RuntimeException.
+   */
+  public static final class TestListener<T> implements LinkListener<Message<T>> {
+    @Override
+    public void onSuccess(final Message<T> message) {
+      LOG.log(Level.FINE, "success: {0}", message);
+    }
+    @Override
+    public void onException(final Throwable cause, final SocketAddress remoteAddress, final Message<T> message) {
+      // Pass the cause as the Throwable argument so that its stack trace is logged as well.
+      LOG.log(Level.WARNING, "exception while sending " + message + " to " + remoteAddress, cause);
+      throw new RuntimeException(cause);
+    }
+  }
+}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingIntegerCodec.java
---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingIntegerCodec.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingIntegerCodec.java new file mode 100644 index 0000000..eaec90f --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingIntegerCodec.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.reef.services.network.util;
+
+import org.apache.reef.io.network.impl.StreamingCodec;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+
+/**
+ * Test codec for Integer values. The stream methods use DataOutputStream.writeInt and
+ * DataInputStream.readInt; the byte-array methods use the same 4-byte big-endian layout.
+ */
+public class StreamingIntegerCodec implements StreamingCodec<Integer> {
+
+  /** Number of bytes in an encoded int. */
+  private static final int INT_BYTES = Integer.SIZE / Byte.SIZE;
+
+  /**
+   * Writes the value to the stream as a 4-byte int.
+   *
+   * @throws RuntimeException wrapping the IOException if the write fails
+   */
+  @Override
+  public void encodeToStream(final Integer obj, final DataOutputStream stream) {
+    try {
+      stream.writeInt(obj);
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Reads a 4-byte int from the stream.
+   *
+   * @throws RuntimeException wrapping the IOException if the read fails
+   */
+  @Override
+  public Integer decodeFromStream(final DataInputStream stream) {
+    try {
+      return stream.readInt();
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Decodes the first four bytes of the array as a big-endian int.
+   *
+   * @throws java.nio.BufferUnderflowException if the array holds fewer than four bytes
+   */
+  @Override
+  public Integer decode(final byte[] data) {
+    return ByteBuffer.wrap(data).getInt();
+  }
+
+  /**
+   * Encodes the value as four big-endian bytes.
+   */
+  @Override
+  public byte[] encode(final Integer obj) {
+    return ByteBuffer.allocate(INT_BYTES).putInt(obj).array();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/41ff1418/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingStringCodec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingStringCodec.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingStringCodec.java new file mode 100644 index 0000000..497c7fa --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingStringCodec.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
+package org.apache.reef.services.network.util;
+
+import org.apache.reef.io.network.impl.StreamingCodec;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+
+/**
+ * Test codec for String values. The byte-array methods use UTF-8; the stream methods use
+ * DataOutputStream.writeUTF and DataInputStream.readUTF.
+ */
+public class StreamingStringCodec implements StreamingCodec<String> {
+  /**
+   * Encodes the string as UTF-8 bytes, independent of the platform default charset.
+   */
+  @Override
+  public byte[] encode(final String obj) {
+    return obj.getBytes(StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Decodes UTF-8 bytes into a string, independent of the platform default charset.
+   */
+  @Override
+  public String decode(final byte[] buf) {
+    return new String(buf, StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Writes the string to the stream with writeUTF.
+   *
+   * @throws RuntimeException wrapping the IOException if the write fails
+   */
+  @Override
+  public void encodeToStream(final String obj, final DataOutputStream stream) {
+    try {
+      stream.writeUTF(obj);
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Reads a string from the stream with readUTF.
+   *
+   * @throws RuntimeException wrapping the IOException if the read fails
+   */
+  @Override
+  public String decodeFromStream(final DataInputStream stream) {
+    try {
+      return stream.readUTF();
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
