http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceReceiver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceReceiver.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceReceiver.java new file mode 100644 index 0000000..3a6791c --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceReceiver.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/
package org.apache.reef.io.network.group.impl.operators;

import org.apache.reef.driver.parameters.DriverIdentifier;
import org.apache.reef.driver.task.TaskConfigurationOptions;
import org.apache.reef.exception.evaluator.NetworkException;
import org.apache.reef.io.network.exception.ParentDeadException;
import org.apache.reef.io.network.group.api.operators.Reduce;
import org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction;
import org.apache.reef.io.network.impl.NetworkService;
import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
import org.apache.reef.io.network.group.api.task.OperatorTopology;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
import org.apache.reef.io.network.group.impl.config.parameters.*;
import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.io.serialization.Codec;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.Identifier;

import javax.inject.Inject;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

/**
 * Receiving side of a Reduce operator.
 *
 * <p>The instance registers itself with the {@link CommGroupNetworkHandler} under its
 * operator name and hands every incoming {@link GroupCommunicationMessage} to its
 * {@link OperatorTopology}. {@link #reduce()} obtains the value reduced over the
 * children of this node from that topology.
 *
 * @param <T> type of the values being reduced
 */
public class ReduceReceiver<T> implements Reduce.Receiver<T>, EventHandler<GroupCommunicationMessage> {

  private static final Logger LOG = Logger.getLogger(ReduceReceiver.class.getName());

  // Identity of this operator instance.
  private final Class<? extends Name<String>> groupName;
  private final Class<? extends Name<String>> operName;
  private final int version;

  // How values are decoded and combined.
  private final Codec<T> dataCodec;
  private final ReduceFunction<T> reduceFunction;

  // Communication plumbing.
  private final CommGroupNetworkHandler commGroupNetworkHandler;
  private final NetworkService<GroupCommunicationMessage> netService;
  private final Sender sender;
  private final OperatorTopology topology;
  private final CommunicationGroupServiceClient commGroupClient;

  // Flipped by the first reduce() call, which then initializes the communication group client.
  private final AtomicBoolean init = new AtomicBoolean(false);

  /**
   * Builds the operator, creates its topology and registers it as the handler for
   * messages addressed to {@code operName}.
   *
   * @param groupName               name of the communication group, resolved to a class via {@code Utils.getClass}
   * @param operName                name of this operator, resolved to a class via {@code Utils.getClass}
   * @param selfId                  identifier of the task, passed on to the topology
   * @param dataCodec               codec handed to the topology when receiving from children
   * @param reduceFunction          function handed to the topology when receiving from children
   * @param driverId                identifier of the driver, passed on to the topology
   * @param version                 version reported by {@link #getVersion()}
   * @param commGroupNetworkHandler handler this operator registers itself with
   * @param netService              network service wrapped by the internal {@link Sender}
   * @param commGroupClient         client initialized on the first {@link #reduce()} call
   */
  @Inject
  public ReduceReceiver(@Parameter(CommunicationGroupName.class) final String groupName,
                        @Parameter(OperatorName.class) final String operName,
                        @Parameter(TaskConfigurationOptions.Identifier.class) final String selfId,
                        @Parameter(DataCodec.class) final Codec<T> dataCodec,
                        @Parameter(ReduceFunctionParam.class) final ReduceFunction<T> reduceFunction,
                        @Parameter(DriverIdentifier.class) final String driverId,
                        @Parameter(TaskVersion.class) final int version,
                        final CommGroupNetworkHandler commGroupNetworkHandler,
                        final NetworkService<GroupCommunicationMessage> netService,
                        final CommunicationGroupServiceClient commGroupClient) {
    super();
    LOG.finest(operName + " has CommGroupHandler-" + commGroupNetworkHandler.toString());

    this.version = version;
    this.dataCodec = dataCodec;
    this.reduceFunction = reduceFunction;
    this.commGroupClient = commGroupClient;
    this.commGroupNetworkHandler = commGroupNetworkHandler;
    this.netService = netService;

    this.groupName = Utils.getClass(groupName);
    this.operName = Utils.getClass(operName);
    this.sender = new Sender(netService);

    // The topology must exist before registration: onNext() forwards to it.
    this.topology = new OperatorTopologyImpl(this.groupName, this.operName, selfId, driverId, this.sender, version);
    this.commGroupNetworkHandler.register(this.operName, this);
  }

  /** @return the version this operator was configured with */
  @Override
  public int getVersion() {
    return version;
  }

  /**
   * Initializes the underlying topology.
   *
   * @throws ParentDeadException propagated from the topology
   */
  @Override
  public void initialize() throws ParentDeadException {
    topology.initialize();
  }

  /** @return the class naming this operator */
  @Override
  public Class<? extends Name<String>> getOperName() {
    return operName;
  }

  /** @return the class naming the communication group */
  @Override
  public Class<? extends Name<String>> getGroupName() {
    return groupName;
  }

  /** @return {@code ReduceReceiver:<group>:<operator>:<version>} */
  @Override
  public String toString() {
    final StringBuilder text = new StringBuilder("ReduceReceiver:");
    text.append(Utils.simpleName(groupName));
    text.append(":");
    text.append(Utils.simpleName(operName));
    text.append(":");
    text.append(version);
    return text.toString();
  }

  /** Forwards an incoming message to the topology. */
  @Override
  public void onNext(final GroupCommunicationMessage msg) {
    topology.handle(msg);
  }

  /**
   * Receives the value reduced over this node's children.
   *
   * <p>The first invocation also initializes the communication group client.
   *
   * @return the value returned by the topology; may be {@code null}
   * @throws RuntimeException wrapping a {@link ParentDeadException} raised by the topology
   */
  @Override
  public T reduce() throws InterruptedException, NetworkException {
    LOG.entering("ReduceReceiver", "reduce", this);
    LOG.fine("I am " + this);

    final boolean firstCall = init.compareAndSet(false, true);
    if (firstCall) {
      commGroupClient.initialize();
    }

    // This operator is the root: it only receives, it never sends upwards.
    LOG.fine(this + " Waiting to receive reduced value");
    final T reduced;
    try {
      reduced = topology.recvFromChildren(reduceFunction, dataCodec);
    } catch (final ParentDeadException e) {
      throw new RuntimeException("ParentDeadException", e);
    }

    final Object shown = reduced == null ? "NULL" : reduced;
    LOG.fine(this + " Received Reduced value: " + shown);
    LOG.exiting("ReduceReceiver", "reduce", Arrays.toString(new Object[]{reduced}));
    return reduced;
  }

