GIRAPH-919: Add worker to worker communication (majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/9e1a5a05 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/9e1a5a05 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/9e1a5a05 Branch: refs/heads/release-1.1 Commit: 9e1a5a05368a1543aaf6bea0176b9d79058e03b9 Parents: b218d72 Author: Maja Kabiljo <[email protected]> Authored: Tue Jun 24 09:51:40 2014 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Tue Jun 24 09:51:40 2014 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../java/org/apache/giraph/comm/ServerData.java | 35 +++++++++ .../giraph/comm/requests/RequestType.java | 4 +- .../SendWorkerToWorkerMessageRequest.java | 80 ++++++++++++++++++++ .../apache/giraph/graph/GraphTaskManager.java | 1 + .../apache/giraph/master/BspServiceMaster.java | 11 ++- .../org/apache/giraph/worker/WorkerContext.java | 73 +++++++++++++++++- 7 files changed, 202 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/9e1a5a05/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index d315a9f..8da555d 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-919: Add worker to worker communication (majakabiljo) + GIRAPH-922: SimpleEdgeStore has a bug causing NPE (pavanka) GIRAPH-915: With BigDataIO some messages can get ignored (majakabiljo via pavanka) http://git-wip-us.apache.org/repos/asf/giraph/blob/9e1a5a05/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java index f0ecca2..b3f8733 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java @@ -36,6 +36,9 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; /** @@ -84,6 +87,13 @@ public class ServerData<I extends WritableComparable, /** Service worker */ private final CentralizedServiceWorker<I, V, E> serviceWorker; + /** Store for current messages from other workers to this worker */ + private volatile List<Writable> currentWorkerToWorkerMessages = + Collections.synchronizedList(new ArrayList<Writable>()); + /** Store for message from other workers to this worker for next superstep */ + private volatile List<Writable> incomingWorkerToWorkerMessages = + Collections.synchronizedList(new ArrayList<Writable>()); + /** * Constructor. * @@ -166,6 +176,10 @@ public class ServerData<I extends WritableComparable, messageStoreFactory.newStore(conf.getIncomingMessageValueFactory()); incomingMessageStore = messageStoreFactory.newStore(conf.getOutgoingMessageValueFactory()); + + currentWorkerToWorkerMessages = incomingWorkerToWorkerMessages; + incomingWorkerToWorkerMessages = + Collections.synchronizedList(new ArrayList<Writable>()); } /** @@ -204,4 +218,25 @@ public class ServerData<I extends WritableComparable, public CentralizedServiceWorker<I, V, E> getServiceWorker() { return this.serviceWorker; } + + /** + * Get and clear worker to worker messages for this superstep. Can be + * called only once per superstep. + * + * @return List of messages for this worker + */ + public List<Writable> getAndClearCurrentWorkerToWorkerMessages() { + List<Writable> ret = currentWorkerToWorkerMessages; + currentWorkerToWorkerMessages = null; + return ret; + } + + /** + * Add incoming message to this worker for next superstep. Thread-safe. + * + * @param message Message received + */ + public void addIncomingWorkerToWorkerMessage(Writable message) { + incomingWorkerToWorkerMessages.add(message); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/9e1a5a05/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java index 7fe2ae7..408295c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java @@ -58,7 +58,9 @@ public enum RequestType { /** Send aggregators from master to worker owners */ SEND_AGGREGATORS_TO_OWNER_REQUEST(SendAggregatorsToOwnerRequest.class), /** Send aggregators from worker owner to other workers */ - SEND_AGGREGATORS_TO_WORKER_REQUEST(SendAggregatorsToWorkerRequest.class); + SEND_AGGREGATORS_TO_WORKER_REQUEST(SendAggregatorsToWorkerRequest.class), + /** Send message from worker to worker */ + SEND_WORKER_TO_WORKER_MESSAGE_REQUEST(SendWorkerToWorkerMessageRequest.class); /** Class of request which this type corresponds to */ private final Class<? extends WritableRequest> requestClass; http://git-wip-us.apache.org/repos/asf/giraph/blob/9e1a5a05/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerToWorkerMessageRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerToWorkerMessageRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerToWorkerMessageRequest.java new file mode 100644 index 0000000..a2505ef --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerToWorkerMessageRequest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.comm.requests; + +import org.apache.giraph.comm.ServerData; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** Request which sends any Writable message from one worker to another */ +public class SendWorkerToWorkerMessageRequest extends WritableRequest + implements WorkerRequest<WritableComparable, Writable, Writable> { + /** Message sent */ + private Writable message; + + /** + * Default constructor, for reflection + */ + public SendWorkerToWorkerMessageRequest() { + } + + /** + * Constructor with message + * + * @param message Message sent + */ + public SendWorkerToWorkerMessageRequest(Writable message) { + this.message = message; + } + + @Override + public RequestType getType() { + return RequestType.SEND_WORKER_TO_WORKER_MESSAGE_REQUEST; + } + + @Override + void writeRequest(DataOutput output) throws IOException { + Text.writeString(output, message.getClass().getName()); + message.write(output); + } + + @Override + void readFieldsRequest(DataInput input) throws IOException { + String className = Text.readString(input); + try { + message = (Writable) Class.forName(className).newInstance(); + message.readFields(input); + } catch (InstantiationException | IllegalAccessException | + ClassNotFoundException e) { + throw new IllegalStateException( + "readFieldsRequest: Exception occurred", e); + } + } + + @Override + public void doRequest( + ServerData<WritableComparable, Writable, Writable> serverData) { + serverData.addIncomingWorkerToWorkerMessage(message); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/9e1a5a05/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index ad5fc91..e13eedd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -432,6 +432,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, serviceWorker.prepareSuperstep(); serviceWorker.getWorkerContext().setGraphState(graphState); + serviceWorker.getWorkerContext().setupSuperstep(serviceWorker); GiraphTimerContext preSuperstepTimer = wcPreSuperstepTimer.time(); serviceWorker.getWorkerContext().preSuperstep(); preSuperstepTimer.stop(); http://git-wip-us.apache.org/repos/asf/giraph/blob/9e1a5a05/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index ad7e045..02d4f2b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -98,6 +98,8 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -175,7 +177,7 @@ public class BspServiceMaster<I extends WritableComparable, private MasterServer masterServer; /** Master info */ private MasterInfo masterInfo; - /** List of workers in current superstep */ + /** List of workers in current superstep, sorted by task id */ private List<WorkerInfo> chosenWorkerInfoList = Lists.newArrayList(); /** Limit locality information added to each InputSplit znode */ private final int localityLimit = 5; @@ -1555,6 +1557,13 @@ public class BspServiceMaster<I extends WritableComparable, setJobStateFailed("coordinateSuperstep: Not enough healthy workers for " + "superstep " + getSuperstep()); } else { + // Sort this list, so order stays the same over supersteps + Collections.sort(chosenWorkerInfoList, new Comparator<WorkerInfo>() { + @Override + public int compare(WorkerInfo wi1, WorkerInfo wi2) { + return Integer.compare(wi1.getTaskId(), wi2.getTaskId()); + } + }); for (WorkerInfo workerInfo : chosenWorkerInfoList) { String workerInfoHealthyPath = getWorkerInfoHealthyPath(getApplicationAttempt(), http://git-wip-us.apache.org/repos/asf/giraph/blob/9e1a5a05/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java index 17347db..29835c5 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java @@ -18,24 +18,36 @@ package org.apache.giraph.worker; +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.comm.requests.SendWorkerToWorkerMessageRequest; import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; import org.apache.giraph.graph.GraphState; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Mapper; +import java.util.List; + /** * WorkerContext allows for the execution of user code * on a per-worker basis. There's one WorkerContext per worker. */ @SuppressWarnings("rawtypes") public abstract class WorkerContext - extends DefaultImmutableClassesGiraphConfigurable - implements WorkerAggregatorUsage { + extends DefaultImmutableClassesGiraphConfigurable + implements WorkerAggregatorUsage { + /** Global graph state */ private GraphState graphState; /** Worker aggregator usage */ private WorkerAggregatorUsage workerAggregatorUsage; + /** Service worker */ + private CentralizedServiceWorker serviceWorker; + /** Sorted list of other participating workers */ + private List<WorkerInfo> workerList; + /** Index of this worker within workerList */ + private int myWorkerIndex; + /** * Set the graph state. * @@ -46,6 +58,17 @@ public abstract class WorkerContext } /** + * Setup superstep. + * + * @param serviceWorker Service worker containing all the information + */ + public void setupSuperstep(CentralizedServiceWorker<?, ?, ?> serviceWorker) { + this.serviceWorker = serviceWorker; + workerList = serviceWorker.getWorkerInfoList(); + myWorkerIndex = workerList.indexOf(serviceWorker.getWorkerInfo()); + } + + /** * Set worker aggregator usage * * @param workerAggregatorUsage Worker aggregator usage @@ -81,6 +104,52 @@ public abstract class WorkerContext public abstract void preSuperstep(); /** + * Get number of workers + * + * @return Number of workers + */ + public int getWorkerCount() { + return workerList.size(); + } + + /** + * Get index for this worker + * + * @return Index of this worker + */ + public int getMyWorkerIndex() { + return myWorkerIndex; + } + + /** + * Get messages which other workers sent to this worker and clear them (can + * be called once per superstep) + * + * @return Messages received + */ + public List<Writable> getAndClearMessagesFromOtherWorkers() { + return serviceWorker.getServerData(). + getAndClearCurrentWorkerToWorkerMessages(); + } + + /** + * Send message to another worker + * + * @param message Message to send + * @param workerIndex Index of the worker to send the message to + */ + public void sendMessageToWorker(Writable message, int workerIndex) { + SendWorkerToWorkerMessageRequest request = + new SendWorkerToWorkerMessageRequest(message); + if (workerIndex == myWorkerIndex) { + request.doRequest(serviceWorker.getServerData()); + } else { + serviceWorker.getWorkerClient().sendWritableRequest( + workerList.get(workerIndex).getTaskId(), request); + } + } + + /** * Execute user code. * This method is executed once on each Worker after each * superstep ends.
