http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/SlaveTask.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/SlaveTask.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/SlaveTask.java new file mode 100644 index 0000000..dde0bc3 --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/SlaveTask.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.examples.group.broadcast; + +import org.apache.reef.examples.group.bgd.operatornames.ControlMessageBroadcaster; +import org.apache.reef.examples.group.bgd.parameters.AllCommunicationGroup; +import org.apache.reef.examples.group.broadcast.parameters.ModelBroadcaster; +import org.apache.reef.examples.group.broadcast.parameters.ModelReceiveAckReducer; +import org.apache.reef.examples.group.utils.math.Vector; +import org.apache.reef.io.network.group.api.operators.Broadcast; +import org.apache.reef.io.network.group.api.operators.Reduce; +import org.apache.reef.io.network.group.api.task.CommunicationGroupClient; +import org.apache.reef.io.network.group.api.task.GroupCommClient; +import org.apache.reef.task.Task; + +import javax.inject.Inject; + +/** + * + */ +public class SlaveTask implements Task { + private final CommunicationGroupClient communicationGroupClient; + private final Broadcast.Receiver<ControlMessages> controlMessageBroadcaster; + private final Broadcast.Receiver<Vector> modelBroadcaster; + private final Reduce.Sender<Boolean> modelReceiveAckReducer; + + @Inject + public SlaveTask( + final GroupCommClient groupCommClient) { + this.communicationGroupClient = groupCommClient.getCommunicationGroup(AllCommunicationGroup.class); + this.controlMessageBroadcaster = communicationGroupClient.getBroadcastReceiver(ControlMessageBroadcaster.class); + this.modelBroadcaster = communicationGroupClient.getBroadcastReceiver(ModelBroadcaster.class); + this.modelReceiveAckReducer = communicationGroupClient.getReduceSender(ModelReceiveAckReducer.class); + } + + @Override + public byte[] call(final byte[] memento) throws Exception { + boolean stop = false; + while (!stop) { + final ControlMessages controlMessage = controlMessageBroadcaster.receive(); + switch (controlMessage) { + case Stop: + stop = true; + break; + + case ReceiveModel: + modelBroadcaster.receive(); + if (Math.random() < 0.1) { + throw new RuntimeException("Simulated Failure"); + } 
+ modelReceiveAckReducer.send(true); + break; + + default: + break; + } + } + return null; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/AllCommunicationGroup.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/AllCommunicationGroup.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/AllCommunicationGroup.java new file mode 100644 index 0000000..9e8781e --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/AllCommunicationGroup.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.examples.group.broadcast.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * + */ +@NamedParameter() +public final class AllCommunicationGroup implements Name<String> { + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/ControlMessageBroadcaster.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/ControlMessageBroadcaster.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/ControlMessageBroadcaster.java new file mode 100644 index 0000000..4e7607a --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/ControlMessageBroadcaster.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.examples.group.broadcast.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +@NamedParameter() +public final class ControlMessageBroadcaster implements Name<String> { +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/Dimensions.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/Dimensions.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/Dimensions.java new file mode 100644 index 0000000..e4814c6 --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/Dimensions.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.examples.group.broadcast.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * + */ +@NamedParameter(doc = "Model dimensions", short_name = "dim") +public class Dimensions implements Name<Integer> { + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/FailureProbability.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/FailureProbability.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/FailureProbability.java new file mode 100644 index 0000000..911ad25 --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/FailureProbability.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.examples.group.broadcast.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * + */ +@NamedParameter(doc = "Prob(failure)", default_value = "0.1") +public class FailureProbability implements Name<Double> { + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/ModelBroadcaster.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/ModelBroadcaster.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/ModelBroadcaster.java new file mode 100644 index 0000000..0c673bf --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/ModelBroadcaster.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.examples.group.broadcast.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +@NamedParameter() +public final class ModelBroadcaster implements Name<String> { +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/ModelReceiveAckReducer.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/ModelReceiveAckReducer.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/ModelReceiveAckReducer.java new file mode 100644 index 0000000..635eeb7 --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/ModelReceiveAckReducer.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.examples.group.broadcast.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +@NamedParameter() +public final class ModelReceiveAckReducer implements Name<String> { +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/NumberOfReceivers.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/NumberOfReceivers.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/NumberOfReceivers.java new file mode 100644 index 0000000..6c7983e --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/NumberOfReceivers.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.examples.group.broadcast.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * + */ +@NamedParameter(doc = "The number of receivers for the operators", short_name = "receivers") +public class NumberOfReceivers implements Name<Integer> { + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/AbstractImmutableVector.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/AbstractImmutableVector.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/AbstractImmutableVector.java new file mode 100644 index 0000000..9fa8498 --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/AbstractImmutableVector.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.examples.group.utils.math; + + +import org.apache.reef.io.Tuple; + +import java.util.Formatter; +import java.util.Locale; + +/** + * Base class for implementing ImmutableVector + */ +abstract class AbstractImmutableVector implements ImmutableVector { + + @Override + public abstract double get(int i); + + @Override + public abstract int size(); + + @Override + public double dot(final Vector that) { + assert (this.size() == that.size()); + + double result = 0.0; + for (int index = 0; index < this.size(); ++index) { + result += this.get(index) * that.get(index); + } + return result; + } + + + @Override + public double sum() { + double result = 0.0; + for (int i = 0; i < this.size(); ++i) { + result += this.get(i); + } + return result; + } + + @Override + public double norm2() { + return Math.sqrt(dot((Vector) this)); + } + + @Override + public double norm2Sqr() { + return dot((Vector) this); + } + + @SuppressWarnings("boxing") + @Override + public String toString() { + final StringBuilder b = new StringBuilder("DenseVector("); + try (final Formatter formatter = new Formatter(b, Locale.US)) { + final int size = Math.min(25, this.size()); + for (int i = 0; i < size; ++i) { + if (i < size - 1) { + formatter.format("%1.3f, ", this.get(i)); + } else { + formatter.format("%1.3f ", this.get(i)); + } + } + if (this.size() > 25) { + formatter.format("..."); + } + } + b.append(')'); + return b.toString(); + } + + @Override + public Tuple<Integer, Double> min() { + double min = get(0); + int minIdx = 0; + for (int i = 1; i < this.size(); ++i) { + final double curVal = get(i); + if (curVal < min) { + min = curVal; + minIdx = i; + } + } + return new Tuple<Integer, Double>(minIdx, min); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/AbstractVector.java ---------------------------------------------------------------------- diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/AbstractVector.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/AbstractVector.java new file mode 100644 index 0000000..f2afcef --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/AbstractVector.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.examples.group.utils.math; + +/** + * Abstract base class for {@link Vector} implementations. + * <p/> + * The only methods to be implemented by subclasses are get, set and size. 
+ */ +public abstract class AbstractVector extends AbstractImmutableVector implements Vector { + + @Override + public abstract void set(int i, double v); + + + @Override + public void add(final Vector that) { + for (int index = 0; index < this.size(); ++index) { + this.set(index, this.get(index) + that.get(index)); + } + } + + @Override + public void multAdd(final double factor, final ImmutableVector that) { + for (int index = 0; index < this.size(); ++index) { + this.set(index, this.get(index) + factor * that.get(index)); + } + } + + @Override + public void scale(final double factor) { + for (int index = 0; index < this.size(); ++index) { + this.set(index, this.get(index) * factor); + } + } + + + @Override + public void normalize() { + final double factor = 1.0 / this.norm2(); + this.scale(factor); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/DenseVector.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/DenseVector.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/DenseVector.java new file mode 100644 index 0000000..bc46521 --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/DenseVector.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.examples.group.utils.math; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Random; + +/** + * A dense {@link Vector} implementation backed by a double[] + */ +public class DenseVector extends AbstractVector implements Serializable { + + private static final long serialVersionUID = 1L; + private final double[] values; + + /** + * Creates a dense vector of the given size + */ + public DenseVector(final int size) { + this(new double[size]); + } + + public DenseVector(final double[] values) { + this.values = values; + } + + /** + * Instantiates a new DenseVector by copying the given other vector. + */ + public DenseVector(final ImmutableVector other) { + final int size = other.size(); + this.values = new double[size]; + for (int i = 0; i < size; ++i) { + this.values[i] = other.get(i); + } + } + + public DenseVector(final DenseVector other) { + this.values = Arrays.copyOf(other.values, other.values.length); + } + + @Override + public void set(final int i, final double v) { + this.values[i] = v; + } + + @Override + public double get(final int i) { + return this.values[i]; + } + + @Override + public int size() { + return this.values.length; + } + + /** + * Access the underlying storage. This is unsafe. 
+ */ + public double[] getValues() { + return this.values; + } + + /** + * Creates a random Vector of size 'size' where each element is individually + * drawn from Math.random() + * + * @return a random Vector of the given size where each element is + * individually drawn from Math.random() + */ + public static DenseVector rand(final int size) { + return rand(size, new Random()); + } + + /** + * Creates a random Vector of size 'size' where each element is individually + * drawn from Math.random() + * + * @param random the random number generator to use. + * @return a random Vector of the given size where each element is + * individually drawn from Math.random() + */ + public static DenseVector rand(final int size, final Random random) { + final DenseVector vec = new DenseVector(size); + for (int i = 0; i < size; ++i) { + vec.values[i] = random.nextDouble(); + } + return vec; + } + + @Override + public Vector newInstance() { + return new DenseVector(size()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/ImmutableVector.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/ImmutableVector.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/ImmutableVector.java new file mode 100644 index 0000000..cfd2e8d --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/ImmutableVector.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.examples.group.utils.math; + + +import org.apache.reef.io.Tuple; + +/** + * Represents an immutable vector. + */ +public interface ImmutableVector { + /** + * Access the value of the Vector at dimension i + * + * @param i index + * @return the value at index i + */ + double get(int i); + + /** + * The size (dimensionality) of the Vector + * + * @return the size of the Vector. + */ + int size(); + + /** + * Computes the inner product with another Vector. + * + * @param that + * @return the inner product between two Vectors. + */ + double dot(Vector that); + + /** + * Computes the computeSum of all entries in the Vector. + * + * @return the computeSum of all entries in this Vector + */ + double sum(); + + /** + * Computes the L2 norm of this Vector. + * + * @return the L2 norm of this Vector. + */ + double norm2(); + + /** + * Computes the square of the L2 norm of this Vector. + * + * @return the square of the L2 norm of this Vector. 
+ */ + double norm2Sqr(); + + /** + * Computes the min of all entries in the Vector + * + * @return the min of all entries in this Vector + */ + Tuple<Integer, Double> min(); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/SparseVector.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/SparseVector.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/SparseVector.java new file mode 100644 index 0000000..48a3b21 --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/SparseVector.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.examples.group.utils.math; + + +/** + * A sparse vector represented by an index and value array. 
+ */ +public final class SparseVector extends AbstractImmutableVector { + + private final double[] values; + private final int[] indices; + private final int size; + + + public SparseVector(final double[] values, final int[] indices, final int size) { + this.values = values; + this.indices = indices; + this.size = size; + } + + public SparseVector(final double[] values, final int[] indices) { + this(values, indices, -1); + } + + + @Override + public double get(final int index) { + for (int i = 0; i < indices.length; ++i) { + if (indices[i] == index) { + return values[i]; + } + } + return 0; + } + + @Override + public int size() { + return this.size; + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/Vector.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/Vector.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/Vector.java new file mode 100644 index 0000000..021a666 --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/Vector.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.examples.group.utils.math; + +import java.io.Serializable; + +/** + * An interface for Linear Alebra Vectors. + */ +public interface Vector extends ImmutableVector, Serializable { + + /** + * Set dimension i of the Vector to value v + * + * @param i the index + * @param v value + */ + public void set(final int i, final double v); + + /** + * Adds the Vector that to this one in place: this += that. + * + * @param that + */ + public void add(final Vector that); + + /** + * this += factor * that. + * + * @param factor + * @param that + */ + public void multAdd(final double factor, final ImmutableVector that); + + /** + * Scales this Vector: this *= factor. + * + * @param factor the scaling factor. + */ + public void scale(final double factor); + + + /** + * Normalizes the Vector according to the L2 norm. + */ + public void normalize(); + + /** + * Create a new instance of the current type + * with elements being zero + * + * @return zero vector of current dimensionality + */ + public Vector newInstance(); + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/VectorCodec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/VectorCodec.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/VectorCodec.java new file mode 100644 index 0000000..e7ad7e9 --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/VectorCodec.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.examples.group.utils.math; + +import org.apache.reef.wake.remote.Codec; + +import javax.inject.Inject; +import java.io.*; + +/** + * Codec for the Vector type Uses Data*Stream + * + * @author shravan + */ +public class VectorCodec implements Codec<Vector> { + /** + * This class is instantiated by TANG + */ + @Inject + public VectorCodec() { + // Intentionally blank + } + + @Override + public Vector decode(byte[] data) { + ByteArrayInputStream bais = new ByteArrayInputStream(data); + Vector result; + try (DataInputStream dais = new DataInputStream(bais)) { + int size = dais.readInt(); + result = new DenseVector(size); + for (int i = 0; i < size; i++) + result.set(i, dais.readDouble()); + } catch (IOException e) { + throw new RuntimeException(e.getCause()); + } + return result; + } + + @Override + public byte[] encode(Vector vec) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(vec.size() + * (Double.SIZE / Byte.SIZE)); + try (DataOutputStream daos = new DataOutputStream(baos)) { + daos.writeInt(vec.size()); + for (int i = 0; i < vec.size(); i++) { + daos.writeDouble(vec.get(i)); + } + } catch (IOException e) { + throw new RuntimeException(e.getCause()); + } + return baos.toByteArray(); + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/Window.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/Window.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/Window.java new file mode 100644 index 0000000..659e118 --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/Window.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/**
 * A sliding window over the most recently added values that reports their average.
 * Once the window holds {@code maxSize} values, adding another one evicts the oldest.
 */
public class Window {

  private final int maxSize;
  private final List<Double> list;

  /**
   * @param size the maximum number of values kept in the window
   */
  public Window(final int size) {
    this.maxSize = size;
    this.list = new ArrayList<>(size);
  }

  /**
   * Appends a value, evicting the oldest one if the window is full.
   * A window created with size 0 keeps nothing, so the call is a no-op for it.
   *
   * @param d the value to add
   */
  public void add(final double d) {
    if (maxSize == 0) {
      // Nothing can be stored; evicting from the empty list would throw.
      return;
    }
    if (list.size() >= maxSize) {
      list.remove(0);
    }
    list.add(d);
  }

  /**
   * @return the average of the values currently in the window, or 0 if it is empty
   */
  public double avg() {
    if (list.isEmpty()) {
      return 0;
    }
    double sum = 0;
    for (final double value : list) {
      sum += value;
    }
    return sum / list.size();
  }

  /**
   * Computes the average the window would have after {@code add(d)}, without
   * changing the window.
   *
   * @param d the value that would be added
   * @return the prospective average; {@code d} itself if the window is empty
   */
  public double avgIfAdded(final double d) {
    if (list.isEmpty()) {
      return d;
    }
    final boolean full = list.size() >= maxSize;
    // A full window would drop its oldest element (index 0) to make room for d.
    final int start = full ? 1 : 0;
    final int numElems = full ? maxSize : list.size() + 1;
    double sum = d;
    for (int i = start; i < list.size(); i++) {
      sum += list.get(i);
    }
    return sum / numElems;
  }

  /**
   * Small demo: logs the predicted and the actual average while adding 1..9
   * to a window of size 3.
   */
  public static void main(final String[] args) {
    final Logger log = Logger.getLogger(Window.class.getName());
    final Window w = new Window(3);
    for (int i = 1; i < 10; i++) {
      final double exp = w.avgIfAdded(i);
      w.add(i);
      final double act = w.avg();
      log.log(Level.INFO, "OUT: Exp: {0} Act: {1}", new Object[] {exp, act});
    }
  }
}
/**
 * Logs the start of a section when it is created and the elapsed time when it
 * is closed. Meant to be used in a try-with-resources statement.
 */
public class Timer implements AutoCloseable {

  private static final Logger LOG = Logger.getLogger(Timer.class.getName());

  public static final int MINUTES = 60 * 1000;  // ms
  public static final int HOURS = 60 * MINUTES;

  private static final double NANOS_PER_SECOND = 1e9;

  private final Logger log;
  private final Level level;
  private final String description;

  /** Start of the measurement, as returned by System.nanoTime(). */
  private final long timeStart;

  /**
   * Starts a timer that logs to the given logger at INFO level.
   *
   * @param log         the logger to write to
   * @param description the text identifying the timed section in the log
   */
  public Timer(final Logger log, final String description) {
    this(log, Level.INFO, description);
  }

  /**
   * Starts a timer that logs to this class' own logger at INFO level.
   *
   * @param description the text identifying the timed section in the log
   */
  public Timer(final String description) {
    this(LOG, Level.INFO, description);
  }

  /**
   * Starts a timer and logs the start message.
   *
   * @param log         the logger to write to
   * @param level       the level of the start and end messages
   * @param description the text identifying the timed section in the log
   */
  public Timer(final Logger log, final Level level, final String description) {
    this.log = log;
    this.level = level;
    this.description = description;
    // nanoTime is monotonic; the wall clock can jump while the timer is running.
    this.timeStart = System.nanoTime();
    this.log.log(this.level, "TIMER Start: {0}", this.description);
  }

  /**
   * Logs the time elapsed since construction, in seconds.
   */
  @Override
  public void close() {
    final long elapsedNanos = System.nanoTime() - this.timeStart;
    this.log.log(this.level, "TIMER End: {0} took {1} sec.",
        new Object[]{this.description, elapsedNanos / NANOS_PER_SECOND});
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java index 960f297..544f333 100644 --- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java @@ -28,7 +28,10 @@ import org.apache.reef.tang.Tang; import javax.annotation.concurrent.ThreadSafe; import javax.inject.Inject; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/BlockingEventHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/BlockingEventHandler.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/BlockingEventHandler.java index e6a5d6c..eefdcde 100644 --- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/BlockingEventHandler.java +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/BlockingEventHandler.java @@ -33,8 +33,8 @@ import java.util.List; public final class BlockingEventHandler<T> implements EventHandler<T> { private final int expectedSize; - private final EventHandler<Iterable<T>> destination; private List<T> events = new ArrayList<>(); + private final EventHandler<Iterable<T>> destination; public 
BlockingEventHandler(final int expectedSize, final EventHandler<Iterable<T>> destination) { this.expectedSize = expectedSize; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/LoggingEventHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/LoggingEventHandler.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/LoggingEventHandler.java index 9d2d5b8..804e67c 100644 --- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/LoggingEventHandler.java +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/LoggingEventHandler.java @@ -31,28 +31,27 @@ import java.util.logging.Logger; */ public class LoggingEventHandler<T> implements EventHandler<T> { + private static final Logger LOG = Logger.getLogger(LoggingEventHandler.class.getName()); + private final EventHandler<T> downstreamEventHandler; - private final String prefix; - private final String suffix; + private final String format; /** - * @param prefix to be logged before the event * @param downstreamEventHandler the event handler to hand the event to - * @param suffix to be logged after the event + * @param format Format string to log the event, e.g. "Event {0} received". 
*/ - public LoggingEventHandler(final String prefix, EventHandler<T> downstreamEventHandler, final String suffix) { + public LoggingEventHandler(EventHandler<T> downstreamEventHandler, final String format) { this.downstreamEventHandler = downstreamEventHandler; - this.prefix = prefix; - this.suffix = suffix; + this.format = format; } public LoggingEventHandler(final EventHandler<T> downstreamEventHandler) { - this("", downstreamEventHandler, ""); + this(downstreamEventHandler, "{0}"); } @Override public void onNext(final T value) { - Logger.getLogger(LoggingEventHandler.class.getName()).log(Level.INFO, prefix + value.toString() + suffix); + LOG.log(Level.INFO, this.format, value); this.downstreamEventHandler.onNext(value); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/pom.xml ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/pom.xml b/lang/java/reef-io/pom.xml index 90bdf1d..b44cb77 100644 --- a/lang/java/reef-io/pom.xml +++ b/lang/java/reef-io/pom.xml @@ -45,6 +45,11 @@ under the License. 
<arg value="--java_out=target/generated-sources/proto"/> <arg value="src/main/proto/ns_protocol.proto"/> </exec> + <exec executable="protoc"> + <arg value="--proto_path=src/main/proto/"/> + <arg value="--java_out=target/generated-sources/proto"/> + <arg value="src/main/proto/group_comm_protocol.proto"/> + </exec> </tasks> <sourceRoot>target/generated-sources/proto</sourceRoot> </configuration> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/GroupChanges.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/GroupChanges.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/GroupChanges.java new file mode 100644 index 0000000..1758646 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/GroupChanges.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.io.network.group.api; + +import org.apache.reef.annotations.audience.TaskSide; + +/** + * Represents the changes in Topology that happened in a communication group + * from the last time the user asked for topology changes + */ +@TaskSide +public interface GroupChanges { + + boolean exist(); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/config/OperatorSpec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/config/OperatorSpec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/config/OperatorSpec.java new file mode 100644 index 0000000..eabd23c --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/config/OperatorSpec.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.io.network.group.api.config; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.io.serialization.Codec; + +/** + * The specification of an operator submitted by the user + * while configuring the communication group. + */ +@DriverSide +@Private +public interface OperatorSpec { + + /** + * @return The codec class to be used to serialize & deserialize data + * in the group communication operators. + */ + Class<? extends Codec> getDataCodecClass(); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/CommunicationGroupDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/CommunicationGroupDriver.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/CommunicationGroupDriver.java new file mode 100644 index 0000000..ca486a3 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/CommunicationGroupDriver.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.api.driver; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.io.network.group.api.task.GroupCommClient; +import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec; +import org.apache.reef.io.network.group.impl.config.ReduceOperatorSpec; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.annotations.Name; + +/** + * The driver side interface of a Communication Group + * Lets one add opertaors and tasks. + * Main function is to extract the configuration related + * to the Group Communication for a task in the comm group + */ +@DriverSide +public interface CommunicationGroupDriver { + + /** + * Add the broadcast operator specified by the + * 'spec' with name 'operatorName' into this + * Communication Group + * + * @param operatorName + * @param spec + * @return + */ + public CommunicationGroupDriver addBroadcast(Class<? extends Name<String>> operatorName, BroadcastOperatorSpec spec); + + /** + * Add the reduce operator specified by the + * 'spec' with name 'operatorName' into this + * Communication Group + * + * @param operatorName + * @param spec + * @return + */ + public CommunicationGroupDriver addReduce(Class<? extends Name<String>> operatorName, ReduceOperatorSpec spec); + + /** + * This signals to the service that no more + * operator specs will be added to this communication + * group and an attempt to do that will throw an + * IllegalStateException + */ + public void finalise(); + + /** + * Returns a configuration that includes the partial task + * configuration passed in as 'taskConf' and makes the + * current communication group and the operators configured + * on it available on the Task side. 
Provides for injection + * of {@link GroupCommClient} + * + * @param taskConf + * @return + */ + public Configuration getTaskConfiguration(Configuration taskConf); + + /** + * Add the task represented by this configuration to this + * communication group. The configuration needs to contain + * the id of the Task that will be used + * + * @param partialTaskConf + */ + public void addTask(Configuration partialTaskConf); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java new file mode 100644 index 0000000..fc7e919 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.io.network.group.api.driver; + +import org.apache.reef.annotations.Provided; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.io.network.group.impl.driver.GroupCommDriverImpl; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.tang.annotations.Name; + +/** + * The driver side interface of Group Communication + * which is the entry point for the service + */ +@DriverSide +@Provided +@DefaultImplementation(value = GroupCommDriverImpl.class) +public interface GroupCommDriver { + + /** + * Create a new communication group with the specified name + * and the minimum number of tasks needed in this group before + * communication can start + * + * @param groupName + * @param numberOfTasks + * @return + */ + CommunicationGroupDriver newCommunicationGroup(Class<? extends Name<String>> groupName, int numberOfTasks); + + /** + * Tests whether the activeContext is a context configured + * using the Group Communication Service + * + * @param activeContext + * @return + */ + boolean isConfigured(ActiveContext activeContext); + + /** + * @return Configuration needed for a Context that should have + * Group Communication Service enabled + */ + Configuration getContextConfiguration(); + + /** + * @return Configuration needed to enable + * Group Communication as a Service + */ + Configuration getServiceConfiguration(); + + /** + * @return Configuration needed for a Task that should have + * Group Communication Service enabled + */ + Configuration getTaskConfiguration(Configuration partialTaskConf); + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommServiceDriver.java ---------------------------------------------------------------------- diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommServiceDriver.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommServiceDriver.java new file mode 100644 index 0000000..a097075 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommServiceDriver.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.io.network.group.api.driver; + + +import org.apache.reef.annotations.Provided; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.evaluator.FailedEvaluator; +import org.apache.reef.driver.task.FailedTask; +import org.apache.reef.driver.task.RunningTask; +import org.apache.reef.io.network.group.impl.driver.GroupCommDriverImpl; +import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.wake.EStage; + +@Private +@Provided +@DefaultImplementation(value = GroupCommDriverImpl.class) +public interface GroupCommServiceDriver extends GroupCommDriver { + + /** + * Not user facing but used the Group Communication Service class + * + * @return The running task stage that will handle the RunningTask + * events + */ + EStage<RunningTask> getGroupCommRunningTaskStage(); + + /** + * Not user facing but used the Group Communication Service class + * + * @return The running task stage that will handle the FailedTask + * events + */ + EStage<FailedTask> getGroupCommFailedTaskStage(); + + /** + * Not user facing but used the Group Communication Service class + * + * @return The running task stage that will handle the FailedEvaluator + * events + */ + EStage<FailedEvaluator> getGroupCommFailedEvaluatorStage(); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNode.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNode.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNode.java new file mode 100644 index 0000000..d16d4ea --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNode.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.api.driver; + +import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; + +/** + * A node in the Topology representing a Task on the driver + * Impl should maintain state relating to whether task is running/dead and + * status of neighboring nodes and send ctrl msgs to the tasks indicating + * topology changing events + */ +public interface TaskNode { + + String getTaskId(); + + int getVersion(); + + int getNumberOfChildren(); + + TaskNode getParent(); + + void setParent(TaskNode parent); + + void addChild(TaskNode child); + + void removeChild(TaskNode taskNode); + + boolean isRunning(); + + void onRunningTask(); + + void onFailedTask(); + + boolean hasChanges(); + + boolean isNeighborActive(String neighborId); + + void onReceiptOfAcknowledgement(GroupCommunicationMessage msg); + + void onParentRunning(); + + void onParentDead(); + + void onChildRunning(String childId); + + void onChildDead(String childId); + + /** + * Check if this node is ready for sending + * TopologySetup + */ + void checkAndSendTopologySetupMessage(); + + /** + * Check if the neighbor node with id source + * is ready for sending TopologySetup + * @param source + */ + void checkAndSendTopologySetupMessageFor(String source); + + /** 
+ * reset topology setup ensures that update topology is not sent to someone + * who is already updating topology which is usually when they are just + * (re)starting + * + * @return + */ + boolean resetTopologySetupSent(); + + void waitForTopologySetupOrFailure(); + + void setSibling(TaskNode leaf); + + TaskNode successor(); + + void updatingTopology (); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNodeStatus.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNodeStatus.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNodeStatus.java new file mode 100644 index 0000000..93955e1 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNodeStatus.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.io.network.group.api.driver; + +import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; +import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos.GroupCommMessage.Type; + +/** + * Tracks the Status of the ctrl msgs sent to a + * task node in the topology - + * what msgs have been sent to this node and + * what msgs have been ACKed as received by this node + * Status of neighbors + * This is used to see whether the local topology + * of a Task is completely set-up + * It also offers convenience methods for waiting + * on receiving ACKs from the task. + */ +public interface TaskNodeStatus { + + boolean hasChanges(); + + void onTopologySetupMessageSent(); + + boolean isActive(String neighborId); + + /** + * Process the msg that was received and update + * state accordingly + */ + void processAcknowledgement(GroupCommunicationMessage msg); + + /** + * To be called before sending a ctrl msg to the task + * represented by this node. All ctrl msgs sent to this + * node need to be ACKed. + * Ctrl msgs will be sent from a src & + * ACK sent from the task will be for a src. 
+ * As this is called from the TaskNodeImpl use srcId of msg + * In TaskNodeImpl while processMsg use dstId of msg + */ + public void expectAckFor(final Type msgType, final String srcId); + + /** + * Used when the task has failed to clear all + * the state that is associated with this task + * Also should release the locks held for implementing + * the convenience wait* methods + */ + void clearStateAndReleaseLocks(); + + /** + * This should remove state concerning neighboring tasks + * that have failed + */ + void updateFailureOf(String taskId); + + void waitForTopologySetup(); + + /** + * Called to denote that a UpdateTopology msg will + * be sent + */ + void updatingTopology (); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/Topology.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/Topology.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/Topology.java new file mode 100644 index 0000000..b393d7f --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/Topology.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.api.driver; + +import org.apache.reef.io.network.group.api.config.OperatorSpec; +import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; +import org.apache.reef.tang.Configuration; + +/** + * A topology should implement the following + * interface so that it can work with the + * elastic group communication framework. + * Currently there are two implementations: + * 1. Flat 2. Tree + */ +public interface Topology { + + /** + * Get the version of the task 'taskId' + * that belongs to this topology. + * + * @param taskId id of the task whose version is requested + * @return the version of that task + */ + int getNodeVersion(String taskId); + + /** + * Get the id of the root task. + * + * @return the id of the root task + */ + String getRootId(); + + /** + * Set the task with id 'senderId' as + * the root of this topology. + * + * @param senderId id of the task that becomes the root + */ + void setRootTask(String senderId); + + /** + * Add the task with id 'taskId' to + * the topology. + * + * @param taskId id of the task to add + */ + void addTask(String taskId); + + /** + * Remove the task with id 'taskId' from + * the topology. + * + * @param taskId id of the task to remove + */ + void removeTask(String taskId); + + /** + * Update state on receipt of a RunningTask + * event for the task with id 'id'. + * + * @param id id of the task the RunningTask event refers to + */ + void onRunningTask(String id); + + /** + * Update state on receipt of a FailedTask + * event for the task with id 'id'. + * + * @param id id of the task the FailedTask event refers to + */ + void onFailedTask(String id); + + /** + * Set the operator specification of the operator + * that is the owner of this topology instance. + * + * @param spec specification of the owning operator + */ + void setOperatorSpecification(OperatorSpec spec); + + /** + * Get the topology portion of the Configuration + * for the task 'taskId' that belongs to this + * topology. + * + * @param taskId id of the task whose configuration is requested + * @return the topology portion of the Configuration for that task + */ + Configuration getTaskConfiguration(String taskId); + + /** + * Update state on receipt of a message + * from the tasks. + * + * @param msg the message received from a task + */ + void
onReceiptOfMessage(GroupCommunicationMessage msg); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/AbstractGroupCommOperator.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/AbstractGroupCommOperator.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/AbstractGroupCommOperator.java new file mode 100644 index 0000000..ac89338 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/AbstractGroupCommOperator.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.api.operators; + +import org.apache.reef.tang.annotations.Name; + +public abstract class AbstractGroupCommOperator implements GroupCommOperator { + + @Override + public Class<? extends Name<String>> getOperName() { + throw new UnsupportedOperationException(); + } + + @Override + public Class<?
extends Name<String>> getGroupName() { + throw new UnsupportedOperationException(); + } + + @Override + public void initialize() { + throw new UnsupportedOperationException(); + } + + @Override + public int getVersion() { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/AllGather.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/AllGather.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/AllGather.java new file mode 100644 index 0000000..8c2d047 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/AllGather.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.api.operators; + +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.wake.Identifier; + +import java.util.List; + +/** + * MPI AllGather Operator. + * <p/> + * Each task applies this operator on an element of type T.
The result will be + * a list of elements constructed using the elements all-gathered at each + * task. + */ +public interface AllGather<T> extends GroupCommOperator { + + /** + * Apply the operation on {@code element}, using the default order. + * + * @return list of all elements on which the operation was applied, in default order + */ + List<T> apply(T element) throws NetworkException, + InterruptedException; + + /** + * Apply the operation on {@code element}, using the order given by {@code order}. + * + * @return list of all elements on which the operation was applied, in the specified order + */ + List<T> apply(T element, List<? extends Identifier> order) + throws NetworkException, InterruptedException; +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/AllReduce.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/AllReduce.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/AllReduce.java new file mode 100644 index 0000000..fd4bda5 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/AllReduce.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.api.operators; + +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.wake.Identifier; + +import java.util.List; + +/** + * MPI All Reduce Operator. Each task applies this operator on an element of + * type T. The result will be an element which is the result of applying a reduce + * function on the list of all elements on which this operator has been applied. + */ +public interface AllReduce<T> extends GroupCommOperator { + + /** + * Apply the operation on {@code aElement}. + * + * @return result of all-reduce on all elements the operation was applied on. + * The reduce function is applied based on the default order. + */ + T apply(T aElement) throws InterruptedException, NetworkException; + + /** + * Apply the operation on {@code element}. + * + * @return result of all-reduce on all elements the operation was applied on. + * The reduce function is applied based on the order given by {@code order}. + */ + T apply(T element, List<? extends Identifier> order) throws InterruptedException, NetworkException; + + /** + * Get the {@link org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction} configured.
+ * + * @return {@link org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction} + */ + Reduce.ReduceFunction<T> getReduceFunction(); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Broadcast.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Broadcast.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Broadcast.java new file mode 100644 index 0000000..ae007ac --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Broadcast.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.api.operators; + +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.io.network.group.impl.operators.BroadcastReceiver; +import org.apache.reef.io.network.group.impl.operators.BroadcastSender; +import org.apache.reef.tang.annotations.DefaultImplementation; + +/** + * MPI Broadcast operator.
+ * <p/> + * The sender or root send's an element that is received by all the receivers or other tasks. + * <p/> + * This is an asymmetric operation and hence the differentiation b/w Sender and Receiver. + */ +public interface Broadcast { + + /** + * Sender or root: the task that broadcasts the element. + */ + @DefaultImplementation(BroadcastSender.class) + static interface Sender<T> extends GroupCommOperator { + + /** + * Send {@code element} to all receivers. + */ + void send(T element) throws NetworkException, InterruptedException; + } + + /** + * Receivers or non-roots: the tasks that receive the broadcast element. + */ + @DefaultImplementation(BroadcastReceiver.class) + static interface Receiver<T> extends GroupCommOperator { + + /** + * Receive the element broadcast by the sender. + * + * @return the element broadcast by the sender + */ + T receive() throws NetworkException, InterruptedException; + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Gather.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Gather.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Gather.java new file mode 100644 index 0000000..a04b02e --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Gather.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.api.operators; + +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.wake.Identifier; + +import java.util.List; + +/** + * MPI Gather Operator. + * <p/> + * This is an operator where the root is a receiver and there are multiple senders. + * The root or receiver gathers all the elements sent by the senders in a List. + */ +public interface Gather { + + /** + * Senders or non-roots. + */ + static interface Sender<T> extends GroupCommOperator { + + /** + * Send {@code element} to the root/receiver. + */ + void send(T element) throws InterruptedException, NetworkException; + } + + /** + * Receiver or root. + */ + static interface Receiver<T> extends GroupCommOperator { + + /** + * Receive the elements sent by the senders in default order. + * + * @return elements sent by senders as a List in default order + */ + List<T> receive() throws InterruptedException, NetworkException; + + /** + * Receive the elements sent by the senders in the order given by {@code order}. + * + * @return elements sent by senders as a List in the specified order + */ + List<T> receive(List<?
extends Identifier> order) throws InterruptedException, NetworkException; + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/GroupCommOperator.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/GroupCommOperator.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/GroupCommOperator.java new file mode 100644 index 0000000..ae90d99 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/GroupCommOperator.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.api.operators; + +import org.apache.reef.io.network.exception.ParentDeadException; +import org.apache.reef.tang.annotations.Name; + +public interface GroupCommOperator { + + Class<? extends Name<String>> getOperName(); + + Class<?
extends Name<String>> getGroupName(); + + void initialize() throws ParentDeadException; + + int getVersion(); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Reduce.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Reduce.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Reduce.java new file mode 100644 index 0000000..b638348 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Reduce.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.api.operators; + +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.io.network.group.impl.operators.ReduceReceiver; +import org.apache.reef.io.network.group.impl.operators.ReduceSender; +import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.wake.Identifier; + +import java.util.List; + +/** + * MPI Reduce operator.
+ * <p/> + * This is another operator with root being receiver All senders send an element + * to the receiver. These elements are passed through a reduce function and its + * result is made available at the root + */ +public interface Reduce { + + /** + * Receiver or root. + */ + @DefaultImplementation(ReduceReceiver.class) + static interface Receiver<T> extends GroupCommOperator { + + /** + * Receive values sent by senders and pass them through the reduce + * function in default order. + * + * @return result of applying the reduce function on the elements gathered in default order + */ + T reduce() throws InterruptedException, NetworkException; + + /** + * Receive values sent by senders and pass them through the reduce + * function in the order given by {@code order}. + * + * @return result of applying the reduce function on the elements gathered in the specified order + */ + T reduce(List<? extends Identifier> order) throws InterruptedException, NetworkException; + + /** + * Get the reduce function to be applied on the set of received values. + * + * @return the {@link ReduceFunction} to be applied + */ + Reduce.ReduceFunction<T> getReduceFunction(); + } + + /** + * Senders or non-roots. + */ + @DefaultImplementation(ReduceSender.class) + static interface Sender<T> extends GroupCommOperator { + + /** + * Send {@code element} to the root. + */ + void send(T element) throws NetworkException, InterruptedException; + + /** + * Get the {@link ReduceFunction} to be applied on the set of received values. + * + * @return the {@link ReduceFunction} to be applied + */ + Reduce.ReduceFunction<T> getReduceFunction(); + } + + /** + * Interface for a reduce function: takes in an {@link Iterable} and returns an + * aggregate value computed from the {@link Iterable}. + */ + static interface ReduceFunction<T> { + /** + * Apply the function on {@code elements}. + * + * @return aggregate value computed from {@code elements} + */ + T apply(Iterable<T> elements); + } +}
