[REEF-126]: Merge Enhanced Features into Shimoga
This addressed the issue by
* adding Gather and Scatter operations and the corresponding configurations
* modifying the OperatorTopology classes to support Gather and Scatter
* changing the TopologyUpdate message so that
the Driver sends the current topology info for Scatter
JIRA: [REEF-126](https://issues.apache.org/jira/browse/REEF-126)
Pull request:
This closes #226
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/05792696
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/05792696
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/05792696
Branch: refs/heads/master
Commit: 05792696abfe2ad4e5913fbd0af3e1ad78c0d1d6
Parents: 8641366
Author: Jason (Joo Seong) Jeong <[email protected]>
Authored: Tue Jun 16 20:48:37 2015 +0900
Committer: Brian Cho <[email protected]>
Committed: Sun Aug 2 18:14:29 2015 +0900
----------------------------------------------------------------------
lang/java/reef-io/pom.xml | 4 +
.../api/driver/CommunicationGroupDriver.java | 20 ++
.../io/network/group/api/driver/TaskNode.java | 2 +
.../io/network/group/api/operators/Gather.java | 5 +
.../io/network/group/api/operators/Scatter.java | 5 +
.../api/task/CommunicationGroupClient.java | 52 +++++
.../group/api/task/OperatorTopology.java | 9 +-
.../group/api/task/OperatorTopologyStruct.java | 7 +-
.../group/impl/config/GatherOperatorSpec.java | 82 ++++++++
.../group/impl/config/ScatterOperatorSpec.java | 83 ++++++++
.../driver/CommunicationGroupDriverImpl.java | 38 ++++
.../network/group/impl/driver/FlatTopology.java | 2 +-
.../network/group/impl/driver/TaskNodeImpl.java | 7 +
.../group/impl/driver/TopologySerializer.java | 104 ++++++++++
.../group/impl/driver/TopologySimpleNode.java | 44 ++++
.../impl/driver/TopologyUpdateWaitHandler.java | 6 +-
.../network/group/impl/driver/TreeTopology.java | 23 ++-
.../group/impl/operators/BroadcastReceiver.java | 2 +-
.../group/impl/operators/GatherReceiver.java | 192 ++++++++++++++++++
.../group/impl/operators/GatherSender.java | 156 ++++++++++++++
.../group/impl/operators/ScatterReceiver.java | 164 +++++++++++++++
.../group/impl/operators/ScatterSender.java | 203 +++++++++++++++++++
.../impl/task/CommunicationGroupClientImpl.java | 114 ++++++++++-
.../group/impl/task/OperatorTopologyImpl.java | 28 ++-
.../impl/task/OperatorTopologyStructImpl.java | 147 ++++++++++----
.../network/group/impl/utils/ScatterData.java | 56 +++++
.../group/impl/utils/ScatterDecoder.java | 66 ++++++
.../group/impl/utils/ScatterEncoder.java | 147 ++++++++++++++
.../network/group/impl/utils/ScatterHelper.java | 59 ++++++
.../network/group/TopologySerializerTest.java | 72 +++++++
.../group/impl/utils/ScatterCodecTest.java | 93 +++++++++
.../group/impl/utils/ScatterHelperTest.java | 56 +++++
.../network/group/impl/utils/package-info.java | 22 ++
33 files changed, 2017 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/pom.xml b/lang/java/reef-io/pom.xml
index 6732b67..279cedf 100644
--- a/lang/java/reef-io/pom.xml
+++ b/lang/java/reef-io/pom.xml
@@ -143,6 +143,10 @@ under the License.
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ </dependency>
<!-- END OF HADOOP -->
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/CommunicationGroupDriver.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/CommunicationGroupDriver.java
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/CommunicationGroupDriver.java
index 3c5bfa2..9d65355 100644
---
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/CommunicationGroupDriver.java
+++
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/CommunicationGroupDriver.java
@@ -20,7 +20,9 @@ package org.apache.reef.io.network.group.api.driver;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec;
+import org.apache.reef.io.network.group.impl.config.GatherOperatorSpec;
import org.apache.reef.io.network.group.impl.config.ReduceOperatorSpec;
+import org.apache.reef.io.network.group.impl.config.ScatterOperatorSpec;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.annotations.Name;
@@ -56,6 +58,24 @@ public interface CommunicationGroupDriver {
CommunicationGroupDriver addReduce(Class<? extends Name<String>>
operatorName, ReduceOperatorSpec spec);
/**
+ * Add the scatter operator specified by {@code operatorName} and {@code
spec}.
+ *
+ * @param operatorName
+ * @param spec
+ * @return
+ */
+ CommunicationGroupDriver addScatter(Class<? extends Name<String>>
operatorName, ScatterOperatorSpec spec);
+
+ /**
+ * Add the gather operator specified by {@code operatorName} and {@code
spec}.
+ *
+ * @param operatorName
+ * @param spec
+ * @return
+ */
+ CommunicationGroupDriver addGather(Class<? extends Name<String>>
operatorName, GatherOperatorSpec spec);
+
+ /**
* This signals to the service that no more.
* operator specs will be added to this communication
* group and an attempt to do that will throw an
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNode.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNode.java
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNode.java
index 8a626cf..e2bb1fd 100644
---
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNode.java
+++
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNode.java
@@ -38,6 +38,8 @@ public interface TaskNode {
void setParent(TaskNode parent);
+ Iterable<TaskNode> getChildren();
+
void addChild(TaskNode child);
void removeChild(TaskNode taskNode);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Gather.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Gather.java
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Gather.java
index dc356d7..bd55629 100644
---
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Gather.java
+++
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Gather.java
@@ -19,6 +19,9 @@
package org.apache.reef.io.network.group.api.operators;
import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.group.impl.operators.GatherReceiver;
+import org.apache.reef.io.network.group.impl.operators.GatherSender;
+import org.apache.reef.tang.annotations.DefaultImplementation;
import org.apache.reef.wake.Identifier;
import java.util.List;
@@ -34,6 +37,7 @@ public interface Gather {
/**
* Senders or non-roots.
*/
+ @DefaultImplementation(GatherSender.class)
interface Sender<T> extends GroupCommOperator {
/**
@@ -45,6 +49,7 @@ public interface Gather {
/**
* Receiver or Root.
*/
+ @DefaultImplementation(GatherReceiver.class)
interface Receiver<T> extends GroupCommOperator {
/**
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Scatter.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Scatter.java
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Scatter.java
index 5dba002..5ded0a0 100644
---
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Scatter.java
+++
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Scatter.java
@@ -19,6 +19,9 @@
package org.apache.reef.io.network.group.api.operators;
import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.group.impl.operators.ScatterReceiver;
+import org.apache.reef.io.network.group.impl.operators.ScatterSender;
+import org.apache.reef.tang.annotations.DefaultImplementation;
import org.apache.reef.wake.Identifier;
import java.util.List;
@@ -35,6 +38,7 @@ public interface Scatter {
/**
* Sender or Root.
*/
+ @DefaultImplementation(ScatterSender.class)
interface Sender<T> extends GroupCommOperator {
/**
@@ -63,6 +67,7 @@ public interface Scatter {
/**
* Receiver or non-roots.
*/
+ @DefaultImplementation(ScatterReceiver.class)
interface Receiver<T> extends GroupCommOperator {
/**
* Receive the sub-list of elements targeted for the current receiver.
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupClient.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupClient.java
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupClient.java
index 22314ad..4aa5217 100644
---
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupClient.java
+++
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupClient.java
@@ -20,11 +20,17 @@ package org.apache.reef.io.network.group.api.task;
import org.apache.reef.annotations.audience.TaskSide;
import org.apache.reef.io.network.group.api.operators.Broadcast;
+import org.apache.reef.io.network.group.api.operators.Gather;
import org.apache.reef.io.network.group.api.operators.Reduce;
import org.apache.reef.io.network.group.api.GroupChanges;
+import org.apache.reef.io.network.group.api.operators.Scatter;
+import org.apache.reef.io.network.group.impl.driver.TopologySimpleNode;
import org.apache.reef.io.network.group.impl.task.CommunicationGroupClientImpl;
import org.apache.reef.tang.annotations.DefaultImplementation;
import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.wake.Identifier;
+
+import java.util.List;
/**
* The Task side interface of a communication group.
@@ -78,12 +84,58 @@ public interface CommunicationGroupClient {
Reduce.Sender getReduceSender(Class<? extends Name<String>> operatorName);
/**
+ * Return the scatter sender configured on this communication group.
+ * {@code operatorName} is used to specify the scatter sender to return.
+ *
+ * @param operatorName
+ * @return
+ */
+ Scatter.Sender getScatterSender(Class<? extends Name<String>> operatorName);
+
+ /**
+ * Return the scatter receiver configured on this communication group.
+ * {@code operatorName} is used to specify the scatter receiver to return.
+ *
+ * @param operatorName
+ * @return
+ */
+ Scatter.Receiver getScatterReceiver(Class<? extends Name<String>>
operatorName);
+
+ /**
+ * Return the gather receiver configured on this communication group.
+ * {@code operatorName} is used to specify the gather receiver to return.
+ *
+ * @param operatorName
+ * @return
+ */
+ Gather.Receiver getGatherReceiver(Class<? extends Name<String>>
operatorName);
+
+ /**
+ * Return the gather sender configured on this communication group.
+ * {@code operatorName} is used to specify the gather sender to return.
+ *
+ * @param operatorName
+ * @return
+ */
+ Gather.Sender getGatherSender(Class<? extends Name<String>> operatorName);
+
+ /**
* @return Changes in topology of this communication group since the last
time
* this method was called
*/
GroupChanges getTopologyChanges();
/**
+ * @return list of current active tasks, last updated during updateTopology()
+ */
+ List<Identifier> getActiveSlaveTasks();
+
+ /**
+ * @return root node of simplified topology representation
+ */
+ TopologySimpleNode getTopologySimpleNodeRoot();
+
+ /**
* Asks the driver to update the topology of this communication group. This
can
* be an expensive call depending on what the minimum number of tasks is for
this
* group to function as this first tells the driver, driver then tells the
affected
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopology.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopology.java
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopology.java
index 0b55259..7bc181b 100644
---
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopology.java
+++
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopology.java
@@ -24,6 +24,8 @@ import
org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
import org.apache.reef.io.serialization.Codec;
+import java.util.Map;
+
/**
* Represents the local topology of tasks for an operator. It
* provides methods to send/rcv from parents & children
@@ -48,11 +50,16 @@ public interface OperatorTopology {
void sendToParent(byte[] encode,
ReefNetworkGroupCommProtos.GroupCommMessage.Type reduce) throws
ParentDeadException;
- byte[] recvFromParent() throws ParentDeadException;
+ byte[] recvFromParent(ReefNetworkGroupCommProtos.GroupCommMessage.Type
msgType) throws ParentDeadException;
void sendToChildren(byte[] data,
ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) throws
ParentDeadException;
+ void sendToChildren(Map<String, byte[]> dataMap,
+ ReefNetworkGroupCommProtos.GroupCommMessage.Type
msgType) throws ParentDeadException;
+
<T> T recvFromChildren(ReduceFunction<T> redFunc, Codec<T> dataCodec) throws
ParentDeadException;
+ byte[] recvFromChildren() throws ParentDeadException;
+
void initialize() throws ParentDeadException;
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopologyStruct.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopologyStruct.java
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopologyStruct.java
index 969134d..4d659a3 100644
---
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopologyStruct.java
+++
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopologyStruct.java
@@ -26,6 +26,7 @@ import org.apache.reef.io.serialization.Codec;
import org.apache.reef.tang.annotations.Name;
import java.util.Collection;
+import java.util.Map;
import java.util.Set;
/**
@@ -65,9 +66,13 @@ public interface OperatorTopologyStruct {
void sendToParent(byte[] data,
ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType);
- byte[] recvFromParent();
+ byte[] recvFromParent(ReefNetworkGroupCommProtos.GroupCommMessage.Type
msgType);
void sendToChildren(byte[] data,
ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType);
+ void sendToChildren(Map<String, byte[]> dataMap,
ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType);
+
<T> T recvFromChildren(ReduceFunction<T> redFunc, Codec<T> dataCodec);
+
+ byte[] recvFromChildren();
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/GatherOperatorSpec.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/GatherOperatorSpec.java
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/GatherOperatorSpec.java
new file mode 100644
index 0000000..8eb2a90
--- /dev/null
+++
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/GatherOperatorSpec.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config;
+
+import org.apache.reef.io.network.group.api.config.OperatorSpec;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.serialization.Codec;
+
+/**
+ * The specification for the Gather operator.
+ */
+public class GatherOperatorSpec implements OperatorSpec {
+
+ private final String receiverId;
+ private final Class<? extends Codec> dataCodecClass;
+
+ public GatherOperatorSpec(final String receiverId,
+ final Class<? extends Codec> dataCodecClass) {
+ this.receiverId = receiverId;
+ this.dataCodecClass = dataCodecClass;
+ }
+
+ public String getReceiverId() {
+ return receiverId;
+ }
+
+ @Override
+ public Class<? extends Codec> getDataCodecClass() {
+ return dataCodecClass;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("Gather Operator Spec:
[receiver=")
+ .append(receiverId)
+ .append("] [dataCodecClass=")
+ .append(Utils.simpleName(dataCodecClass))
+ .append("]");
+ return sb.toString();
+ }
+
+ public static Builder newBuilder() {
+ return new GatherOperatorSpec.Builder();
+ }
+
+ public static class Builder implements
org.apache.reef.util.Builder<GatherOperatorSpec> {
+
+ private String receiverId;
+ private Class<? extends Codec> dataCodecClass;
+
+ public Builder setReceiverId(final String receiverId) {
+ this.receiverId = receiverId;
+ return this;
+ }
+
+ public Builder setDataCodecClass(final Class<? extends Codec>
dataCodecClass) {
+ this.dataCodecClass = dataCodecClass;
+ return this;
+ }
+
+ @Override
+ public GatherOperatorSpec build() {
+ return new GatherOperatorSpec(receiverId, dataCodecClass);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/ScatterOperatorSpec.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/ScatterOperatorSpec.java
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/ScatterOperatorSpec.java
new file mode 100644
index 0000000..fadff14
--- /dev/null
+++
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/ScatterOperatorSpec.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config;
+
+import org.apache.reef.io.network.group.api.config.OperatorSpec;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.serialization.Codec;
+
+/**
+ * The specification for the Scatter operator.
+ */
+public class ScatterOperatorSpec implements OperatorSpec {
+
+ private final String senderId;
+ private final Class<? extends Codec> dataCodecClass;
+
+ public ScatterOperatorSpec(final String senderId,
+ final Class<? extends Codec> dataCodecClass) {
+ this.senderId = senderId;
+ this.dataCodecClass = dataCodecClass;
+ }
+
+ public String getSenderId() {
+ return senderId;
+ }
+
+ @Override
+ public Class<? extends Codec> getDataCodecClass() {
+ return dataCodecClass;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("Scatter Operator Spec:
[sender=")
+ .append(senderId)
+ .append("] [dataCodecClass=")
+ .append(Utils.simpleName(dataCodecClass))
+ .append("]");
+ return sb.toString();
+ }
+
+ public static Builder newBuilder() {
+ return new ScatterOperatorSpec.Builder();
+ }
+
+ public static class Builder implements
org.apache.reef.util.Builder<ScatterOperatorSpec> {
+
+ private String senderId;
+ private Class<? extends Codec> dataCodecClass;
+
+ public Builder setSenderId(final String senderId) {
+ this.senderId = senderId;
+ return this;
+ }
+
+ public Builder setDataCodecClass(final Class<? extends Codec>
dataCodecClass) {
+ this.dataCodecClass = dataCodecClass;
+ return this;
+ }
+
+ @Override
+ public ScatterOperatorSpec build() {
+ return new ScatterOperatorSpec(senderId, dataCodecClass);
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
index ffc0f20..657aec8 100644
---
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
+++
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
@@ -30,7 +30,9 @@ import
org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver;
import org.apache.reef.io.network.group.api.driver.Topology;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec;
+import org.apache.reef.io.network.group.impl.config.GatherOperatorSpec;
import org.apache.reef.io.network.group.impl.config.ReduceOperatorSpec;
+import org.apache.reef.io.network.group.impl.config.ScatterOperatorSpec;
import
org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName;
import org.apache.reef.io.network.group.impl.config.parameters.OperatorName;
import
org.apache.reef.io.network.group.impl.config.parameters.SerializedOperConfigs;
@@ -148,6 +150,42 @@ public class CommunicationGroupDriverImpl implements
CommunicationGroupDriver {
}
@Override
+ public CommunicationGroupDriver addScatter(final Class<? extends
Name<String>> operatorName,
+ final ScatterOperatorSpec spec) {
+ LOG.entering("CommunicationGroupDriverImpl", "addScatter",
+ new Object[]{getQualifiedName(), Utils.simpleName(operatorName),
spec});
+ if (finalised) {
+ throw new IllegalStateException("Can't add more operators to a finalised
spec");
+ }
+ operatorSpecs.put(operatorName, spec);
+ final Topology topology = new TreeTopology(senderStage, groupName,
operatorName, driverId, numberOfTasks, fanOut);
+ topology.setRootTask(spec.getSenderId());
+ topology.setOperatorSpecification(spec);
+ topologies.put(operatorName, topology);
+ LOG.exiting("CommunicationGroupDriverImpl", "addScatter",
+ Arrays.toString(new Object[]{getQualifiedName(),
Utils.simpleName(operatorName), spec}));
+ return this;
+ }
+
+ @Override
+ public CommunicationGroupDriver addGather(final Class<? extends
Name<String>> operatorName,
+ final GatherOperatorSpec spec) {
+ LOG.entering("CommunicationGroupDriverImpl", "addGather",
+ new Object[]{getQualifiedName(), Utils.simpleName(operatorName),
spec});
+ if (finalised) {
+ throw new IllegalStateException("Can't add more operators to a finalised
spec");
+ }
+ operatorSpecs.put(operatorName, spec);
+ final Topology topology = new TreeTopology(senderStage, groupName,
operatorName, driverId, numberOfTasks, fanOut);
+ topology.setRootTask(spec.getReceiverId());
+ topology.setOperatorSpecification(spec);
+ topologies.put(operatorName, topology);
+ LOG.exiting("CommunicationGroupDriverImpl", "addGather",
+ Arrays.toString(new Object[]{getQualifiedName(),
Utils.simpleName(operatorName), spec}));
+ return this;
+ }
+
+ @Override
public Configuration getTaskConfiguration(final Configuration taskConf) {
LOG.entering("CommunicationGroupDriverImpl", "getTaskConfiguration",
new Object[]{getQualifiedName(), confSerializer.toString(taskConf)});
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java
index 46d2345..a2da654 100644
---
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java
+++
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java
@@ -267,7 +267,7 @@ public class FlatTopology implements Topology {
final EventHandler<List<TaskNode>> topoUpdateWaitHandler = new
TopologyUpdateWaitHandler(senderStage, groupName,
operName, driverId, 0,
dstId, version,
- getQualifiedName());
+ getQualifiedName(), TopologySerializer.encode(root));
final EStage<List<TaskNode>> nodeTopologyUpdateWaitStage = new
SingleThreadStage<>("NodeTopologyUpdateWaitStage",
topoUpdateWaitHandler,
nodes.size());
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java
index f2c465c..7b68682 100644
---
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java
+++
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java
@@ -303,6 +303,13 @@ public class TaskNodeImpl implements TaskNode {
return parent;
}
+ @Override
+ public Iterable<TaskNode> getChildren() {
+ LOG.entering("TaskNodeImpl", "getChildren", getQualifiedName());
+ LOG.exiting("TaskNodeImpl", "getChildren", getQualifiedName() + children);
+ return children;
+ }
+
private String getQualifiedName() {
return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) +
":(" + taskId + "," + getVersion() + ") - ";
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologySerializer.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologySerializer.java
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologySerializer.java
new file mode 100644
index 0000000..f019703
--- /dev/null
+++
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologySerializer.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.driver;
+
+import org.apache.reef.io.network.group.api.driver.TaskNode;
+import org.apache.reef.io.network.util.Pair;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+
+import java.io.*;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Utility class for encoding a Topology into a byte array and vice versa.
+ */
+public final class TopologySerializer {
+
+ /**
+ * Shouldn't be instantiated.
+ */
+ private TopologySerializer() {
+ }
+
+ /**
+ * Recursively encode TaskNodes of a Topology into a byte array.
+ *
+ * @param root the root node of the subtree to encode
+ * @return encoded byte array
+ */
+ public static byte[] encode(final TaskNode root) {
+ try (final ByteArrayOutputStream bstream = new ByteArrayOutputStream();
+ final DataOutputStream dstream = new DataOutputStream(bstream)) {
+ encodeHelper(dstream, root);
+ return bstream.toByteArray();
+
+ } catch (final IOException e) {
+ throw new RuntimeException("Exception while encoding topology of " +
root.getTaskId(), e);
+ }
+ }
+
+ private static void encodeHelper(final DataOutputStream dstream,
+ final TaskNode node) throws IOException {
+ dstream.writeUTF(node.getTaskId());
+ dstream.writeInt(node.getNumberOfChildren());
+ for (final TaskNode child : node.getChildren()) {
+ encodeHelper(dstream, child);
+ }
+ }
+
+ /**
+ * Recursively translate a byte array into a TopologySimpleNode and a list
of task Identifiers.
+ *
+ * @param data encoded byte array
+ * @param ifac IdentifierFactory needed to generate Identifiers from String
Ids
+ * @return decoded TopologySimpleNode and a lexicographically sorted list of
task Identifiers
+ */
+ public static Pair<TopologySimpleNode, List<Identifier>> decode(
+ final byte[] data,
+ final IdentifierFactory ifac) {
+
+ try (final DataInputStream dstream = new DataInputStream(new
ByteArrayInputStream(data))) {
+ final List<Identifier> activeSlaveTasks = new LinkedList<>();
+ final TopologySimpleNode retNode = decodeHelper(dstream,
activeSlaveTasks, ifac);
+ return new Pair<>(retNode, activeSlaveTasks);
+
+ } catch (final IOException e) {
+ throw new RuntimeException("Exception while decoding message", e);
+ }
+ }
+
+ private static TopologySimpleNode decodeHelper(
+ final DataInputStream dstream,
+ final List<Identifier> activeSlaveTasks,
+ final IdentifierFactory ifac) throws IOException {
+ final String taskId = dstream.readUTF();
+ activeSlaveTasks.add(ifac.getNewInstance(taskId));
+ final TopologySimpleNode retNode = new TopologySimpleNode(taskId);
+
+ final int children = dstream.readInt();
+ for (int index = 0; index < children; index++) {
+ final TopologySimpleNode child = decodeHelper(dstream, activeSlaveTasks,
ifac);
+ retNode.addChild(child);
+ }
+
+ return retNode;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologySimpleNode.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologySimpleNode.java
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologySimpleNode.java
new file mode 100644
index 0000000..0907a1b
--- /dev/null
+++
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologySimpleNode.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.driver;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public final class TopologySimpleNode {
+ private final String taskId;
+ private final List<TopologySimpleNode> children;
+
+ public TopologySimpleNode(final String taskId) {
+ this.taskId = taskId;
+ this.children = new LinkedList<>();
+ }
+
+ public void addChild(final TopologySimpleNode childNode) {
+ children.add(childNode);
+ }
+
+ public String getTaskId() {
+ return this.taskId;
+ }
+
+ public Iterable<TopologySimpleNode> getChildren() {
+ return this.children;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java
index 709f866..1811e66 100644
---
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java
+++
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java
@@ -43,6 +43,7 @@ public class TopologyUpdateWaitHandler implements
EventHandler<List<TaskNode>> {
private final String dstId;
private final int dstVersion;
private final String qualifiedName;
+ private final byte[] data;
/**
@@ -59,7 +60,7 @@ public class TopologyUpdateWaitHandler implements
EventHandler<List<TaskNode>> {
final Class<? extends Name<String>>
operName,
final String driverId, final int
driverVersion,
final String dstId, final int dstVersion,
- final String qualifiedName) {
+ final String qualifiedName, final byte[]
data) {
super();
this.senderStage = senderStage;
this.groupName = groupName;
@@ -69,6 +70,7 @@ public class TopologyUpdateWaitHandler implements
EventHandler<List<TaskNode>> {
this.dstId = dstId;
this.dstVersion = dstVersion;
this.qualifiedName = qualifiedName;
+ this.data = data;
}
@@ -90,7 +92,7 @@ public class TopologyUpdateWaitHandler implements
EventHandler<List<TaskNode>> {
+ dstId + "," + dstVersion + ")");
senderStage.onNext(Utils.bldVersionedGCM(groupName, operName,
ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyUpdated,
driverId, driverVersion, dstId,
- dstVersion, Utils.EMPTY_BYTE_ARR));
+ dstVersion, data));
LOG.exiting("TopologyUpdateWaitHandler", "onNext", qualifiedName);
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java
index 774a8f1..908316d 100644
---
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java
+++
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java
@@ -27,14 +27,13 @@ import
org.apache.reef.io.network.group.impl.GroupChangesCodec;
import org.apache.reef.io.network.group.impl.GroupChangesImpl;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec;
+import org.apache.reef.io.network.group.impl.config.GatherOperatorSpec;
import org.apache.reef.io.network.group.impl.config.ReduceOperatorSpec;
+import org.apache.reef.io.network.group.impl.config.ScatterOperatorSpec;
import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
import
org.apache.reef.io.network.group.impl.config.parameters.ReduceFunctionParam;
import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
-import org.apache.reef.io.network.group.impl.operators.BroadcastReceiver;
-import org.apache.reef.io.network.group.impl.operators.BroadcastSender;
-import org.apache.reef.io.network.group.impl.operators.ReduceReceiver;
-import org.apache.reef.io.network.group.impl.operators.ReduceSender;
+import org.apache.reef.io.network.group.impl.operators.*;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
import org.apache.reef.io.serialization.Codec;
@@ -138,6 +137,20 @@ public class TreeTopology implements Topology {
} else {
jcb.bindImplementation(GroupCommOperator.class, ReduceSender.class);
}
+ } else if (operatorSpec instanceof ScatterOperatorSpec) {
+ final ScatterOperatorSpec scatterOperatorSpec = (ScatterOperatorSpec)
operatorSpec;
+ if (taskId.equals(scatterOperatorSpec.getSenderId())) {
+ jcb.bindImplementation(GroupCommOperator.class, ScatterSender.class);
+ } else {
+ jcb.bindImplementation(GroupCommOperator.class, ScatterReceiver.class);
+ }
+ } else if (operatorSpec instanceof GatherOperatorSpec) {
+ final GatherOperatorSpec gatherOperatorSpec = (GatherOperatorSpec)
operatorSpec;
+ if (taskId.equals(gatherOperatorSpec.getReceiverId())) {
+ jcb.bindImplementation(GroupCommOperator.class, GatherReceiver.class);
+ } else {
+ jcb.bindImplementation(GroupCommOperator.class, GatherSender.class);
+ }
}
final Configuration retConf = jcb.build();
LOG.exiting("TreeTopology", "getTaskConfig", getQualifiedName() +
confSer.toString(retConf));
@@ -298,7 +311,7 @@ public class TreeTopology implements Topology {
final EventHandler<List<TaskNode>> topoUpdateWaitHandler = new
TopologyUpdateWaitHandler(senderStage, groupName,
operName, driverId, 0,
dstId, version,
- getQualifiedName());
+ getQualifiedName(), TopologySerializer.encode(root));
final EStage<List<TaskNode>> nodeTopologyUpdateWaitStage = new
SingleThreadStage<>("NodeTopologyUpdateWaitStage",
topoUpdateWaitHandler,
nodes.size());
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java
index 74e9a83..5a3e426 100644
---
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java
+++
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java
@@ -135,7 +135,7 @@ public class BroadcastReceiver<T> implements
Broadcast.Receiver<T>, EventHandler
LOG.fine(this + " Waiting to receive broadcast");
final byte[] data;
try {
- data = topology.recvFromParent();
+ data =
topology.recvFromParent(ReefNetworkGroupCommProtos.GroupCommMessage.Type.Broadcast);
// TODO: Should receive the identity element instead of null
if (data == null) {
LOG.fine(this + " Received null. Perhaps one of my ancestors is
dead.");
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherReceiver.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherReceiver.java
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherReceiver.java
new file mode 100644
index 0000000..4b217f6
--- /dev/null
+++
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherReceiver.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.operators;
+
+import org.apache.reef.driver.parameters.DriverIdentifier;
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.exception.ParentDeadException;
+import org.apache.reef.io.network.group.api.operators.Gather;
+import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
+import
org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
+import org.apache.reef.io.network.group.api.task.OperatorTopology;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import
org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName;
+import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
+import org.apache.reef.io.network.group.impl.config.parameters.OperatorName;
+import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
+import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.impl.NetworkService;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+
+import javax.inject.Inject;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+public class GatherReceiver<T> implements Gather.Receiver<T>,
EventHandler<GroupCommunicationMessage> {
+
+ private static final Logger LOG =
Logger.getLogger(GatherReceiver.class.getName());
+
+ private final Class<? extends Name<String>> groupName;
+ private final Class<? extends Name<String>> operName;
+ private final Codec<T> dataCodec;
+ private final OperatorTopology topology;
+ private final CommunicationGroupServiceClient commGroupClient;
+ private final AtomicBoolean init = new AtomicBoolean(false);
+ private final int version;
+
+ @Inject
+ public GatherReceiver(@Parameter(CommunicationGroupName.class) final String
groupName,
+ @Parameter(OperatorName.class) final String operName,
+ @Parameter(TaskConfigurationOptions.Identifier.class)
final String selfId,
+ @Parameter(DataCodec.class) final Codec<T> dataCodec,
+ @Parameter(DriverIdentifier.class) final String
driverId,
+ @Parameter(TaskVersion.class) final int version,
+ final CommGroupNetworkHandler commGroupNetworkHandler,
+ final NetworkService<GroupCommunicationMessage>
netService,
+ final CommunicationGroupServiceClient commGroupClient)
{
+ LOG.finest(operName + " has CommGroupHandler-" +
commGroupNetworkHandler.toString());
+ this.version = version;
+ this.groupName = Utils.getClass(groupName);
+ this.operName = Utils.getClass(operName);
+ this.dataCodec = dataCodec;
+ this.topology = new OperatorTopologyImpl(this.groupName, this.operName,
+ selfId, driverId, new
Sender(netService), version);
+ this.commGroupClient = commGroupClient;
+ commGroupNetworkHandler.register(this.operName, this);
+ }
+
+ @Override
+ public int getVersion() {
+ return version;
+ }
+
+ @Override
+ public void initialize() throws ParentDeadException {
+ topology.initialize();
+ }
+
+ @Override
+ public Class<? extends Name<String>> getOperName() {
+ return operName;
+ }
+
+ @Override
+ public Class<? extends Name<String>> getGroupName() {
+ return groupName;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("GatherReceiver:")
+ .append(Utils.simpleName(groupName))
+ .append(":")
+ .append(Utils.simpleName(operName))
+ .append(":")
+ .append(version);
+ return sb.toString();
+ }
+
+ @Override
+ public void onNext(final GroupCommunicationMessage msg) {
+ topology.handle(msg);
+ }
+
+ @Override
+ public List<T> receive() throws NetworkException, InterruptedException {
+ LOG.entering("GatherReceiver", "receive");
+ final Map<String, T> mapOfTaskIdToData = receiveMapOfTaskIdToData();
+
+ LOG.fine(this + " Sorting data according to lexicographical order of task
identifiers.");
+ final TreeMap<String, T> sortedMapOfTaskIdToData = new
TreeMap<>(mapOfTaskIdToData);
+ final List<T> retList = new LinkedList<>(sortedMapOfTaskIdToData.values());
+
+ LOG.exiting("GatherReceiver", "receive", retList);
+ return retList;
+ }
+
+ @Override
+ public List<T> receive(final List<? extends Identifier> order) throws
NetworkException, InterruptedException {
+ LOG.entering("GatherReceiver", "receive", order);
+ final Map<String, T> mapOfTaskIdToData = receiveMapOfTaskIdToData();
+
+ LOG.fine(this + " Sorting data according to specified order of task
identifiers.");
+ final List<T> retList = new LinkedList<>();
+ for (final Identifier key : order) {
+ final String keyString = key.toString();
+ if (mapOfTaskIdToData.containsKey(keyString)) {
+ retList.add(mapOfTaskIdToData.get(key.toString()));
+ } else {
+ LOG.warning(this + " Received no data from " + keyString + ". Adding
null.");
+ retList.add(null);
+ }
+ }
+
+ LOG.exiting("GatherReceiver", "receive", retList);
+ return retList;
+ }
+
+ private Map<String, T> receiveMapOfTaskIdToData() {
+ LOG.entering("GatherReceiver", "receiveMapOfTaskIdToData");
+ // I am root.
+ LOG.fine("I am " + this);
+
+ if (init.compareAndSet(false, true)) {
+ LOG.fine(this + " Communication group initializing.");
+ commGroupClient.initialize();
+ LOG.fine(this + " Communication group initialized.");
+ }
+
+ final Map<String, T> mapOfTaskIdToData = new HashMap<>();
+ try {
+ LOG.fine(this + " Waiting for children.");
+ final byte[] gatheredDataFromChildren = topology.recvFromChildren();
+
+ LOG.fine("Using " + dataCodec.getClass().getSimpleName() + " as codec.");
+ try (final ByteArrayInputStream bstream = new
ByteArrayInputStream(gatheredDataFromChildren);
+ final DataInputStream dstream = new DataInputStream(bstream)) {
+ while (dstream.available() > 0) {
+ final String identifier = dstream.readUTF();
+ final int dataLength = dstream.readInt();
+ final byte[] data = new byte[dataLength];
+ dstream.readFully(data);
+ mapOfTaskIdToData.put(identifier, dataCodec.decode(data));
+ }
+ LOG.fine(this + " Successfully received gathered data.");
+ }
+
+ } catch (final ParentDeadException e) {
+ throw new RuntimeException("ParentDeadException", e);
+ } catch (final IOException e) {
+ throw new RuntimeException("IOException", e);
+ }
+
+ LOG.exiting("GatherReceiver", "receiveMapOfTaskIdToData",
Arrays.toString(new Object[]{mapOfTaskIdToData, this}));
+ return mapOfTaskIdToData;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherSender.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherSender.java
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherSender.java
new file mode 100644
index 0000000..a657b07
--- /dev/null
+++
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherSender.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.operators;
+
+import org.apache.reef.driver.parameters.DriverIdentifier;
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.exception.ParentDeadException;
+import org.apache.reef.io.network.group.api.operators.Gather;
+import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
+import
org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
+import org.apache.reef.io.network.group.api.task.OperatorTopology;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import
org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName;
+import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
+import org.apache.reef.io.network.group.impl.config.parameters.OperatorName;
+import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
+import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.impl.NetworkService;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+public class GatherSender<T> implements Gather.Sender<T>,
EventHandler<GroupCommunicationMessage> {
+
+ private static final Logger LOG =
Logger.getLogger(GatherSender.class.getName());
+
+ private final Class<? extends Name<String>> groupName;
+ private final Class<? extends Name<String>> operName;
+ private final Codec<T> dataCodec;
+ private final NetworkService<GroupCommunicationMessage> netService;
+ private final OperatorTopology topology;
+ private final CommunicationGroupServiceClient commGroupClient;
+ private final AtomicBoolean init = new AtomicBoolean(false);
+ private final int version;
+
+ @Inject
+ public GatherSender(@Parameter(CommunicationGroupName.class) final String
groupName,
+ @Parameter(OperatorName.class) final String operName,
+ @Parameter(TaskConfigurationOptions.Identifier.class)
final String selfId,
+ @Parameter(DataCodec.class) final Codec<T> dataCodec,
+ @Parameter(DriverIdentifier.class) final String driverId,
+ @Parameter(TaskVersion.class) final int version,
+ final CommGroupNetworkHandler commGroupNetworkHandler,
+ final NetworkService<GroupCommunicationMessage>
netService,
+ final CommunicationGroupServiceClient commGroupClient) {
+ LOG.finest(operName + "has CommGroupHandler-" +
commGroupNetworkHandler.toString());
+ this.version = version;
+ this.groupName = Utils.getClass(groupName);
+ this.operName = Utils.getClass(operName);
+ this.dataCodec = dataCodec;
+ this.netService = netService;
+ this.topology = new OperatorTopologyImpl(this.groupName, this.operName,
+ selfId, driverId, new
Sender(netService), version);
+ this.commGroupClient = commGroupClient;
+ commGroupNetworkHandler.register(this.operName, this);
+ }
+
+ @Override
+ public int getVersion() {
+ return version;
+ }
+
+ @Override
+ public void initialize() throws ParentDeadException {
+ topology.initialize();
+ }
+
+ @Override
+ public Class<? extends Name<String>> getOperName() {
+ return operName;
+ }
+
+ @Override
+ public Class<? extends Name<String>> getGroupName() {
+ return groupName;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("GatherSender:")
+ .append(Utils.simpleName(groupName))
+ .append(":")
+ .append(Utils.simpleName(operName))
+ .append(":")
+ .append(version);
+ return sb.toString();
+ }
+
+ @Override
+ public void onNext(final GroupCommunicationMessage msg) {
+ topology.handle(msg);
+ }
+
+ @Override
+ public void send(final T myData) throws NetworkException,
InterruptedException {
+ LOG.entering("GatherSender", "send", myData);
+ // I am an intermediate node or a leaf.
+ LOG.fine("I am " + this);
+
+ if (init.compareAndSet(false, true)) {
+ LOG.fine(this + " Communication group initializing.");
+ commGroupClient.initialize();
+ LOG.fine(this + " Communication group initialized.");
+ }
+
+ try {
+ LOG.finest(this + " Waiting for children.");
+ final byte[] gatheredData = topology.recvFromChildren();
+ final byte[] encodedMyData = dataCodec.encode(myData);
+
+ try (final ByteArrayOutputStream bstream = new ByteArrayOutputStream();
+ final DataOutputStream dstream = new DataOutputStream(bstream)) {
+ dstream.writeUTF(netService.getMyId().toString());
+ dstream.writeInt(encodedMyData.length);
+ dstream.write(encodedMyData);
+ dstream.write(gatheredData);
+ final byte[] mergedData = bstream.toByteArray();
+
+ LOG.fine(this + " Sending merged value to parent.");
+ topology.sendToParent(mergedData,
ReefNetworkGroupCommProtos.GroupCommMessage.Type.Gather);
+ }
+ } catch (final ParentDeadException e) {
+ throw new RuntimeException("ParentDeadException", e);
+ } catch (final IOException e) {
+ throw new RuntimeException("IOException", e);
+ }
+ LOG.exiting("GatherSender", "send");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterReceiver.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterReceiver.java
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterReceiver.java
new file mode 100644
index 0000000..1c814fb
--- /dev/null
+++
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterReceiver.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.operators;
+
+import org.apache.reef.driver.parameters.DriverIdentifier;
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.exception.ParentDeadException;
+import org.apache.reef.io.network.group.api.operators.Scatter;
+import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
+import
org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
+import org.apache.reef.io.network.group.api.task.OperatorTopology;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import
org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName;
+import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
+import org.apache.reef.io.network.group.impl.config.parameters.OperatorName;
+import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
+import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
+import org.apache.reef.io.network.group.impl.utils.ScatterData;
+import org.apache.reef.io.network.group.impl.utils.ScatterDecoder;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.impl.NetworkService;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+public final class ScatterReceiver<T> implements Scatter.Receiver<T>,
EventHandler<GroupCommunicationMessage> {
+
+ private static final Logger LOG =
Logger.getLogger(ScatterReceiver.class.getName());
+
+ private final Class<? extends Name<String>> groupName;
+ private final Class<? extends Name<String>> operName;
+ private final Codec<T> dataCodec;
+ private final OperatorTopology topology;
+ private final AtomicBoolean init = new AtomicBoolean(false);
+ private final CommunicationGroupServiceClient commGroupClient;
+ private final int version;
+ private final ScatterDecoder scatterDecoder;
+
+ @Inject
+ public ScatterReceiver(@Parameter(CommunicationGroupName.class) final String
groupName,
+ @Parameter(OperatorName.class) final String operName,
+ @Parameter(TaskConfigurationOptions.Identifier.class)
final String selfId,
+ @Parameter(DataCodec.class) final Codec<T> dataCodec,
+ @Parameter(DriverIdentifier.class) final String
driverId,
+ @Parameter(TaskVersion.class) final int version,
+ final CommGroupNetworkHandler commGroupNetworkHandler,
+ final NetworkService<GroupCommunicationMessage>
netService,
+ final CommunicationGroupServiceClient commGroupClient,
+ final ScatterDecoder scatterDecoder) {
+ LOG.finest(operName + "has CommGroupHandler-" +
commGroupNetworkHandler.toString());
+ this.version = version;
+ this.groupName = Utils.getClass(groupName);
+ this.operName = Utils.getClass(operName);
+ this.dataCodec = dataCodec;
+ this.scatterDecoder = scatterDecoder;
+ this.topology = new OperatorTopologyImpl(this.groupName, this.operName,
+ selfId, driverId, new
Sender(netService), version);
+ this.commGroupClient = commGroupClient;
+ commGroupNetworkHandler.register(this.operName, this);
+ }
+
+ @Override
+ public int getVersion() {
+ return version;
+ }
+
+ @Override
+ public void initialize() throws ParentDeadException {
+ topology.initialize();
+ }
+
+ @Override
+ public Class<? extends Name<String>> getOperName() {
+ return operName;
+ }
+
+ @Override
+ public Class<? extends Name<String>> getGroupName() {
+ return groupName;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("ScatterReceiver:")
+ .append(Utils.simpleName(groupName))
+ .append(":")
+ .append(Utils.simpleName(operName))
+ .append(":")
+ .append(version);
+ return sb.toString();
+ }
+
+ @Override
+ public void onNext(final GroupCommunicationMessage msg) {
+ topology.handle(msg);
+ }
+
+ @Override
+ public List<T> receive() throws NetworkException, InterruptedException {
+ LOG.entering("ScatterReceiver", "receive");
+ // I am intermediate node or leaf.
+ LOG.fine("I am " + this);
+
+ if (init.compareAndSet(false, true)) {
+ LOG.fine(this + " Communication group initializing.");
+ commGroupClient.initialize();
+ LOG.fine(this + " Communication group initialized.");
+ }
+
+ try {
+ LOG.fine(this + " Waiting to receive scatter from parent.");
+ final byte[] data =
topology.recvFromParent(ReefNetworkGroupCommProtos.GroupCommMessage.Type.Scatter);
+
+ if (data == null) {
+ LOG.fine(this + " Received null. Perhaps one of my ancestors is
dead.");
+ LOG.exiting("ScatterSender", "receive", null);
+ return null;
+ }
+
+ LOG.fine(this + " Successfully received scattered data.");
+ final ScatterData scatterData = scatterDecoder.decode(data);
+
+ LOG.fine(this + " Trying to propagate messages to children.");
+ topology.sendToChildren(scatterData.getChildrenData(),
ReefNetworkGroupCommProtos.GroupCommMessage.Type.Scatter);
+
+ LOG.fine(this + " Decoding data elements sent to me.");
+ final List<T> retList = new LinkedList<>();
+ for (final byte[] singleData : scatterData.getMyData()) {
+ retList.add(dataCodec.decode(singleData));
+ }
+
+ LOG.exiting("ScatterSender", "receive", retList);
+ return retList;
+
+ } catch (final ParentDeadException e) {
+ throw new RuntimeException("ParentDeadException", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterSender.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterSender.java
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterSender.java
new file mode 100644
index 0000000..bef15f4
--- /dev/null
+++
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterSender.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.operators;
+
+import org.apache.reef.driver.parameters.DriverIdentifier;
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.exception.ParentDeadException;
+import org.apache.reef.io.network.group.api.operators.Scatter;
+import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
+import
org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
+import org.apache.reef.io.network.group.api.task.OperatorTopology;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import
org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName;
+import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
+import org.apache.reef.io.network.group.impl.config.parameters.OperatorName;
+import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
+import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
+import org.apache.reef.io.network.group.impl.utils.ScatterEncoder;
+import org.apache.reef.io.network.group.impl.utils.ScatterHelper;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.impl.NetworkService;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+
+import javax.inject.Inject;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+public final class ScatterSender<T> implements Scatter.Sender<T>,
EventHandler<GroupCommunicationMessage> {
+
+ private static final Logger LOG =
Logger.getLogger(ScatterSender.class.getName());
+
+ private final Class<? extends Name<String>> groupName;
+ private final Class<? extends Name<String>> operName;
+ private final Codec<T> dataCodec;
+ private final OperatorTopology topology;
+ private final AtomicBoolean init = new AtomicBoolean(false);
+ private final CommunicationGroupServiceClient commGroupClient;
+ private final int version;
+ private final ScatterEncoder scatterEncoder;
+
+ @Inject
+ public ScatterSender(@Parameter(CommunicationGroupName.class) final String
groupName,
+ @Parameter(OperatorName.class) final String operName,
+ @Parameter(TaskConfigurationOptions.Identifier.class)
final String selfId,
+ @Parameter(DataCodec.class) final Codec<T> dataCodec,
+ @Parameter(DriverIdentifier.class) final String
driverId,
+ @Parameter(TaskVersion.class) final int version,
+ final CommGroupNetworkHandler commGroupNetworkHandler,
+ final NetworkService<GroupCommunicationMessage>
netService,
+ final CommunicationGroupServiceClient commGroupClient,
+ final ScatterEncoder scatterEncoder) {
+ LOG.finest(operName + "has CommGroupHandler-" +
commGroupNetworkHandler.toString());
+ this.version = version;
+ this.groupName = Utils.getClass(groupName);
+ this.operName = Utils.getClass(operName);
+ this.dataCodec = dataCodec;
+ this.scatterEncoder = scatterEncoder;
+ this.topology = new OperatorTopologyImpl(this.groupName, this.operName,
+ selfId, driverId, new
Sender(netService), version);
+ this.commGroupClient = commGroupClient;
+ commGroupNetworkHandler.register(this.operName, this);
+ }
+
+ @Override
+ public int getVersion() {
+ return version;
+ }
+
+ @Override
+ public void initialize() throws ParentDeadException {
+ topology.initialize();
+ }
+
+ @Override
+ public Class<? extends Name<String>> getOperName() {
+ return operName;
+ }
+
+ @Override
+ public Class<? extends Name<String>> getGroupName() {
+ return groupName;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("ScatterSender:")
+ .append(Utils.simpleName(groupName))
+ .append(":")
+ .append(Utils.simpleName(operName))
+ .append(":")
+ .append(version);
+ return sb.toString();
+ }
+
+ @Override
+ public void onNext(final GroupCommunicationMessage msg) {
+ topology.handle(msg);
+ }
+
+ private void initializeGroup() {
+ if (init.compareAndSet(false, true)) {
+ LOG.fine(this + " Communication group initializing.");
+ commGroupClient.initialize();
+ LOG.fine(this + " Communication group initialized.");
+ }
+ }
+
+ @Override
+ public void send(final List<T> elements) throws NetworkException,
InterruptedException {
+ LOG.entering("ScatterSender", "send", elements);
+
+ initializeGroup();
+ send(elements,
+ ScatterHelper.getUniformCounts(elements.size(),
commGroupClient.getActiveSlaveTasks().size()),
+ commGroupClient.getActiveSlaveTasks());
+
+ LOG.exiting("ScatterSender", "send", elements);
+ }
+
+ @Override
+ public void send(final List<T> elements, final Integer... counts)
+ throws NetworkException, InterruptedException {
+ LOG.entering("ScatterSender", "send", new Object[]{elements, counts});
+
+ initializeGroup();
+ if (counts.length != commGroupClient.getActiveSlaveTasks().size()) {
+ throw new RuntimeException("Parameter 'counts' has length " +
counts.length
+ + ", but number of slaves is " +
commGroupClient.getActiveSlaveTasks().size());
+ }
+
+ send(elements,
+ Arrays.asList(counts),
+ commGroupClient.getActiveSlaveTasks());
+
+ LOG.exiting("ScatterSender", "send", Arrays.toString(new
Object[]{elements, counts}));
+ }
+
+ @Override
+ public void send(final List<T> elements, final List<? extends Identifier>
order)
+ throws NetworkException, InterruptedException {
+ LOG.entering("ScatterSender", "send", new Object[]{elements, order});
+
+ initializeGroup();
+ send(elements,
+ ScatterHelper.getUniformCounts(elements.size(), order.size()),
+ order);
+
+ LOG.exiting("ScatterSender", "send", Arrays.toString(new
Object[]{elements, order}));
+ }
+
+ @Override
+ public void send(final List<T> elements, final List<Integer> counts, final
List<? extends Identifier> order)
+ throws NetworkException, InterruptedException {
+ LOG.entering("ScatterSender", "send", new Object[]{elements, counts,
order});
+
+ if (counts.size() != order.size()) {
+ throw new RuntimeException("Parameter 'counts' has size " + counts.size()
+ + ", but parameter 'order' has size " + order.size() + ".");
+ }
+ initializeGroup();
+
+ // I am root.
+ LOG.fine("I am " + this);
+
+ LOG.fine(this + " Encoding data and determining which Tasks receive which
elements.");
+ final Map<String, byte[]> mapOfChildIdToBytes =
scatterEncoder.encode(elements, counts, order, dataCodec);
+
+ try {
+ LOG.fine(this + " Sending " + elements.size() + " elements.");
+ topology.sendToChildren(mapOfChildIdToBytes,
ReefNetworkGroupCommProtos.GroupCommMessage.Type.Scatter);
+
+ } catch (final ParentDeadException e) {
+ throw new RuntimeException("ParentDeadException during
OperatorTopology.sendToChildren()", e);
+ }
+
+ LOG.exiting("ScatterSender", "send", Arrays.toString(new Object[]{this,
elements, counts, order}));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java
index f9143d9..8482268 100644
---
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java
+++
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java
@@ -21,9 +21,9 @@ package org.apache.reef.io.network.group.impl.task;
import org.apache.reef.driver.parameters.DriverIdentifier;
import org.apache.reef.driver.task.TaskConfigurationOptions;
import org.apache.reef.exception.evaluator.NetworkException;
-import org.apache.reef.io.network.group.api.operators.Broadcast;
-import org.apache.reef.io.network.group.api.operators.GroupCommOperator;
-import org.apache.reef.io.network.group.api.operators.Reduce;
+import org.apache.reef.io.network.group.api.operators.*;
+import org.apache.reef.io.network.group.impl.driver.TopologySimpleNode;
+import org.apache.reef.io.network.group.impl.driver.TopologySerializer;
import org.apache.reef.io.network.impl.NetworkService;
import org.apache.reef.io.network.group.api.GroupChanges;
import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
@@ -38,6 +38,7 @@ import
org.apache.reef.io.network.group.impl.config.parameters.SerializedOperCon
import org.apache.reef.io.network.group.impl.operators.Sender;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.io.network.util.Pair;
import org.apache.reef.io.serialization.Codec;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Injector;
@@ -47,6 +48,8 @@ import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
import org.apache.reef.wake.impl.ThreadPoolStage;
import javax.inject.Inject;
@@ -65,6 +68,10 @@ public class CommunicationGroupClientImpl implements
CommunicationGroupServiceCl
private final Sender sender;
private final String taskId;
+ private final boolean isScatterSender;
+ private final IdentifierFactory identifierFactory;
+ private List<Identifier> activeSlaveTasks;
+ private TopologySimpleNode topologySimpleNodeRoot;
private final String driverId;
@@ -83,6 +90,7 @@ public class CommunicationGroupClientImpl implements
CommunicationGroupServiceCl
this.taskId = taskId;
this.driverId = driverId;
LOG.finest(groupName + " has GroupCommHandler-" +
groupCommNetworkHandler.toString());
+ this.identifierFactory = netService.getIdentifierFactory();
this.groupName = Utils.getClass(groupName);
this.groupCommNetworkHandler = groupCommNetworkHandler;
this.sender = new Sender(netService);
@@ -99,6 +107,7 @@ public class CommunicationGroupClientImpl implements
CommunicationGroupServiceCl
this.commGroupNetworkHandler =
Tang.Factory.getTang().newInjector().getInstance(CommGroupNetworkHandler.class);
this.groupCommNetworkHandler.register(this.groupName,
commGroupNetworkHandler);
+ boolean operatorIsScatterSender = false;
for (final String operatorConfigStr : operatorConfigs) {
final Configuration operatorConfig =
configSerializer.fromString(operatorConfigStr);
@@ -114,7 +123,13 @@ public class CommunicationGroupClientImpl implements
CommunicationGroupServiceCl
final String operName = injector.getNamedInstance(OperatorName.class);
this.operators.put(Utils.getClass(operName), operator);
LOG.finest(operName + " has CommGroupHandler-" +
commGroupNetworkHandler.toString());
+
+ if (!operatorIsScatterSender && operator instanceof Scatter.Sender) {
+ LOG.fine(operName + " is a scatter sender. Will keep track of active
slave tasks.");
+ operatorIsScatterSender = true;
+ }
}
+ this.isScatterSender = operatorIsScatterSender;
} catch (final InjectionException | IOException e) {
throw new RuntimeException("Unable to deserialize operator config", e);
}
@@ -147,6 +162,33 @@ public class CommunicationGroupClientImpl implements
CommunicationGroupServiceCl
}
@Override
+ public Scatter.Sender getScatterSender(final Class<? extends Name<String>>
operatorName) {
+ LOG.entering("CommunicationGroupClientImpl", "getScatterSender", new
Object[]{getQualifiedName(),
+ Utils.simpleName(operatorName)});
+ final GroupCommOperator op = operators.get(operatorName);
+ if (!(op instanceof Scatter.Sender)) {
+ throw new RuntimeException("Configured operator is not a scatter
sender");
+ }
+ commGroupNetworkHandler.addTopologyElement(operatorName);
+ LOG.exiting("CommunicationGroupClientImpl", "getScatterSender",
getQualifiedName() + op);
+ return (Scatter.Sender) op;
+ }
+
+ @Override
+ public Gather.Receiver getGatherReceiver(final Class<? extends Name<String>>
operatorName) {
+ LOG.entering("CommunicationGroupClientImpl", "getGatherReceiver", new
Object[]{getQualifiedName(),
+ Utils.simpleName(operatorName)});
+ final GroupCommOperator op = operators.get(operatorName);
+ if (!(op instanceof Gather.Receiver)) {
+ throw new RuntimeException("Configured operator is not a gather
receiver");
+ }
+ commGroupNetworkHandler.addTopologyElement(operatorName);
+ LOG.exiting("CommunicationGroupClientImpl", "getGatherReceiver",
getQualifiedName() + op);
+ return (Gather.Receiver) op;
+ }
+
+
+ @Override
public Broadcast.Receiver getBroadcastReceiver(final Class<? extends
Name<String>> operatorName) {
LOG.entering("CommunicationGroupClientImpl", "getBroadcastReceiver", new
Object[]{getQualifiedName(),
Utils.simpleName(operatorName)});
@@ -173,6 +215,32 @@ public class CommunicationGroupClientImpl implements
CommunicationGroupServiceCl
}
@Override
+ public Scatter.Receiver getScatterReceiver(final Class<? extends
Name<String>> operatorName) {
+ LOG.entering("CommunicationGroupClientImpl", "getScatterReceiver", new
Object[]{getQualifiedName(),
+ Utils.simpleName(operatorName)});
+ final GroupCommOperator op = operators.get(operatorName);
+ if (!(op instanceof Scatter.Receiver)) {
+ throw new RuntimeException("Configured operator is not a scatter
receiver");
+ }
+ commGroupNetworkHandler.addTopologyElement(operatorName);
+ LOG.exiting("CommunicationGroupClientImpl", "getScatterReceiver",
getQualifiedName() + op);
+ return (Scatter.Receiver) op;
+ }
+
+ @Override
+ public Gather.Sender getGatherSender(final Class<? extends Name<String>>
operatorName) {
+ LOG.entering("CommunicationGroupClientImpl", "getGatherSender", new
Object[]{getQualifiedName(),
+ Utils.simpleName(operatorName)});
+ final GroupCommOperator op = operators.get(operatorName);
+ if (!(op instanceof Gather.Sender)) {
+ throw new RuntimeException("Configured operator is not a gather sender");
+ }
+ commGroupNetworkHandler.addTopologyElement(operatorName);
+ LOG.exiting("CommunicationGroupClientImpl", "getGatherSender",
getQualifiedName() + op);
+ return (Gather.Sender) op;
+ }
+
+ @Override
public void initialize() {
LOG.entering("CommunicationGroupClientImpl", "initialize",
getQualifiedName());
if (init.compareAndSet(false, true)) {
@@ -191,6 +259,10 @@ public class CommunicationGroupClientImpl implements
CommunicationGroupServiceCl
throw new RuntimeException("InterruptedException while waiting for
initialization", e);
}
+ if (isScatterSender) {
+ updateTopology();
+ }
+
if (initHandler.getException() != null) {
throw new RuntimeException(getQualifiedName() + "Parent dead. Current
behavior is for the child to die too.");
}
@@ -261,10 +333,36 @@ public class CommunicationGroupClientImpl implements
CommunicationGroupServiceCl
do {
msg = commGroupNetworkHandler.waitForTopologyUpdate(operName);
} while (!isMsgVersionOk(msg));
+
+ if (isScatterSender) {
+ updateActiveTasks(msg);
+ }
}
LOG.exiting("CommunicationGroupClientImpl", "updateTopology",
getQualifiedName());
}
+ private void updateActiveTasks(final GroupCommunicationMessage msg) {
+ LOG.entering("CommunicationGroupClientImpl", "updateActiveTasks", new
Object[]{getQualifiedName(), msg});
+
+ final Pair<TopologySimpleNode, List<Identifier>> pair =
+ TopologySerializer.decode(msg.getData()[0], identifierFactory);
+
+ topologySimpleNodeRoot = pair.getFirst();
+
+ activeSlaveTasks = pair.getSecond();
+ // remove myself
+ activeSlaveTasks.remove(identifierFactory.getNewInstance(taskId));
+ // sort the tasks in lexicographical order on task ids
+ Collections.sort(activeSlaveTasks, new Comparator<Identifier>() {
+ @Override
+ public int compare(final Identifier o1, final Identifier o2) {
+ return o1.toString().compareTo(o2.toString());
+ }
+ });
+
+ LOG.exiting("CommunicationGroupClientImpl", "updateActiveTasks", new
Object[]{getQualifiedName(), msg});
+ }
+
private boolean isMsgVersionOk(final GroupCommunicationMessage msg) {
LOG.entering("CommunicationGroupClientImpl", "isMsgVersionOk", new
Object[]{getQualifiedName(), msg});
if (msg.hasVersion()) {
@@ -287,6 +385,16 @@ public class CommunicationGroupClientImpl implements
CommunicationGroupServiceCl
}
}
+ @Override
+ public List<Identifier> getActiveSlaveTasks() {
+ return this.activeSlaveTasks;
+ }
+
+ @Override
+ public TopologySimpleNode getTopologySimpleNodeRoot() {
+ return this.topologySimpleNodeRoot;
+ }
+
private String getQualifiedName() {
return Utils.simpleName(groupName) + " ";
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java
index 308c33c..d64fa5c 100644
---
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java
+++
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java
@@ -37,6 +37,7 @@ import org.apache.reef.wake.impl.SingleThreadStage;
import javax.inject.Inject;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -218,11 +219,24 @@ public class OperatorTopologyImpl implements
OperatorTopology {
}
@Override
- public byte[] recvFromParent() throws ParentDeadException {
+ public void sendToChildren(final Map<String, byte[]> dataMap,
+ final
ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType)
+ throws ParentDeadException {
+ LOG.entering("OperatorTopologyImpl", "sendToChildren", new
Object[]{getQualifiedName(), dataMap, msgType});
+ refreshEffectiveTopology();
+ assert (effectiveTopology != null);
+ effectiveTopology.sendToChildren(dataMap, msgType);
+ LOG.exiting("OperatorTopologyImpl", "sendToChildren",
+ Arrays.toString(new Object[]{getQualifiedName(), dataMap, msgType}));
+ }
+
+ @Override
+ public byte[] recvFromParent(final
ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType)
+ throws ParentDeadException {
LOG.entering("OperatorTopologyImpl", "recvFromParent", getQualifiedName());
refreshEffectiveTopology();
assert (effectiveTopology != null);
- final byte[] retVal = effectiveTopology.recvFromParent();
+ final byte[] retVal = effectiveTopology.recvFromParent(msgType);
LOG.exiting("OperatorTopologyImpl", "recvFromParent", Arrays.toString(new
Object[]{getQualifiedName(), retVal}));
return retVal;
}
@@ -238,6 +252,16 @@ public class OperatorTopologyImpl implements
OperatorTopology {
return retVal;
}
+ @Override
+ public byte[] recvFromChildren() throws ParentDeadException {
+ LOG.entering("OperatorTopologyImpl", "recvFromChildren",
getQualifiedName());
+ refreshEffectiveTopology();
+ assert (effectiveTopology != null);
+ final byte[] retVal = effectiveTopology.recvFromChildren();
+ LOG.exiting("OperatorTopologyImpl", "recvFromChildren",
Arrays.toString(new Object[]{getQualifiedName(), retVal}));
+ return retVal;
+ }
+
/**
* Only refreshes the effective topology with deletion msgs from.
* deletionDeltas queue