http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/ReduceScatter.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/ReduceScatter.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/ReduceScatter.java new file mode 100644 index 0000000..792b655 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/ReduceScatter.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.api.operators; + +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.wake.Identifier; + +import java.util.List; + +/** + * MPI Reduce Scatter operator. + * <p/> + * Each task has a list of elements. Assume that each task reduces + * each element in the list to form a list of reduced elements at a dummy root. + * The dummy root then keeps the portion of the list assigned to it and + * scatters the remaining among the other tasks + */ +public interface ReduceScatter<T> extends GroupCommOperator { + + /** + * Apply this operation on elements where counts specify the distribution of + * elements to each task. Ordering is assumed to be default. + * <p/> + * Here counts is of the same size as the entire group not just children. + * + * @return List of values that result from applying reduce function on + * corresponding elements of each list received as a result of + * applying scatter. + */ + List<T> apply(List<T> elements, List<Integer> counts) throws InterruptedException, NetworkException; + + /** + * Apply this operation on elements where counts specify the distribution of + * elements to each task. Ordering is specified using order + * <p/> + * Here counts is of the same size as the entire group not just children + * + * @return List of values that result from applying reduce function on + * corresponding elements of each list received as a result of + * applying scatter. + */ + List<T> apply(List<T> elements, List<Integer> counts, + List<? extends Identifier> order) throws InterruptedException, NetworkException; + + /** + * get {@link org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction} configured + * + * @return {@link org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction} + */ + Reduce.ReduceFunction<T> getReduceFunction(); +}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Scatter.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Scatter.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Scatter.java new file mode 100644 index 0000000..6c58d61 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Scatter.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.api.operators; + +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.wake.Identifier; + +import java.util.List; + +/** + * MPI Scatter operator + * <p/> + * Scatter a list of elements to the receivers The receivers will receive a + * sub-list of elements targeted for them. Supports non-uniform distribution + * through the specification of counts + */ +public interface Scatter { + + /** + * Sender or Root. + */ + static interface Sender<T> extends GroupCommOperator { + + /** + * Distributes evenly across task ids sorted lexicographically. + */ + void send(List<T> elements) throws NetworkException, InterruptedException; + + /** + * Distributes as per counts across task ids sorted lexicographically. + */ + void send(List<T> elements, Integer... counts) throws NetworkException, InterruptedException; + + /** + * Distributes evenly across task ids sorted using order. + */ + void send(List<T> elements, List<? extends Identifier> order) + throws NetworkException, InterruptedException; + + /** + * Distributes as per counts across task ids sorted using order. + */ + void send(List<T> elements, List<Integer> counts, + List<? extends Identifier> order) throws NetworkException, InterruptedException; + } + + /** + * Receiver or non-roots. + */ + static interface Receiver<T> extends GroupCommOperator { + /** + * Receive the sub-list of elements targeted for the current receiver. + * + * @return list of elements targeted for the current receiver. + */ + List<T> receive() throws InterruptedException, NetworkException; + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/package-info.java new file mode 100644 index 0000000..d2e600c --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/package-info.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Provides the interfaces for MPI style group communication operations. + * The interface is asymmetric for asymmetric operations and symmetric + * for symmetric operations unlike MPI which provides symmetric operations + * for all operations. + * + * The interface is asymmetric in the sense that Senders & Receivers are + * separated out for operations like Scatter and Gather. All participants + * do not execute the same function. A sender sends & a receiver receives. + * + * The interface only concentrates on the data part because we are on the + * data-plane of things in REEF. The control information is embedded in the + * {@link org.apache.reef.tang.Configuration} used to instantiate these + * operators. It is the responsibility of the Driver, the primary agent in + * the control-plane to configure these operators, that is, denote who is + * the sender, who are the receivers, what {@link org.apache.reef.io.serialization.Codec} + * need to be used and so on for an operation like Scatter with the root node + * acting as a sender and the other nodes as receivers. + * + * One thing implicit in MPI operations is the ordering of processors based + * on their ranks which determines the order of operations. For ex., if we + * scatter an array of 10 elements into 10 processors, then which processor + * gets the 1st entry & so on is based on the rank. + * + * In our case we do not have any ranks associated with tasks. Instead, + * by default we use the lexicographic order of the task ids. These can + * also be over-ridden in the send/receive/apply function calls + */ +package org.apache.reef.io.network.group.api.operators; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommGroupNetworkHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommGroupNetworkHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommGroupNetworkHandler.java new file mode 100644 index 0000000..45b1839 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommGroupNetworkHandler.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.api.task; + +import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; +import org.apache.reef.io.network.group.impl.task.CommGroupNetworkHandlerImpl; +import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.wake.EventHandler; + +/** + * The EventHandler that receives the GroupCommunicationMsg + * pertaining to a specific Communication Group + */ +@DefaultImplementation(value = CommGroupNetworkHandlerImpl.class) +public interface CommGroupNetworkHandler extends EventHandler<GroupCommunicationMessage> { + + void register(Class<? extends Name<String>> operName, EventHandler<GroupCommunicationMessage> handler); + + void addTopologyElement(Class<? extends Name<String>> operName); + + GroupCommunicationMessage waitForTopologyUpdate(Class<? extends Name<String>> operName); + + byte[] waitForTopologyChanges(Class<? extends Name<String>> operName); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupClient.java new file mode 100644 index 0000000..a1370e5 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupClient.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.api.task; + +import org.apache.reef.annotations.audience.TaskSide; +import org.apache.reef.io.network.group.api.operators.Broadcast; +import org.apache.reef.io.network.group.api.operators.Reduce; +import org.apache.reef.io.network.group.api.GroupChanges; +import org.apache.reef.io.network.group.impl.task.CommunicationGroupClientImpl; +import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.tang.annotations.Name; + +/** + * The Task side interface of a communication group + * Lets one get the operators configured for this task + * and use them for communication between tasks configured + * in this communication group + */ +@TaskSide +@DefaultImplementation(value = CommunicationGroupClientImpl.class) +public interface CommunicationGroupClient { + + /** + * @return The name configured on this communication group + */ + Class<? extends Name<String>> getName(); + + /** + * The broadcast sender configured on this communication group + * with the given oepratorName + * + * @param operatorName + * @return + */ + Broadcast.Sender getBroadcastSender(Class<? extends Name<String>> operatorName); + + /** + * The broadcast receiver configured on this communication group + * with the given oepratorName + * + * @param operatorName + * @return + */ + Broadcast.Receiver getBroadcastReceiver(Class<? extends Name<String>> operatorName); + + /** + * The reduce receiver configured on this communication group + * with the given oepratorName + * + * @param operatorName + * @return + */ + Reduce.Receiver getReduceReceiver(Class<? extends Name<String>> operatorName); + + /** + * The reduce sender configured on this communication group + * with the given oepratorName + * + * @param operatorName + * @return + */ + Reduce.Sender getReduceSender(Class<? extends Name<String>> operatorName); + + /** + * @return Changes in topology of this communication group since the last time + * this method was called + */ + GroupChanges getTopologyChanges(); + + /** + * Asks the driver to update the topology of this communication group. This can + * be an expensive call depending on what the minimum number of tasks is for this + * group to function as this first tells the driver, driver then tells the affected + * tasks and the driver gives a green only after affected tasks have had a chance + * to be sure that their topology will be updated before the next message is + * communicated + */ + void updateTopology(); + + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupServiceClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupServiceClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupServiceClient.java new file mode 100644 index 0000000..b75af08 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupServiceClient.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.api.task; + +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.io.network.group.impl.task.CommunicationGroupClientImpl; +import org.apache.reef.tang.annotations.DefaultImplementation; + +@Private +@DefaultImplementation(value = CommunicationGroupClientImpl.class) +public interface CommunicationGroupServiceClient extends CommunicationGroupClient { + /** + * Should not be used by user code + * Used for initialization of the + * communication group + */ + void initialize(); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommClient.java new file mode 100644 index 0000000..14c811b --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommClient.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.api.task; + +import org.apache.reef.annotations.Provided; +import org.apache.reef.annotations.audience.TaskSide; +import org.apache.reef.io.network.group.impl.task.GroupCommClientImpl; +import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.tang.annotations.Name; + + +/** + * The task side interface for the Group Communication Service + */ +@TaskSide +@Provided +@DefaultImplementation(value = GroupCommClientImpl.class) +public interface GroupCommClient { + + /** + * @param string + * @return The communication group client with the given name that gives access + * to the operators configured on it that will be used to do group communication + */ + CommunicationGroupClient getCommunicationGroup(Class<? extends Name<String>> groupName); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommNetworkHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommNetworkHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommNetworkHandler.java new file mode 100644 index 0000000..ecea973 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommNetworkHandler.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.api.task; + +import org.apache.reef.annotations.audience.TaskSide; +import org.apache.reef.io.network.Message; +import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; +import org.apache.reef.io.network.group.impl.task.GroupCommNetworkHandlerImpl; +import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.wake.EventHandler; + +/** + * The global EventHandler that receives the GroupCommunicationMsg + * and routes it to the relevant communication group + */ +@TaskSide +@DefaultImplementation(value = GroupCommNetworkHandlerImpl.class) +public interface GroupCommNetworkHandler extends EventHandler<Message<GroupCommunicationMessage>> { + + void register(Class<? extends Name<String>> groupName, EventHandler<GroupCommunicationMessage> commGroupNetworkHandler); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/NodeStruct.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/NodeStruct.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/NodeStruct.java new file mode 100644 index 0000000..08554f8 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/NodeStruct.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.api.task; + +import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; + +/** + * The actual node that is part of the operator topology + * + * Receives data from the handlers & provides them to the + * operators/OperatorTopologyStruct when they need it. + * + * This implementation decouples the send & receive. + */ +public interface NodeStruct { + + String getId(); + + int getVersion(); + + void setVersion(int version); + + byte[] getData(); + + void addData(GroupCommunicationMessage msg); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopology.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopology.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopology.java new file mode 100644 index 0000000..62b6934 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopology.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.api.task; + +import org.apache.reef.io.network.exception.ParentDeadException; +import org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction; +import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; +import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos; +import org.apache.reef.io.serialization.Codec; + +/** + * Represents the local topology of tasks for an operator. It + * provides methods to send/rcv from parents & children + * <p/> + * Every operator is an EventHandler<GroupCommunicationMessage> + * and it will use an instance of this type to delegate the + * handling of the message and also uses it to communicate + * with its parents and children + * <p/> + * This is an operator facing interface. The actual topology is + * maintained in OperatorTopologyStruct. Current strategy is to + * maintain two versions of the topology and current operations + * are always delegated to effectiveTopology and the baseTopology + * is updated while initialization & when user calls updateTopology. + * So this is only a wrapper around the two versions of topologies + * and manages when to create/update them based on the messages from + * the driver. + */ +public interface OperatorTopology { + + void handle(GroupCommunicationMessage msg); + + void sendToParent(byte[] encode, ReefNetworkGroupCommProtos.GroupCommMessage.Type reduce) throws ParentDeadException; + + byte[] recvFromParent() throws ParentDeadException; + + void sendToChildren(byte[] data, ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) throws ParentDeadException; + + <T> T recvFromChildren(ReduceFunction<T> redFunc, Codec<T> dataCodec) throws ParentDeadException; + + void initialize() throws ParentDeadException; +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopologyStruct.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopologyStruct.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopologyStruct.java new file mode 100644 index 0000000..dd262c9 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopologyStruct.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.api.task; + +import org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction; +import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; +import org.apache.reef.io.network.group.impl.operators.Sender; +import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos; +import org.apache.reef.io.serialization.Codec; +import org.apache.reef.tang.annotations.Name; + +import java.util.Collection; +import java.util.Set; + +/** + * The actual local topology maintaining the + * children and parent that reacts to update + * and data msgs. The actual nodes are represented + * by NodeStruct and it handles receiving & + * providing data + */ +public interface OperatorTopologyStruct { + + Class<? extends Name<String>> getGroupName(); + + Class<? extends Name<String>> getOperName(); + + String getSelfId(); + + int getVersion(); + + NodeStruct getParent(); + + Collection<? extends NodeStruct> getChildren(); + + String getDriverId(); + + Sender getSender(); + + boolean hasChanges(); + + void setChanges(boolean b); + + void addAsData(GroupCommunicationMessage msg); + + void update(Set<GroupCommunicationMessage> deletionDeltas); + + void update(GroupCommunicationMessage msg); + + void sendToParent(byte[] data, ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType); + + byte[] recvFromParent(); + + void sendToChildren(byte[] data, ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType); + + <T> T recvFromChildren(ReduceFunction<T> redFunc, Codec<T> dataCodec); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupChangesCodec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupChangesCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupChangesCodec.java new file mode 100644 index 0000000..46eb79e --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupChangesCodec.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.impl; + +import org.apache.reef.io.network.group.api.GroupChanges; +import org.apache.reef.io.serialization.Codec; + +import javax.inject.Inject; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class GroupChangesCodec implements Codec<GroupChanges> { + + @Inject + public GroupChangesCodec() { + } + + @Override + public GroupChanges decode(final byte[] changeBytes) { + return new GroupChangesImpl(changeBytes[0] == 1); + } + + @Override + public byte[] encode(final GroupChanges changes) { + final byte[] retVal = new byte[1]; + if (changes.exist()) { + retVal[0] = 1; + } + return retVal; + } + + public static void main(final String[] args) { + GroupChanges changes = new GroupChangesImpl(false); + final GroupChangesCodec changesCodec = new GroupChangesCodec(); + GroupChanges changes1 = changesCodec.decode(changesCodec.encode(changes)); + test(changes, changes1); + changes = new GroupChangesImpl(true); + changes1 = changesCodec.decode(changesCodec.encode(changes)); + test(changes, changes1); + } + + private static void test(final GroupChanges changes, final GroupChanges changes1) { + + final Logger LOG = Logger.getLogger(GroupChangesCodec.class.getName()); + + final boolean c1 = changes.exist(); + final boolean c2 = changes1.exist(); + + if (c1 != c2) { + LOG.log(Level.SEVERE, "Something is wrong: {0} != {1}", new Object[] {c1, c2}); + } else { + LOG.log(Level.INFO, "Codec is fine"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupChangesImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupChangesImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupChangesImpl.java new file mode 100644 index 0000000..48da7e7 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupChangesImpl.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.impl; + +import org.apache.reef.io.network.group.api.GroupChanges; + +import javax.annotation.concurrent.Immutable; +import javax.annotation.concurrent.ThreadSafe; + +@Immutable +@ThreadSafe +public class GroupChangesImpl implements GroupChanges { + + private final boolean changes; + + public GroupChangesImpl(final boolean changes) { + this.changes = changes; + } + + @Override + public boolean exist() { + return changes; + } + + @Override + public String toString() { + return "Changes: " + changes; + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessage.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessage.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessage.java new file mode 100644 index 0000000..c02d6af --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessage.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.impl; + +import org.apache.reef.io.network.group.impl.utils.Utils; +import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos; + +import java.util.Arrays; + +/** + * + */ +public class GroupCommunicationMessage { + private final String groupName; + private final String operName; + private final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType; + private final String from; + private final int srcVersion; + private final String to; + private final int dstVersion; + private final byte[][] data; + + private final String simpleGroupName; + private final String simpleOperName; + + public GroupCommunicationMessage( + final String groupName, + final String operName, + final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType, + final String from, final int srcVersion, + final String to, final int dstVersion, + final byte[][] data) { + super(); + this.groupName = groupName; + this.operName = operName; + this.msgType = msgType; + this.from = from; + this.srcVersion = srcVersion; + this.to = to; + this.dstVersion = dstVersion; + this.data = data; + this.simpleGroupName = Utils.simpleName(Utils.getClass(groupName)); + this.simpleOperName = Utils.simpleName(Utils.getClass(operName)); + } + + public String getGroupname() { + return groupName; + } + + public String getOperatorname() { + return operName; + } + + public String getSimpleOperName() { + return simpleOperName; + } + + public ReefNetworkGroupCommProtos.GroupCommMessage.Type getType() { + return msgType; + } + + public String getSrcid() { + return from; + } + + public int getSrcVersion() { + return srcVersion; + } + + public String getDestid() { + return to; + } + + public int getVersion() { + return dstVersion; + } + + public String getSource() { + return "(" + getSrcid() + "," + getSrcVersion() + ")"; + } + + public String getDestination() { + return "(" + getDestid() + "," + getVersion() + ")"; + } + + public byte[][] getData() { + return data; + } + + public int getMsgsCount() { + return data.length; + } + + public boolean hasVersion() { + return true; + } + + public boolean hasSrcVersion() { + return true; + } + + @Override + public String toString() { + return "[" + msgType + " from " + getSource() + " to " + getDestination() + " for " + simpleGroupName + ":" + simpleOperName + "]"; + } + + @Override + public boolean equals(final Object obj) { + if (this != obj) { + if (obj instanceof GroupCommunicationMessage) { + final GroupCommunicationMessage that = (GroupCommunicationMessage) obj; + if (!this.groupName.equals(that.groupName)) { + return false; + } + if (!this.operName.equals(that.operName)) { + return false; + } + if (!this.from.equals(that.from)) { + return false; + } + if (this.srcVersion != that.srcVersion) { + return false; + } + if (!this.to.equals(that.to)) { + return false; + } + if (this.dstVersion != that.dstVersion) { + return false; + } + if (!this.msgType.equals(that.msgType)) { + return false; + } + if (this.data.length != that.data.length) { + return false; + } + for (int i = 0; i < data.length; i++) { + if (!Arrays.equals(this.data[i], that.data[i])) { + return false; + } + } + + return true; + } else { + return false; + } + } else { + return true; + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessageCodec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessageCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessageCodec.java new file mode 100644 index 0000000..8b5225d --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessageCodec.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.impl; + + +import org.apache.reef.io.network.impl.StreamingCodec; +import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos.GroupCommMessage; +import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos.GroupCommMessage.Type; + +import javax.inject.Inject; +import java.io.*; + +/** + * Codec for {@link GroupCommMessage} + */ +public class GroupCommunicationMessageCodec implements StreamingCodec<GroupCommunicationMessage> { + + @Inject + public GroupCommunicationMessageCodec() { + // Intentionally Blank + } + + @Override + public GroupCommunicationMessage decode(final byte[] data) { + try (ByteArrayInputStream bais = new ByteArrayInputStream(data)) { + try (DataInputStream dais = new DataInputStream(bais)) { + return decodeFromStream(dais); + } + } catch (final IOException e) { + throw new RuntimeException("IOException", e); + } + } + + @Override + public GroupCommunicationMessage decodeFromStream(final DataInputStream stream) { + try { + final String groupName = stream.readUTF(); + final String operName = stream.readUTF(); + final Type msgType = Type.valueOf(stream.readInt()); + final String from = stream.readUTF(); + final int srcVersion = stream.readInt(); + final String to = stream.readUTF(); + final int dstVersion = stream.readInt(); + final byte[][] gcmData = new byte[stream.readInt()][]; + for (int i = 0; i < gcmData.length; i++) { + gcmData[i] = new byte[stream.readInt()]; + stream.readFully(gcmData[i]); + } + return new GroupCommunicationMessage( + groupName, + operName, + msgType, + from, + srcVersion, + to, + dstVersion, + gcmData); + } catch (final IOException e) { + throw new RuntimeException("IOException", e); + } + } + + @Override + public byte[] encode(final GroupCommunicationMessage msg) { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + try (DataOutputStream daos = new DataOutputStream(baos)) { + encodeToStream(msg, daos); + } + return baos.toByteArray(); + } catch (final IOException e) { + throw new RuntimeException("IOException", e); + } + } + + @Override + public void encodeToStream(final GroupCommunicationMessage msg, final DataOutputStream stream) { + try { + stream.writeUTF(msg.getGroupname()); + stream.writeUTF(msg.getOperatorname()); + stream.writeInt(msg.getType().getNumber()); + stream.writeUTF(msg.getSrcid()); + stream.writeInt(msg.getSrcVersion()); + stream.writeUTF(msg.getDestid()); + stream.writeInt(msg.getVersion()); + stream.writeInt(msg.getMsgsCount()); + for (final byte[] b : msg.getData()) { + stream.writeInt(b.length); + stream.write(b); + } + } catch (final IOException e) { + throw new RuntimeException("IOException", e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/BroadcastOperatorSpec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/BroadcastOperatorSpec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/BroadcastOperatorSpec.java new file mode 100644 index 0000000..11514b1 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/BroadcastOperatorSpec.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.impl.config; + +import org.apache.reef.io.network.group.api.config.OperatorSpec; +import org.apache.reef.io.network.group.impl.utils.Utils; +import org.apache.reef.io.serialization.Codec; + + +/** + * The specification for the broadcast operator + */ +public class BroadcastOperatorSpec implements OperatorSpec { + private final String senderId; + + /** + * Codec to be used to serialize data + */ + private final Class<? extends Codec> dataCodecClass; + + + public BroadcastOperatorSpec(final String senderId, + final Class<? extends Codec> dataCodecClass) { + super(); + this.senderId = senderId; + this.dataCodecClass = dataCodecClass; + } + + public String getSenderId() { + return senderId; + } + + @Override + public Class<? extends Codec> getDataCodecClass() { + return dataCodecClass; + } + + @Override + public String toString() { + return "Broadcast Operator Spec: [sender=" + senderId + "] [dataCodecClass=" + Utils.simpleName(dataCodecClass) + + "]"; + } + + public static Builder newBuilder() { + return new BroadcastOperatorSpec.Builder(); + } + + public static class Builder implements org.apache.reef.util.Builder<BroadcastOperatorSpec> { + private String senderId; + + private Class<? extends Codec> dataCodecClass; + + + public Builder setSenderId(final String senderId) { + this.senderId = senderId; + return this; + } + + public Builder setDataCodecClass(final Class<? extends Codec> codecClazz) { + this.dataCodecClass = codecClazz; + return this; + } + + @Override + public BroadcastOperatorSpec build() { + return new BroadcastOperatorSpec(senderId, dataCodecClass); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/ReduceOperatorSpec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/ReduceOperatorSpec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/ReduceOperatorSpec.java new file mode 100644 index 0000000..386ad13 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/ReduceOperatorSpec.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.impl.config; + +import org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction; +import org.apache.reef.io.network.group.api.config.OperatorSpec; +import org.apache.reef.io.network.group.impl.utils.Utils; +import org.apache.reef.io.serialization.Codec; + +/** + * The specification for the Reduce operator + */ +public class ReduceOperatorSpec implements OperatorSpec { + + private final String receiverId; + + /** + * Codec to be used to serialize data + */ + private final Class<? extends Codec> dataCodecClass; + + /** + * The reduce function to be used for operations that do reduction + */ + public final Class<? extends ReduceFunction> redFuncClass; + + + public ReduceOperatorSpec(final String receiverId, + final Class<? extends Codec> dataCodecClass, + final Class<? extends ReduceFunction> redFuncClass) { + super(); + this.receiverId = receiverId; + this.dataCodecClass = dataCodecClass; + this.redFuncClass = redFuncClass; + } + + public String getReceiverId() { + return receiverId; + } + + /** + * @return the redFuncClass + */ + public Class<? extends ReduceFunction> getRedFuncClass() { + return redFuncClass; + } + + @Override + public Class<? extends Codec> getDataCodecClass() { + return dataCodecClass; + } + + @Override + public String toString() { + return "Reduce Operator Spec: [receiver=" + receiverId + "] [dataCodecClass=" + Utils.simpleName(dataCodecClass) + + "] [reduceFunctionClass=" + Utils.simpleName(redFuncClass) + "]"; + } + + public static Builder newBuilder() { + return new ReduceOperatorSpec.Builder(); + } + + public static class Builder implements org.apache.reef.util.Builder<ReduceOperatorSpec> { + + private String receiverId; + + private Class<? extends Codec> dataCodecClass; + + private Class<? extends ReduceFunction> redFuncClass; + + public Builder setReceiverId(final String receiverId) { + this.receiverId = receiverId; + return this; + } + + public Builder setDataCodecClass(final Class<? extends Codec> codecClazz) { + this.dataCodecClass = codecClazz; + return this; + } + + public Builder setReduceFunctionClass(final Class<? extends ReduceFunction> redFuncClass) { + this.redFuncClass = redFuncClass; + return this; + } + + @Override + public ReduceOperatorSpec build() { + return new ReduceOperatorSpec(receiverId, dataCodecClass, redFuncClass); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/CommunicationGroupName.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/CommunicationGroupName.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/CommunicationGroupName.java new file mode 100644 index 0000000..49dab96 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/CommunicationGroupName.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.impl.config.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +@NamedParameter(doc = "Name of the comm group") +public final class CommunicationGroupName implements Name<String> { + private CommunicationGroupName() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/DataCodec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/DataCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/DataCodec.java new file mode 100644 index 0000000..82613a7 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/DataCodec.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.impl.config.parameters; + +import org.apache.reef.io.serialization.Codec; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +@NamedParameter(doc = "Codec used to serialize and deserialize data in operators") +public final class DataCodec implements Name<Codec> { + private DataCodec() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/OperatorName.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/OperatorName.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/OperatorName.java new file mode 100644 index 0000000..9098577 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/OperatorName.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.impl.config.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +@NamedParameter(doc = "Name of the operator") +public final class OperatorName implements Name<String> { + private OperatorName() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/ReduceFunctionParam.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/ReduceFunctionParam.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/ReduceFunctionParam.java new file mode 100644 index 0000000..d37408b --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/ReduceFunctionParam.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.impl.config.parameters; + +import org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +@NamedParameter(doc = "The reduce function class that is associated with a reduce operator") +public final class ReduceFunctionParam implements Name<ReduceFunction> { + private ReduceFunctionParam() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/SerializedGroupConfigs.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/SerializedGroupConfigs.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/SerializedGroupConfigs.java new file mode 100644 index 0000000..1844766 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/SerializedGroupConfigs.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.impl.config.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +import java.util.Set; + +@NamedParameter(doc = "Serialized communication group configurations") +public final class SerializedGroupConfigs implements Name<Set<String>> { + private SerializedGroupConfigs() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/SerializedOperConfigs.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/SerializedOperConfigs.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/SerializedOperConfigs.java new file mode 100644 index 0000000..e0103f5 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/SerializedOperConfigs.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.impl.config.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +import java.util.Set; + +@NamedParameter(doc = "Serialized operator configurations") +public final class SerializedOperConfigs implements Name<Set<String>> { + private SerializedOperConfigs() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TaskVersion.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TaskVersion.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TaskVersion.java new file mode 100644 index 0000000..b1caa84 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TaskVersion.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.impl.config.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +@NamedParameter(doc = "The version that this task is assigned") +public final class TaskVersion implements Name<Integer> { + private TaskVersion() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TreeTopologyFanOut.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TreeTopologyFanOut.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TreeTopologyFanOut.java new file mode 100644 index 0000000..c759130 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/TreeTopologyFanOut.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.impl.config.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +@NamedParameter(doc = "The fan out for the tree topology", default_value = "2", short_name = "fanout") +public final class TreeTopologyFanOut implements Name<Integer> { + private TreeTopologyFanOut() { + } +}