  /**
   * Ordered reduce is not supported by this implementation.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public T reduce(final List<? extends Identifier> order) throws InterruptedException, NetworkException {
    throw new UnsupportedOperationException();
  }

  /** @return the reduce function this operator was configured with */
  @Override
  public ReduceFunction<T> getReduceFunction() {
    return reduceFunction;
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceSender.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceSender.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceSender.java new file mode 100644 index 0000000..505f072 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceSender.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/
package org.apache.reef.io.network.group.impl.operators;

import org.apache.reef.driver.parameters.DriverIdentifier;
import org.apache.reef.driver.task.TaskConfigurationOptions;
import org.apache.reef.exception.evaluator.NetworkException;
import org.apache.reef.io.network.exception.ParentDeadException;
import org.apache.reef.io.network.group.api.operators.Reduce;
import org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction;
import org.apache.reef.io.network.impl.NetworkService;
import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
import org.apache.reef.io.network.group.api.task.OperatorTopology;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
import org.apache.reef.io.network.group.impl.config.parameters.*;
import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
import org.apache.reef.io.serialization.Codec;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EventHandler;

import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Sending side of a Reduce operator.
 *
 * <p>The instance registers itself with the {@link CommGroupNetworkHandler} under its
 * operator name and hands every incoming {@link GroupCommunicationMessage} to its
 * {@link OperatorTopology}. {@link #send(Object)} combines the local value with whatever
 * the topology received from this node's children and forwards the result to the parent.
 *
 * @param <T> type of the values being reduced
 */
public class ReduceSender<T> implements Reduce.Sender<T>, EventHandler<GroupCommunicationMessage> {

  private static final Logger LOG = Logger.getLogger(ReduceSender.class.getName());

  private final Class<? extends Name<String>> groupName;
  private final Class<? extends Name<String>> operName;
  private final CommGroupNetworkHandler commGroupNetworkHandler;
  private final Codec<T> dataCodec;
  private final NetworkService<GroupCommunicationMessage> netService;
  private final Sender sender;
  private final ReduceFunction<T> reduceFunction;

  private final OperatorTopology topology;

  private final CommunicationGroupServiceClient commGroupClient;

  // Flipped by the first send() call, which then initializes the communication group client.
  private final AtomicBoolean init = new AtomicBoolean(false);

  private final int version;

  /**
   * Builds the operator, creates its topology and registers it as the handler for
   * messages addressed to {@code operName}.
   *
   * @param groupName               name of the communication group, resolved to a class via {@code Utils.getClass}
   * @param operName                name of this operator, resolved to a class via {@code Utils.getClass}
   * @param selfId                  identifier of the task, passed on to the topology
   * @param dataCodec               codec used to encode the reduced value and handed to the topology
   * @param reduceFunction          function used to combine the local value with the children's value
   * @param driverId                identifier of the driver, passed on to the topology
   * @param version                 version reported by {@link #getVersion()}
   * @param commGroupNetworkHandler handler this operator registers itself with
   * @param netService              network service wrapped by the internal {@link Sender}
   * @param commGroupClient         client initialized on the first {@link #send(Object)} call
   */
  @Inject
  public ReduceSender(
      final @Parameter(CommunicationGroupName.class) String groupName,
      final @Parameter(OperatorName.class) String operName,
      final @Parameter(TaskConfigurationOptions.Identifier.class) String selfId,
      final @Parameter(DataCodec.class) Codec<T> dataCodec,
      final @Parameter(ReduceFunctionParam.class) ReduceFunction<T> reduceFunction,
      final @Parameter(DriverIdentifier.class) String driverId,
      final @Parameter(TaskVersion.class) int version,
      final CommGroupNetworkHandler commGroupNetworkHandler,
      final NetworkService<GroupCommunicationMessage> netService,
      final CommunicationGroupServiceClient commGroupClient) {

    super();

    LOG.log(Level.FINEST, "{0} has CommGroupHandler-{1}",
        new Object[]{operName, commGroupNetworkHandler});

    this.version = version;
    this.groupName = Utils.getClass(groupName);
    this.operName = Utils.getClass(operName);
    this.dataCodec = dataCodec;
    this.reduceFunction = reduceFunction;
    this.commGroupNetworkHandler = commGroupNetworkHandler;
    this.netService = netService;
    this.sender = new Sender(this.netService);
    // The topology must exist before registration: onNext() forwards to it.
    this.topology = new OperatorTopologyImpl(this.groupName, this.operName, selfId, driverId, sender, version);
    this.commGroupNetworkHandler.register(this.operName, this);
    this.commGroupClient = commGroupClient;
  }

  /** @return the version this operator was configured with */
  @Override
  public int getVersion() {
    return version;
  }

  /**
   * Initializes the underlying topology.
   *
   * @throws ParentDeadException propagated from the topology
   */
  @Override
  public void initialize() throws ParentDeadException {
    topology.initialize();
  }

  /** @return the class naming this operator */
  @Override
  public Class<? extends Name<String>> getOperName() {
    return operName;
  }

  /** @return the class naming the communication group */
  @Override
  public Class<? extends Name<String>> getGroupName() {
    return groupName;
  }

  /**
   * Describes this operator for log output.
   *
   * <p>The string carries a {@code ReduceSender:} prefix, mirroring the
   * {@code ReduceReceiver:} prefix used by {@code ReduceReceiver#toString()}, so that
   * log lines such as {@code "I am " + this} identify which side of the Reduce wrote them.
   *
   * @return {@code ReduceSender:<group>:<operator>:<version>}
   */
  @Override
  public String toString() {
    return "ReduceSender:" + Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":" + version;
  }

  /** Forwards an incoming message to the topology. */
  @Override
  public void onNext(final GroupCommunicationMessage msg) {
    topology.handle(msg);
  }

  /**
   * Reduces {@code myData} with the value received from this node's children and sends
   * the encoded result to the parent as a {@code Reduce} message.
   *
   * <p>The first invocation also initializes the communication group client.
   *
   * @param myData the local contribution to the reduction
   * @throws RuntimeException wrapping a {@link ParentDeadException} raised by the topology
   */
  @Override
  public void send(final T myData) throws NetworkException, InterruptedException {
    LOG.entering("ReduceSender", "send", new Object[]{this, myData});
    LOG.fine("I am " + this);

    if (init.compareAndSet(false, true)) {
      commGroupClient.initialize();
    }
    // I am an intermediate node or leaf.
    LOG.finest("Waiting for children");
    // Wait for children to send
    try {
      final T reducedValueOfChildren = topology.recvFromChildren(reduceFunction, dataCodec);
      final List<T> vals = new ArrayList<>(2);
      vals.add(myData);
      // The topology may hand back null; in that case only the local value is reduced.
      if (reducedValueOfChildren != null) {
        vals.add(reducedValueOfChildren);
      }
      final T reducedValue = reduceFunction.apply(vals);
      LOG.fine(this + " Sending local " + reducedValue + " to parent");
      topology.sendToParent(dataCodec.encode(reducedValue), ReefNetworkGroupCommProtos.GroupCommMessage.Type.Reduce);
    } catch (final ParentDeadException e) {
      throw new RuntimeException("ParentDeadException", e);
    }
    LOG.exiting("ReduceSender", "send", Arrays.toString(new Object[]{this, myData}));
  }

  /** @return the reduce function this operator was configured with */
  @Override
  public ReduceFunction<T> getReduceFunction() {
    return reduceFunction;
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/Sender.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/Sender.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/Sender.java
new file mode 100644
index 0000000..b124906 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/Sender.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.impl.operators; + +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.io.network.Connection; +import org.apache.reef.io.network.group.api.operators.AbstractGroupCommOperator; +import org.apache.reef.io.network.impl.NetworkService; +import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; +import org.apache.reef.io.network.util.StringIdentifierFactory; +import org.apache.reef.wake.Identifier; +import org.apache.reef.wake.IdentifierFactory; + +import java.util.Arrays; +import java.util.logging.Logger; + +public class Sender extends AbstractGroupCommOperator { + + private static final Logger LOG = Logger.getLogger(Sender.class.getName()); + + private final NetworkService<GroupCommunicationMessage> netService; + private final IdentifierFactory idFac = new StringIdentifierFactory(); + + public Sender(final NetworkService<GroupCommunicationMessage> netService) { + this.netService = netService; + } + + public void send(final 
GroupCommunicationMessage msg) throws NetworkException { + LOG.entering("Sender", "send", msg); + final String dest = msg.getDestid(); + send(msg, dest); + LOG.exiting("Sender", "send", msg); + } + + public void send(final GroupCommunicationMessage msg, final String dest) throws NetworkException { + LOG.entering("Sender", "send", new Object[]{msg, dest}); + final Identifier destId = idFac.getNewInstance(dest); + final Connection<GroupCommunicationMessage> link = netService.newConnection(destId); + link.open(); + link.write(msg); + LOG.exiting("Sender", "send", Arrays.toString(new Object[]{msg, dest})); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/ChildNodeStruct.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/ChildNodeStruct.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/ChildNodeStruct.java new file mode 100644 index 0000000..b69ec5d --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/ChildNodeStruct.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.impl.task; + +import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; +import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos; + +import java.util.logging.Logger; + +public class ChildNodeStruct extends NodeStructImpl { + + private static final Logger LOG = Logger.getLogger(ChildNodeStruct.class.getName()); + + public ChildNodeStruct(final String id, final int version) { + super(id, version); + } + + @Override + public boolean checkDead(final GroupCommunicationMessage gcm) { + LOG.entering("ChildNodeStruct", "checkDead", gcm); + final boolean retVal = gcm.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead ? true : false; + LOG.exiting("ChildNodeStruct", "checkDead", gcm); + return retVal; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommGroupNetworkHandlerImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommGroupNetworkHandlerImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommGroupNetworkHandlerImpl.java new file mode 100644 index 0000000..72af1ec --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommGroupNetworkHandlerImpl.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.impl.task; + +import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler; +import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; +import org.apache.reef.io.network.group.impl.utils.Utils; +import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.wake.EventHandler; + +import javax.inject.Inject; +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.logging.Logger; + +public class CommGroupNetworkHandlerImpl implements + CommGroupNetworkHandler { + + private static final Logger LOG = Logger.getLogger(CommGroupNetworkHandlerImpl.class.getName()); + + private final Map<Class<? extends Name<String>>, EventHandler<GroupCommunicationMessage>> operHandlers = new ConcurrentHashMap<>(); + private final Map<Class<? extends Name<String>>, BlockingQueue<GroupCommunicationMessage>> topologyNotifications = new ConcurrentHashMap<>(); + + @Inject + public CommGroupNetworkHandlerImpl() { + } + + @Override + public void register(final Class<? 
extends Name<String>> operName, + final EventHandler<GroupCommunicationMessage> operHandler) { + LOG.entering("CommGroupNetworkHandlerImpl", "register", new Object[]{Utils.simpleName(operName), operHandler}); + operHandlers.put(operName, operHandler); + LOG.exiting("CommGroupNetworkHandlerImpl", "register", Arrays.toString(new Object[]{Utils.simpleName(operName), operHandler})); + } + + @Override + public void addTopologyElement(final Class<? extends Name<String>> operName) { + LOG.entering("CommGroupNetworkHandlerImpl", "addTopologyElement", Utils.simpleName(operName)); + LOG.finest("Creating LBQ for " + operName); + topologyNotifications.put(operName, new LinkedBlockingQueue<GroupCommunicationMessage>()); + LOG.exiting("CommGroupNetworkHandlerImpl", "addTopologyElement", Utils.simpleName(operName)); + } + + @Override + public void onNext(final GroupCommunicationMessage msg) { + LOG.entering("CommGroupNetworkHandlerImpl", "onNext", msg); + final Class<? extends Name<String>> operName = Utils.getClass(msg.getOperatorname()); + if (msg.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyUpdated || msg.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges) { + topologyNotifications.get(operName).add(msg); + } else { + operHandlers.get(operName).onNext(msg); + } + LOG.exiting("CommGroupNetworkHandlerImpl", "onNext", msg); + } + + @Override + public byte[] waitForTopologyChanges(final Class<? 
extends Name<String>> operName) { + LOG.entering("CommGroupNetworkHandlerImpl", "waitForTopologyChanges", Utils.simpleName(operName)); + try { + final byte[] retVal = Utils.getData(topologyNotifications.get(operName).take()); + LOG.exiting("CommGroupNetworkHandlerImpl", "waitForTopologyChanges", retVal); + return retVal; + } catch (final InterruptedException e) { + throw new RuntimeException("InterruptedException while waiting for topology update of " + + operName.getSimpleName(), e); + } + } + + @Override + public GroupCommunicationMessage waitForTopologyUpdate(final Class<? extends Name<String>> operName) { + LOG.entering("CommGroupNetworkHandlerImpl", "waitForTopologyUpdate", Utils.simpleName(operName)); + try { + final GroupCommunicationMessage retVal = topologyNotifications.get(operName).take(); + LOG.exiting("CommGroupNetworkHandlerImpl", "waitForTopologyUpdate", retVal); + return retVal; + } catch (final InterruptedException e) { + throw new RuntimeException("InterruptedException while waiting for topology update of " + + operName.getSimpleName(), e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java new file mode 100644 index 0000000..d0e6e9e --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java @@ -0,0 +1,296 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.impl.task; + +import org.apache.reef.driver.parameters.DriverIdentifier; +import org.apache.reef.driver.task.TaskConfigurationOptions; +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.io.network.group.api.operators.Broadcast; +import org.apache.reef.io.network.group.api.operators.GroupCommOperator; +import org.apache.reef.io.network.group.api.operators.Reduce; +import org.apache.reef.io.network.impl.NetworkService; +import org.apache.reef.io.network.group.api.GroupChanges; +import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler; +import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient; +import org.apache.reef.io.network.group.api.task.GroupCommNetworkHandler; +import org.apache.reef.io.network.group.impl.GroupChangesCodec; +import org.apache.reef.io.network.group.impl.GroupChangesImpl; +import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; +import org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName; +import org.apache.reef.io.network.group.impl.config.parameters.OperatorName; +import org.apache.reef.io.network.group.impl.config.parameters.SerializedOperConfigs; +import org.apache.reef.io.network.group.impl.operators.Sender; +import org.apache.reef.io.network.group.impl.utils.Utils; +import 
org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos; +import org.apache.reef.io.serialization.Codec; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.tang.formats.ConfigurationSerializer; +import org.apache.reef.wake.EStage; +import org.apache.reef.wake.impl.ThreadPoolStage; + +import javax.inject.Inject; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; + +public class CommunicationGroupClientImpl implements CommunicationGroupServiceClient { + private static final Logger LOG = Logger.getLogger(CommunicationGroupClientImpl.class.getName()); + + private final GroupCommNetworkHandler groupCommNetworkHandler; + private final Class<? extends Name<String>> groupName; + private final Map<Class<? 
extends Name<String>>, GroupCommOperator> operators; + private final Sender sender; + + private final String taskId; + + private final String driverId; + + private final CommGroupNetworkHandler commGroupNetworkHandler; + + private final AtomicBoolean init = new AtomicBoolean(false); + + @Inject + public CommunicationGroupClientImpl(@Parameter(CommunicationGroupName.class) final String groupName, + @Parameter(TaskConfigurationOptions.Identifier.class) final String taskId, + @Parameter(DriverIdentifier.class) final String driverId, + final GroupCommNetworkHandler groupCommNetworkHandler, + @Parameter(SerializedOperConfigs.class) final Set<String> operatorConfigs, + final ConfigurationSerializer configSerializer, + final NetworkService<GroupCommunicationMessage> netService) { + this.taskId = taskId; + this.driverId = driverId; + LOG.finest(groupName + " has GroupCommHandler-" + groupCommNetworkHandler.toString()); + this.groupName = Utils.getClass(groupName); + this.groupCommNetworkHandler = groupCommNetworkHandler; + this.sender = new Sender(netService); + this.operators = new TreeMap<>(new Comparator<Class<? extends Name<String>>>() { + + @Override + public int compare(final Class<? extends Name<String>> o1, final Class<? 
extends Name<String>> o2) { + final String s1 = o1.getSimpleName(); + final String s2 = o2.getSimpleName(); + return s1.compareTo(s2); + } + }); + try { + this.commGroupNetworkHandler = Tang.Factory.getTang().newInjector().getInstance(CommGroupNetworkHandler.class); + this.groupCommNetworkHandler.register(this.groupName, commGroupNetworkHandler); + + for (final String operatorConfigStr : operatorConfigs) { + + final Configuration operatorConfig = configSerializer.fromString(operatorConfigStr); + final Injector injector = Tang.Factory.getTang().newInjector(operatorConfig); + + injector.bindVolatileParameter(TaskConfigurationOptions.Identifier.class, taskId); + injector.bindVolatileParameter(CommunicationGroupName.class, groupName); + injector.bindVolatileInstance(CommGroupNetworkHandler.class, commGroupNetworkHandler); + injector.bindVolatileInstance(NetworkService.class, netService); + injector.bindVolatileInstance(CommunicationGroupServiceClient.class, this); + + final GroupCommOperator operator = injector.getInstance(GroupCommOperator.class); + final String operName = injector.getNamedInstance(OperatorName.class); + this.operators.put(Utils.getClass(operName), operator); + LOG.finest(operName + " has CommGroupHandler-" + commGroupNetworkHandler.toString()); + } + } catch (final InjectionException | IOException e) { + throw new RuntimeException("Unable to deserialize operator config", e); + } + } + + @Override + public Broadcast.Sender getBroadcastSender(final Class<? 
extends Name<String>> operatorName) { + LOG.entering("CommunicationGroupClientImpl", "getBroadcastSender", new Object[]{getQualifiedName(), + Utils.simpleName(operatorName)}); + final GroupCommOperator op = operators.get(operatorName); + if (!(op instanceof Broadcast.Sender)) { + throw new RuntimeException("Configured operator is not a broadcast sender"); + } + commGroupNetworkHandler.addTopologyElement(operatorName); + LOG.exiting("CommunicationGroupClientImpl", "getBroadcastSender", getQualifiedName() + op); + return (Broadcast.Sender) op; + } + + @Override + public Reduce.Receiver getReduceReceiver(final Class<? extends Name<String>> operatorName) { + LOG.entering("CommunicationGroupClientImpl", "getReduceReceiver", new Object[]{getQualifiedName(), + Utils.simpleName(operatorName)}); + final GroupCommOperator op = operators.get(operatorName); + if (!(op instanceof Reduce.Receiver)) { + throw new RuntimeException("Configured operator is not a reduce receiver"); + } + commGroupNetworkHandler.addTopologyElement(operatorName); + LOG.exiting("CommunicationGroupClientImpl", "getReduceReceiver", getQualifiedName() + op); + return (Reduce.Receiver) op; + } + + @Override + public Broadcast.Receiver getBroadcastReceiver(final Class<? extends Name<String>> operatorName) { + LOG.entering("CommunicationGroupClientImpl", "getBroadcastReceiver", new Object[]{getQualifiedName(), + Utils.simpleName(operatorName)}); + final GroupCommOperator op = operators.get(operatorName); + if (!(op instanceof Broadcast.Receiver)) { + throw new RuntimeException("Configured operator is not a broadcast receiver"); + } + commGroupNetworkHandler.addTopologyElement(operatorName); + LOG.exiting("CommunicationGroupClientImpl", "getBroadcastReceiver", getQualifiedName() + op); + return (Broadcast.Receiver) op; + } + + @Override + public Reduce.Sender getReduceSender(final Class<? 
extends Name<String>> operatorName) { + LOG.entering("CommunicationGroupClientImpl", "getReduceSender", new Object[]{getQualifiedName(), + Utils.simpleName(operatorName)}); + final GroupCommOperator op = operators.get(operatorName); + if (!(op instanceof Reduce.Sender)) { + throw new RuntimeException("Configured operator is not a reduce sender"); + } + commGroupNetworkHandler.addTopologyElement(operatorName); + LOG.exiting("CommunicationGroupClientImpl", "getReduceSender", getQualifiedName() + op); + return (Reduce.Sender) op; + } + + @Override + public void initialize() { + LOG.entering("CommunicationGroupClientImpl", "initialize", getQualifiedName()); + if (init.compareAndSet(false, true)) { + LOG.finest("CommGroup-" + groupName + " is initializing"); + final CountDownLatch initLatch = new CountDownLatch(operators.size()); + + final InitHandler initHandler = new InitHandler(initLatch); + final EStage<GroupCommOperator> initStage = new ThreadPoolStage<>(initHandler, operators.size()); + for (final GroupCommOperator op : operators.values()) { + initStage.onNext(op); + } + + try { + initLatch.await(); + } catch (final InterruptedException e) { + throw new RuntimeException("InterruptedException while waiting for initialization", e); + } + + if (initHandler.getException() != null) { + throw new RuntimeException(getQualifiedName() + "Parent dead. Current behavior is for the child to die too."); + } + } + LOG.exiting("CommunicationGroupClientImpl", "initialize", getQualifiedName()); + } + + @Override + public GroupChanges getTopologyChanges() { + LOG.entering("CommunicationGroupClientImpl", "getTopologyChanges", getQualifiedName()); + for (final GroupCommOperator op : operators.values()) { + final Class<? 
extends Name<String>> operName = op.getOperName(); + LOG.finest("Sending TopologyChanges msg to driver"); + try { + sender.send(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges, taskId, op.getVersion(), driverId, + 0, Utils.EmptyByteArr)); + } catch (final NetworkException e) { + throw new RuntimeException("NetworkException while sending GetTopologyChanges", e); + } + } + final Codec<GroupChanges> changesCodec = new GroupChangesCodec(); + final Map<Class<? extends Name<String>>, GroupChanges> perOpChanges = new HashMap<>(); + for (final GroupCommOperator op : operators.values()) { + final Class<? extends Name<String>> operName = op.getOperName(); + final byte[] changes = commGroupNetworkHandler.waitForTopologyChanges(operName); + perOpChanges.put(operName, changesCodec.decode(changes)); + } + final GroupChanges retVal = mergeGroupChanges(perOpChanges); + LOG.exiting("CommunicationGroupClientImpl", "getTopologyChanges", getQualifiedName() + retVal); + return retVal; + } + + /** + * @param perOpChanges + * @return + */ + private GroupChanges mergeGroupChanges(final Map<Class<? extends Name<String>>, GroupChanges> perOpChanges) { + LOG.entering("CommunicationGroupClientImpl", "mergeGroupChanges", new Object[]{getQualifiedName(), perOpChanges}); + boolean doChangesExist = false; + for (final GroupChanges change : perOpChanges.values()) { + if (change.exist()) { + doChangesExist = true; + break; + } + } + final GroupChanges changes = new GroupChangesImpl(doChangesExist); + LOG.exiting("CommunicationGroupClientImpl", "mergeGroupChanges", getQualifiedName() + changes); + return changes; + } + + @Override + public void updateTopology() { + LOG.entering("CommunicationGroupClientImpl", "updateTopology", getQualifiedName()); + for (final GroupCommOperator op : operators.values()) { + final Class<? 
extends Name<String>> operName = op.getOperName(); + try { + sender.send(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology, taskId, op.getVersion(), driverId, + 0, Utils.EmptyByteArr)); + } catch (final NetworkException e) { + throw new RuntimeException("NetworkException while sending UpdateTopology", e); + } + } + for (final GroupCommOperator op : operators.values()) { + final Class<? extends Name<String>> operName = op.getOperName(); + GroupCommunicationMessage msg; + do { + msg = commGroupNetworkHandler.waitForTopologyUpdate(operName); + } while (!isMsgVersionOk(msg)); + } + LOG.exiting("CommunicationGroupClientImpl", "updateTopology", getQualifiedName()); + } + + private boolean isMsgVersionOk(final GroupCommunicationMessage msg) { + LOG.entering("CommunicationGroupClientImpl", "isMsgVersionOk", new Object[]{getQualifiedName(), msg}); + if (msg.hasVersion()) { + final int msgVersion = msg.getVersion(); + final GroupCommOperator operator = operators.get(Utils.getClass(msg.getOperatorname())); + final int nodeVersion = operator.getVersion(); + final boolean retVal; + if (msgVersion < nodeVersion) { + LOG.warning(getQualifiedName() + "Received a ver-" + msgVersion + " msg while expecting ver-" + nodeVersion + + ". Discarding msg"); + retVal = false; + } else { + retVal = true; + } + LOG.exiting("CommunicationGroupClientImpl", "isMsgVersionOk", Arrays.toString(new Object[]{retVal, getQualifiedName(), msg})); + return retVal; + } else { + throw new RuntimeException(getQualifiedName() + "can only deal with versioned msgs"); + } + } + + private String getQualifiedName() { + return Utils.simpleName(groupName) + " "; + } + + @Override + public Class<? 
extends Name<String>> getName() { + return groupName; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommClientImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommClientImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommClientImpl.java new file mode 100644 index 0000000..f8f6db4 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommClientImpl.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.io.network.group.impl.task; + +import org.apache.reef.driver.task.TaskConfigurationOptions; +import org.apache.reef.io.network.impl.NetworkService; +import org.apache.reef.io.network.group.api.task.CommunicationGroupClient; +import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient; +import org.apache.reef.io.network.group.api.task.GroupCommClient; +import org.apache.reef.io.network.group.api.task.GroupCommNetworkHandler; +import org.apache.reef.io.network.group.impl.config.parameters.SerializedGroupConfigs; +import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.tang.formats.ConfigurationSerializer; + +import javax.inject.Inject; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class GroupCommClientImpl implements GroupCommClient { + private static final Logger LOG = Logger.getLogger(GroupCommClientImpl.class.getName()); + + private final Map<Class<? 
extends Name<String>>, CommunicationGroupServiceClient> communicationGroups = new HashMap<>(); + + @Inject + public GroupCommClientImpl( + final @Parameter(SerializedGroupConfigs.class) Set<String> groupConfigs, + final @Parameter(TaskConfigurationOptions.Identifier.class) String taskId, + final GroupCommNetworkHandler groupCommNetworkHandler, + final NetworkService<ReefNetworkGroupCommProtos.GroupCommMessage> netService, + final ConfigurationSerializer configSerializer) { + + LOG.log(Level.FINEST, "GroupCommHandler-{0}", groupCommNetworkHandler); + + for (final String groupConfigStr : groupConfigs) { + try { + final Configuration groupConfig = configSerializer.fromString(groupConfigStr); + + final Injector injector = Tang.Factory.getTang().newInjector(groupConfig); + injector.bindVolatileParameter(TaskConfigurationOptions.Identifier.class, taskId); + injector.bindVolatileInstance(GroupCommNetworkHandler.class, groupCommNetworkHandler); + injector.bindVolatileInstance(NetworkService.class, netService); + + final CommunicationGroupServiceClient commGroupClient = + injector.getInstance(CommunicationGroupServiceClient.class); + + this.communicationGroups.put(commGroupClient.getName(), commGroupClient); + + } catch (final InjectionException | IOException e) { + throw new RuntimeException("Unable to deserialize operator config", e); + } + } + } + + @Override + public CommunicationGroupClient getCommunicationGroup( + final Class<? 
extends Name<String>> groupName) { + return communicationGroups.get(groupName); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommNetworkHandlerImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommNetworkHandlerImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommNetworkHandlerImpl.java new file mode 100644 index 0000000..a1e9277 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommNetworkHandlerImpl.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.io.network.group.impl.task; + +import org.apache.reef.io.network.Message; +import org.apache.reef.io.network.group.api.task.GroupCommNetworkHandler; +import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.wake.EventHandler; + +import javax.inject.Inject; +import java.util.Arrays; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Logger; + +public class GroupCommNetworkHandlerImpl implements GroupCommNetworkHandler { + + private static final Logger LOG = Logger.getLogger(GroupCommNetworkHandlerImpl.class.getName()); + + private final Map<Class<? extends Name<String>>, EventHandler<GroupCommunicationMessage>> commGroupHandlers = new ConcurrentHashMap<>(); + + @Inject + public GroupCommNetworkHandlerImpl() { + } + + @Override + public void onNext(final Message<GroupCommunicationMessage> mesg) { + LOG.entering("GroupCommNetworkHandlerImpl", "onNext", mesg); + final Iterator<GroupCommunicationMessage> iter = mesg.getData().iterator(); + final GroupCommunicationMessage msg = iter.hasNext() ? iter.next() : null; + try { + final Class<? extends Name<String>> groupName = (Class<? extends Name<String>>) Class.forName(msg.getGroupname()); + commGroupHandlers.get(groupName).onNext(msg); + } catch (final ClassNotFoundException e) { + throw new RuntimeException("GroupName not found", e); + } + LOG.exiting("GroupCommNetworkHandlerImpl", "onNext", mesg); + } + + @Override + public void register(final Class<? 
extends Name<String>> groupName, + final EventHandler<GroupCommunicationMessage> commGroupNetworkHandler) { + LOG.entering("GroupCommNetworkHandlerImpl", "register", new Object[]{groupName, + commGroupNetworkHandler}); + commGroupHandlers.put(groupName, commGroupNetworkHandler); + LOG.exiting("GroupCommNetworkHandlerImpl", "register", Arrays.toString(new Object[]{groupName, + commGroupNetworkHandler})); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/InitHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/InitHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/InitHandler.java new file mode 100644 index 0000000..13bd644 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/InitHandler.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.io.network.group.impl.task; + +import org.apache.reef.io.network.exception.ParentDeadException; +import org.apache.reef.io.network.group.api.operators.GroupCommOperator; +import org.apache.reef.wake.EventHandler; + +import java.util.concurrent.CountDownLatch; +import java.util.logging.Logger; + +class InitHandler implements EventHandler<GroupCommOperator> { + + private static final Logger LOG = Logger.getLogger(InitHandler.class.getName()); + + private ParentDeadException exception = null; + private final CountDownLatch initLatch; + + public InitHandler(final CountDownLatch initLatch) { + this.initLatch = initLatch; + } + + @Override + public void onNext(final GroupCommOperator op) { + LOG.entering("InitHandler", "onNext", op); + try { + op.initialize(); + } catch (final ParentDeadException e) { + this.exception = e; + } + initLatch.countDown(); + LOG.exiting("InitHandler", "onNext", op); + } + + public ParentDeadException getException() { + return exception; + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/NodeStructImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/NodeStructImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/NodeStructImpl.java new file mode 100644 index 0000000..5976352 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/NodeStructImpl.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.impl.task; + +import org.apache.reef.io.network.group.api.task.NodeStruct; +import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; +import org.apache.reef.io.network.group.impl.utils.Utils; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.logging.Logger; + +public abstract class NodeStructImpl implements NodeStruct { + + private static final Logger LOG = Logger.getLogger(NodeStructImpl.class.getName()); + + private final String id; + private final BlockingQueue<GroupCommunicationMessage> dataQue = new LinkedBlockingQueue<>(); + + private int version; + + public NodeStructImpl(final String id, final int version) { + super(); + this.id = id; + this.version = version; + } + + @Override + public int getVersion() { + return version; + } + + @Override + public void setVersion(final int version) { + this.version = version; + } + + @Override + public String getId() { + return id; + } + + @Override + public void addData(final GroupCommunicationMessage msg) { + LOG.entering("NodeStructImpl", "addData", msg); + dataQue.add(msg); + LOG.exiting("NodeStructImpl", "addData", msg); + } + + @Override + public byte[] getData() { + LOG.entering("NodeStructImpl", "getData"); + GroupCommunicationMessage gcm; + try { + gcm = dataQue.take(); + } catch (final InterruptedException e) 
{ + throw new RuntimeException("InterruptedException while waiting for data from " + id, e); + } + + final byte[] retVal = checkDead(gcm) ? null : Utils.getData(gcm); + LOG.exiting("NodeStructImpl", "getData", retVal); + return retVal; + } + + @Override + public String toString() { + return "(" + id + "," + version + ")"; + } + + @Override + public boolean equals(final Object obj) { + if (obj instanceof NodeStructImpl) { + final NodeStructImpl that = (NodeStructImpl) obj; + return this.id.equals(that.id) && this.version == that.version; + } else { + return false; + } + } + + public abstract boolean checkDead(final GroupCommunicationMessage gcm); + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java new file mode 100644 index 0000000..f46031b --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java @@ -0,0 +1,466 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.impl.task; + +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.io.network.exception.ParentDeadException; +import org.apache.reef.io.network.group.api.operators.Reduce; +import org.apache.reef.io.network.group.api.task.OperatorTopology; +import org.apache.reef.io.network.group.api.task.OperatorTopologyStruct; +import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; +import org.apache.reef.io.network.group.impl.operators.Sender; +import org.apache.reef.io.network.group.impl.utils.ResettingCountDownLatch; +import org.apache.reef.io.network.group.impl.utils.Utils; +import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos; +import org.apache.reef.io.serialization.Codec; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.wake.EStage; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.impl.SingleThreadStage; + +import javax.inject.Inject; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; + +public class OperatorTopologyImpl implements OperatorTopology { + + private static final Logger LOG = Logger.getLogger(OperatorTopologyImpl.class.getName()); + + private final Class<? extends Name<String>> groupName; + private final Class<? 
extends Name<String>> operName; + private final String selfId; + private final String driverId; + private final Sender sender; + private final Object topologyLock = new Object(); + + private final int version; + + private final BlockingQueue<GroupCommunicationMessage> deltas = new LinkedBlockingQueue<>(); + private final BlockingQueue<GroupCommunicationMessage> deletionDeltas = new LinkedBlockingQueue<>(); + + private OperatorTopologyStruct baseTopology; + private OperatorTopologyStruct effectiveTopology; + private final ResettingCountDownLatch topologyLockAquired = new ResettingCountDownLatch(1); + private final AtomicBoolean updatingTopo = new AtomicBoolean(false); + + private final EventHandler<GroupCommunicationMessage> baseTopologyUpdateHandler = new BaseTopologyUpdateHandler(); + + private final EStage<GroupCommunicationMessage> baseTopologyUpdateStage = new SingleThreadStage<>( + "BaseTopologyUpdateStage", + baseTopologyUpdateHandler, + 5); + + private final EventHandler<GroupCommunicationMessage> dataHandlingStageHandler = new DataHandlingStageHandler(); + + // The queue capacity might determine how many tasks can be handled + private final EStage<GroupCommunicationMessage> dataHandlingStage = new SingleThreadStage<>("DataHandlingStage", + dataHandlingStageHandler, + 10000); + + @Inject + public OperatorTopologyImpl(final Class<? extends Name<String>> groupName, + final Class<? extends Name<String>> operName, final String selfId, + final String driverId, final Sender sender, final int version) { + super(); + this.groupName = groupName; + this.operName = operName; + this.selfId = selfId; + this.driverId = driverId; + this.sender = sender; + this.version = version; + } + + /** + * Handle messages meant for this operator. Data msgs are passed on + * to the DataHandlingStage while Ctrl msgs are queued up for the + * base topology to update later. 
Ctrl msgs signalling death of a + * task are also routed to the effectiveTopology in order to notify + * a waiting operation. During initialization when effective topology + * is not yet set-up, these *Dead msgs are queued in deletionDeltas + * for the small time window when these arrive after baseTopology has + * received TopologySetup but not yet created the effectiveTopology. + * Most times the msgs in the deletionDeltas will be discarded as stale + * msgs + * <p/> + * No synchronization is needed while handling *Dead messages. + * There 2 states: UpdatingTopo & NotUpdatingTopo + * If UpdatingTopo, deltas.put still takes care of adding this msg to effTop through baseTopo changes. + * If not, we add to effTopo. So we are good. + * <p/> + * However, for data msgs synchronization is needed. Look at doc of + * DataHandlingStage + * <p/> + * Adding to deletionDeltas should be outside + * effTopo!=null block. There is a rare possibility that during initialization + * just after baseTopo is created(so deltas will be ignored) & just before + * effTopo is created(so effTopo will be null) where we can miss a deletion + * msg if not added to deletionDelta because this method is synchronized + */ + @Override + public void handle(final GroupCommunicationMessage msg) { + LOG.entering("OperatorTopologyImpl", "handle", new Object[]{getQualifiedName(), msg}); + if (isMsgVersionOk(msg)) { + try { + switch (msg.getType()) { + case UpdateTopology: + updatingTopo.set(true); + baseTopologyUpdateStage.onNext(msg); + topologyLockAquired.awaitAndReset(1); + LOG.finest(getQualifiedName() + "topoLockAquired CDL released. 
Resetting it to new CDL"); + sendAckToDriver(msg); + break; + + case TopologySetup: + LOG.finest(getQualifiedName() + "Adding to deltas queue"); + deltas.put(msg); + break; + + case ParentAdd: + case ChildAdd: + LOG.finest(getQualifiedName() + "Adding to deltas queue"); + deltas.put(msg); + break; + + case ParentDead: + case ChildDead: + LOG.finest(getQualifiedName() + "Adding to deltas queue"); + deltas.put(msg); + + LOG.finest(getQualifiedName() + "Adding to deletionDeltas queue"); + deletionDeltas.put(msg); + + if (effectiveTopology != null) { + LOG.finest(getQualifiedName() + "Adding as data msg to non-null effective topology struct"); + effectiveTopology.addAsData(msg); + } else { + LOG.fine(getQualifiedName() + "Received a death message before effective topology was setup. CAUTION"); + } + break; + + default: + dataHandlingStage.onNext(msg); + } + } catch (final InterruptedException e) { + throw new RuntimeException("InterruptedException while trying to put ctrl msg into delta queue", e); + } + } + LOG.exiting("OperatorTopologyImpl", "handle", Arrays.toString(new Object[]{getQualifiedName(), msg})); + } + + private boolean isMsgVersionOk(final GroupCommunicationMessage msg) { + LOG.entering("OperatorTopologyImpl", "isMsgVersionOk", new Object[]{getQualifiedName(), msg}); + if (msg.hasVersion()) { + final int msgVersion = msg.getVersion(); + final boolean retVal; + if (msgVersion < version) { + LOG.warning(getQualifiedName() + "Received a ver-" + msgVersion + " msg while expecting ver-" + version + + ". 
Discarding msg"); + retVal = false; + } else { + retVal = true; + } + LOG.exiting("OperatorTopologyImpl", "isMsgVersionOk", Arrays.toString(new Object[]{retVal, getQualifiedName(), msg})); + return retVal; + } else { + throw new RuntimeException(getQualifiedName() + "can only deal with versioned msgs"); + } + } + + @Override + public void initialize() throws ParentDeadException { + LOG.entering("OperatorTopologyImpl", "initialize", getQualifiedName()); + createBaseTopology(); + LOG.exiting("OperatorTopologyImpl", "initialize", getQualifiedName()); + } + + @Override + public void sendToParent(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) throws ParentDeadException { + LOG.entering("OperatorTopologyImpl", "sendToParent", new Object[]{getQualifiedName(), data, msgType}); + refreshEffectiveTopology(); + assert (effectiveTopology != null); + effectiveTopology.sendToParent(data, msgType); + LOG.exiting("OperatorTopologyImpl", "sendToParent", Arrays.toString(new Object[]{getQualifiedName(), data, msgType})); + } + + @Override + public void sendToChildren(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) throws ParentDeadException { + LOG.entering("OperatorTopologyImpl", "sendToChildren", new Object[]{getQualifiedName(), data, msgType}); + refreshEffectiveTopology(); + assert (effectiveTopology != null); + effectiveTopology.sendToChildren(data, msgType); + LOG.exiting("OperatorTopologyImpl", "sendToChildren", Arrays.toString(new Object[]{getQualifiedName(), data, msgType})); + } + + @Override + public byte[] recvFromParent() throws ParentDeadException { + LOG.entering("OperatorTopologyImpl", "recvFromParent", getQualifiedName()); + refreshEffectiveTopology(); + assert (effectiveTopology != null); + final byte[] retVal = effectiveTopology.recvFromParent(); + LOG.exiting("OperatorTopologyImpl", "recvFromParent", Arrays.toString(new Object[]{getQualifiedName(), retVal})); + return retVal; + } + + @Override 
+ public <T> T recvFromChildren(final Reduce.ReduceFunction<T> redFunc, final Codec<T> dataCodec) throws ParentDeadException { + LOG.entering("OperatorTopologyImpl", "recvFromChildren", getQualifiedName()); + refreshEffectiveTopology(); + assert (effectiveTopology != null); + final T retVal = effectiveTopology.recvFromChildren(redFunc, dataCodec); + LOG.exiting("OperatorTopologyImpl", "recvFromChildren", Arrays.toString(new Object[]{getQualifiedName(), retVal})); + return retVal; + } + + /** + * Only refreshes the effective topology with deletion msgs from + * deletionDeltas queue + * + * @throws ParentDeadException + */ + private void refreshEffectiveTopology() throws ParentDeadException { + LOG.entering("OperatorTopologyImpl", "refreshEffectiveTopology", getQualifiedName()); + LOG.finest(getQualifiedName() + "Waiting to acquire topoLock"); + synchronized (topologyLock) { + LOG.finest(getQualifiedName() + "Acquired topoLock"); + + assert (effectiveTopology != null); + + final Set<GroupCommunicationMessage> deletionDeltas = new HashSet<>(); + copyDeletionDeltas(deletionDeltas); + + LOG.finest(getQualifiedName() + "Updating effective topology struct with deletion msgs"); + effectiveTopology.update(deletionDeltas); + LOG.finest(getQualifiedName() + "Released topoLock"); + } + LOG.exiting("OperatorTopologyImpl", "refreshEffectiveTopology", getQualifiedName()); + } + + /** + * @throws ParentDeadException + */ + private void createBaseTopology() throws ParentDeadException { + LOG.entering("OperatorTopologyImpl", "createBaseTopology", getQualifiedName()); + baseTopology = new OperatorTopologyStructImpl(groupName, operName, selfId, driverId, sender, version); + updateBaseTopology(); + LOG.exiting("OperatorTopologyImpl", "createBaseTopology", getQualifiedName()); + } + + /** + * Blocking method that waits till the base topology is updated Unblocks when + * we receive a TopologySetup msg from driver + * <p/> + * Will also update the effective topology when the base topology 
is updated + * so that creation of effective topology is limited to just this method and + * refresh will only refresh the effective topology with deletion msgs from + * deletionDeltas queue + * + * @throws ParentDeadException + */ + private void updateBaseTopology() throws ParentDeadException { + LOG.entering("OperatorTopologyImpl", "updateBaseTopology", getQualifiedName()); + LOG.finest(getQualifiedName() + "Waiting to acquire topoLock"); + synchronized (topologyLock) { + LOG.finest(getQualifiedName() + "Acquired topoLock"); + try { + assert (baseTopology != null); + LOG.finest(getQualifiedName() + "Updating base topology. So setting dirty bit"); + baseTopology.setChanges(true); + + LOG.finest(getQualifiedName() + "Waiting for ctrl msgs"); + for (GroupCommunicationMessage msg = deltas.take(); msg.getType() != ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup; msg = deltas.take()) { + LOG.finest(getQualifiedName() + "Got " + msg.getType() + " msg from " + msg.getSrcid()); + if (effectiveTopology == null && msg.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead) { + /** + * If effectiveTopology!=null, this method is being called from the BaseTopologyUpdateStage + * And exception thrown will be caught by uncaughtExceptionHandler leading to System.exit + */ + LOG.finer(getQualifiedName() + "Throwing ParentDeadException"); + throw new ParentDeadException(getQualifiedName() + + "Parent dead. 
Current behavior is for the child to die too."); + } else { + LOG.finest(getQualifiedName() + "Updating basetopology struct"); + baseTopology.update(msg); + sendAckToDriver(msg); + } + LOG.finest(getQualifiedName() + "Waiting for ctrl msgs"); + } + + updateEffTopologyFromBaseTopology(); + + } catch (final InterruptedException e) { + throw new RuntimeException("InterruptedException while waiting for delta msg from driver", e); + } + LOG.finest(getQualifiedName() + "Released topoLock"); + } + LOG.exiting("OperatorTopologyImpl", "updateBaseTopology", getQualifiedName()); + } + + private void sendAckToDriver(final GroupCommunicationMessage msg) { + LOG.entering("OperatorTopologyImpl", "sendAckToDriver", new Object[]{getQualifiedName(), msg}); + try { + final String srcId = msg.getSrcid(); + if (msg.hasVersion()) { + final int srcVersion = msg.getSrcVersion(); + switch (msg.getType()) { + case UpdateTopology: + sender.send(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup, selfId, this.version, driverId, + srcVersion, Utils.EmptyByteArr)); + break; + case ParentAdd: + sender.send(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdded, selfId, this.version, srcId, + srcVersion, Utils.EmptyByteArr), driverId); + break; + case ParentDead: + sender.send(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentRemoved, selfId, this.version, srcId, + srcVersion, Utils.EmptyByteArr), driverId); + break; + case ChildAdd: + sender.send(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdded, selfId, this.version, srcId, + srcVersion, Utils.EmptyByteArr), driverId); + break; + case ChildDead: + sender.send(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildRemoved, selfId, this.version, srcId, + srcVersion, Utils.EmptyByteArr), driverId); + 
break; + default: + throw new RuntimeException("Received a non control message for acknowledgement"); + } + } else { + throw new RuntimeException(getQualifiedName() + "Ack Sender can only deal with versioned msgs"); + } + } catch (final NetworkException e) { + throw new RuntimeException("NetworkException while sending ack to driver for delta msg " + msg.getType(), e); + } + LOG.exiting("OperatorTopologyImpl", "sendAckToDriver", Arrays.toString(new Object[]{getQualifiedName(), msg})); + } + + private void updateEffTopologyFromBaseTopology() { + LOG.entering("OperatorTopologyImpl", "updateEffTopologyFromBaseTopology", getQualifiedName()); + assert (baseTopology != null); + LOG.finest(getQualifiedName() + "Updaing effective topology"); + if (baseTopology.hasChanges()) { + //Create effectiveTopology from baseTopology + effectiveTopology = new OperatorTopologyStructImpl(baseTopology); + baseTopology.setChanges(false); + } + LOG.exiting("OperatorTopologyImpl", "updateEffTopologyFromBaseTopology", getQualifiedName()); + } + + /** + * @param deletionDeltasForUpdate + * @throws ParentDeadException + */ + private void copyDeletionDeltas(final Set<GroupCommunicationMessage> deletionDeltasForUpdate) + throws ParentDeadException { + LOG.entering("OperatorTopologyImpl", "copyDeletionDeltas", new Object[]{getQualifiedName(), + deletionDeltasForUpdate}); + this.deletionDeltas.drainTo(deletionDeltasForUpdate); + for (final GroupCommunicationMessage msg : deletionDeltasForUpdate) { + final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType = msg.getType(); + if (msgType == ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead) { + throw new ParentDeadException(getQualifiedName() + "Parent dead. 
Current behavior is for the child to die too."); + } + } + LOG.exiting("OperatorTopologyImpl", "copyDeletionDeltas", Arrays.toString(new Object[]{getQualifiedName(), + deletionDeltasForUpdate})); + } + + private String getQualifiedName() { + return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":" + selfId + ":ver(" + version + ") - "; + } + + /** + * Unlike Dead msgs this needs to be synchronized because data msgs are not + * routed through the base topo changes So we need to make sure to wait for + * updateTopo to complete and for the new effective topo to take effect. Hence + * updatinTopo is set to false in refreshEffTopo. Also, since this is called + * from a netty IO thread, we need to create a stage to move the msgs from + * netty space to application space and release the netty threads. Otherwise + * weird deadlocks can happen Ex: Sent model to k nodes using broadcast. Send + * to K+1 th is waiting for ACK. The K nodes already compute their states and + * reduce send their results. If we haven't finished refreshEffTopo because of + * which updatingTopo is true, we can't add the new msgs if the #netty threads + * is k All k threads are waiting to add data. Single user thread that is + * waiting for ACK does not come around to refreshEffTopo and we are + * deadlocked because there aren't enough netty threads to dispatch msgs to + * the application. Hence the stage + */ + private final class DataHandlingStageHandler implements EventHandler<GroupCommunicationMessage> { + @Override + public void onNext(final GroupCommunicationMessage dataMsg) { + LOG.entering("OperatorTopologyImpl.DataHandlingStageHandler", "onNext", new Object[]{getQualifiedName(), + dataMsg}); + LOG.finest(getQualifiedName() + "Waiting to acquire topoLock"); + synchronized (topologyLock) { + LOG.finest(getQualifiedName() + "Aqcuired topoLock"); + while (updatingTopo.get()) { + try { + LOG.finest(getQualifiedName() + "Topology is being updated. 
Released topoLock, Waiting on topoLock"); + topologyLock.wait(); + LOG.finest(getQualifiedName() + "Aqcuired topoLock"); + } catch (final InterruptedException e) { + throw new RuntimeException("InterruptedException while data handling" + + "stage was waiting for updatingTopo to become false", e); + } + } + if (effectiveTopology != null) { + LOG.finest(getQualifiedName() + "Non-null effectiveTopo.addAsData(msg)"); + effectiveTopology.addAsData(dataMsg); + } else { + LOG.fine("Received a data message before effective topology was setup"); + } + LOG.finest(getQualifiedName() + "Released topoLock"); + } + LOG.exiting("OperatorTopologyImpl.DataHandlingStageHandler", "onNext", + Arrays.toString(new Object[]{getQualifiedName(), dataMsg})); + } + } + + private final class BaseTopologyUpdateHandler implements EventHandler<GroupCommunicationMessage> { + @Override + public void onNext(final GroupCommunicationMessage msg) { + assert (msg.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology); + assert (effectiveTopology != null); + LOG.entering("OperatorTopologyImpl.BaseTopologyUpdateHandler", "onNext", new Object[]{getQualifiedName(), msg}); + LOG.finest(getQualifiedName() + "Waiting to acquire topoLock"); + synchronized (topologyLock) { + LOG.finest(getQualifiedName() + "Acquired topoLock"); + LOG.finest(getQualifiedName() + "Releasing topoLoackAcquired CDL"); + topologyLockAquired.countDown(); + try { + updateBaseTopology(); + LOG.finest(getQualifiedName() + "Completed updating base & effective topologies"); + } catch (final ParentDeadException e) { + throw new RuntimeException(getQualifiedName() + "BaseTopologyUpdateStage: Unexpected ParentDeadException", e); + } + updatingTopo.set(false); + LOG.finest(getQualifiedName() + "Topology update complete. 
Notifying waiting threads"); + topologyLock.notifyAll(); + LOG.finest(getQualifiedName() + "Released topoLock"); + } + LOG.exiting("OperatorTopologyImpl.BaseTopologyUpdateHandler", "onNext", + Arrays.toString(new Object[]{getQualifiedName(), msg})); + } + } +}
