http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/LocalityAwareInputSplitsMasterOrganizer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/LocalityAwareInputSplitsMasterOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/master/input/LocalityAwareInputSplitsMasterOrganizer.java new file mode 100644 index 0000000..d3eb5da --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/master/input/LocalityAwareInputSplitsMasterOrganizer.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.master.input; + +import org.apache.giraph.worker.WorkerInfo; +import org.apache.hadoop.mapreduce.InputSplit; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Input splits organizer for vertex and edge input splits on master, which + * uses locality information + */ +public class LocalityAwareInputSplitsMasterOrganizer + implements InputSplitsMasterOrganizer { + /** All splits before this pointer were taken */ + private final AtomicInteger listPointer = new AtomicInteger(); + /** List of serialized splits */ + private final List<byte[]> serializedSplits; + /** Array containing information about whether a split was taken or not */ + private final AtomicBoolean[] splitsTaken; + + /** Map with preferred splits for each worker */ + private final Map<Integer, ConcurrentLinkedQueue<Integer>> + workerToPreferredSplitsMap; + + + /** + * Constructor + * + * @param serializedSplits Serialized splits + * @param splits Splits + * @param workers List of workers + */ + public LocalityAwareInputSplitsMasterOrganizer(List<byte[]> serializedSplits, + List<InputSplit> splits, List<WorkerInfo> workers) { + this.serializedSplits = serializedSplits; + splitsTaken = new AtomicBoolean[serializedSplits.size()]; + // Mark all splits as not taken initially + for (int i = 0; i < serializedSplits.size(); i++) { + splitsTaken[i] = new AtomicBoolean(false); + } + + workerToPreferredSplitsMap = new HashMap<>(); + for (WorkerInfo worker : workers) { + workerToPreferredSplitsMap.put(worker.getTaskId(), + new ConcurrentLinkedQueue<Integer>()); + } + // Go through all splits + for (int i = 0; i < splits.size(); i++) { + try { + String[] locations = splits.get(i).getLocations(); + // For every worker + for (WorkerInfo worker : workers) { + // Check 
splits locations + for (String location : locations) { + // If split is local for the worker, add it to preferred list + if (location.contains(worker.getHostname())) { + workerToPreferredSplitsMap.get(worker.getTaskId()).add(i); + break; + } + } + } + } catch (IOException | InterruptedException e) { + throw new IllegalStateException( + "Exception occurred while getting splits locations", e); + } + } + } + + @Override + public byte[] getSerializedSplitFor(int workerTaskId) { + ConcurrentLinkedQueue<Integer> preferredSplits = + workerToPreferredSplitsMap.get(workerTaskId); + // Try to find a local split + while (true) { + // Get position to check + Integer splitIndex = preferredSplits.poll(); + // Check if all local splits were already processed for this worker + if (splitIndex == null) { + break; + } + // Try to reserve the split + if (splitsTaken[splitIndex].compareAndSet(false, true)) { + return serializedSplits.get(splitIndex); + } + } + + // No more local splits available, proceed linearly from splits list + while (true) { + // Get position to check + int splitIndex = listPointer.getAndIncrement(); + // Check if all splits were already taken + if (splitIndex >= serializedSplits.size()) { + return null; + } + // Try to reserve the split + if (splitsTaken[splitIndex].compareAndSet(false, true)) { + return serializedSplits.get(splitIndex); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/MappingInputSplitsMasterOrganizer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/MappingInputSplitsMasterOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/master/input/MappingInputSplitsMasterOrganizer.java new file mode 100644 index 0000000..8399c8a --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/master/input/MappingInputSplitsMasterOrganizer.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.master.input; + +import org.apache.giraph.worker.WorkerInfo; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Organizer for mapping splits on master. 
All mapping splits need to be + * given to all workers, unlike vertex and edge splits which are read by + * exactly one worker each + */ +public class MappingInputSplitsMasterOrganizer + implements InputSplitsMasterOrganizer { + /** List of splits */ + private final List<byte[]> splits; + /** Map from worker task id to atomic pointer in splits list */ + private final Map<Integer, AtomicInteger> + workerTaskIdToNextSplitIndexMap; + + /** + * Constructor + * + * @param serializedSplits Splits + * @param workers List of workers + */ + public MappingInputSplitsMasterOrganizer(List<byte[]> serializedSplits, + List<WorkerInfo> workers) { + this.splits = serializedSplits; + workerTaskIdToNextSplitIndexMap = new HashMap<>(); + for (WorkerInfo worker : workers) { + workerTaskIdToNextSplitIndexMap.put( + worker.getTaskId(), new AtomicInteger(0)); + } + } + + @Override + public byte[] getSerializedSplitFor(int workerTaskId) { + AtomicInteger nextSplitIndex = + workerTaskIdToNextSplitIndexMap.get(workerTaskId); + int splitIndex = nextSplitIndex.getAndIncrement(); + return splitIndex < splits.size() ? splits.get(splitIndex) : null; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/MasterInputSplitsHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/MasterInputSplitsHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/input/MasterInputSplitsHandler.java new file mode 100644 index 0000000..327e59d --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/master/input/MasterInputSplitsHandler.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.master.input; + +import org.apache.giraph.comm.MasterClient; +import org.apache.giraph.comm.requests.ReplyWithInputSplitRequest; +import org.apache.giraph.io.GiraphInputFormat; +import org.apache.giraph.io.InputType; +import org.apache.giraph.worker.WorkerInfo; +import org.apache.hadoop.mapreduce.InputSplit; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +/** + * Handler for input splits on master + * + * Since currently Giraph fails if worker fails while reading input, we + * didn't complicate this part with retries yet, later it could be added by + * keeping track of which worker got which split and then if worker dies put + * these splits back to queues. 
+ */ +public class MasterInputSplitsHandler { + /** Whether to use locality information */ + private final boolean useLocality; + /** Master client */ + private MasterClient masterClient; + /** List of workers */ + private List<WorkerInfo> workers; + /** Map of splits organizers for each split type */ + private Map<InputType, InputSplitsMasterOrganizer> splitsMap = + new EnumMap<>(InputType.class); + /** Latches to say when one input splits type is ready to be accessed */ + private Map<InputType, CountDownLatch> latchesMap = + new EnumMap<>(InputType.class); + + /** + * Constructor + * + * @param useLocality Whether to use locality information or not + */ + public MasterInputSplitsHandler(boolean useLocality) { + this.useLocality = useLocality; + for (InputType inputType : InputType.values()) { + latchesMap.put(inputType, new CountDownLatch(1)); + } + } + + /** + * Initialize + * + * @param masterClient Master client + * @param workers List of workers + */ + public void initialize(MasterClient masterClient, List<WorkerInfo> workers) { + this.masterClient = masterClient; + this.workers = workers; + } + + /** + * Add splits + * + * @param splitsType Type of splits + * @param inputSplits Splits + * @param inputFormat Format + */ + public void addSplits(InputType splitsType, List<InputSplit> inputSplits, + GiraphInputFormat inputFormat) { + List<byte[]> serializedSplits = new ArrayList<>(); + for (InputSplit inputSplit : inputSplits) { + try { + ByteArrayOutputStream byteArrayOutputStream = + new ByteArrayOutputStream(); + DataOutput outputStream = + new DataOutputStream(byteArrayOutputStream); + inputFormat.writeInputSplit(inputSplit, outputStream); + serializedSplits.add(byteArrayOutputStream.toByteArray()); + } catch (IOException e) { + throw new IllegalStateException("IOException occurred", e); + } + } + InputSplitsMasterOrganizer inputSplitsOrganizer; + if (splitsType == InputType.MAPPING) { + inputSplitsOrganizer = new MappingInputSplitsMasterOrganizer( + 
serializedSplits, workers); + } else { + inputSplitsOrganizer = useLocality ? + new LocalityAwareInputSplitsMasterOrganizer(serializedSplits, + inputSplits, workers) : + new BasicInputSplitsMasterOrganizer(serializedSplits); + } + splitsMap.put(splitsType, inputSplitsOrganizer); + latchesMap.get(splitsType).countDown(); + } + + /** + * Called after we receive a split request from some worker, should send + * split back to it if available, or send it information that there is no + * more available + * + * @param splitType Type of split requested + * @param workerTaskId Id of worker who requested split + */ + public void sendSplitTo(InputType splitType, int workerTaskId) { + try { + // Make sure we don't try to retrieve splits before they were added + latchesMap.get(splitType).await(); + } catch (InterruptedException e) { + throw new IllegalStateException("Interrupted", e); + } + byte[] serializedInputSplit = + splitsMap.get(splitType).getSerializedSplitFor(workerTaskId); + masterClient.sendWritableRequest(workerTaskId, + new ReplyWithInputSplitRequest(splitType, + serializedInputSplit == null ? new byte[0] : serializedInputSplit)); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/package-info.java b/giraph-core/src/main/java/org/apache/giraph/master/input/package-info.java new file mode 100644 index 0000000..992b6fe --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/master/input/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Input related master classes + */ +package org.apache.giraph.master.input; http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java index 4600745..6914c3b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java @@ -115,7 +115,7 @@ public class PartitionUtils { workerStatsMap.put( workerInfo, new VertexEdgeCount(partitionStats.getVertexCount(), - partitionStats.getEdgeCount())); + partitionStats.getEdgeCount(), 0)); } else { workerStatsMap.put( workerInfo, http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index 5b754d6..1031bb3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ 
b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -59,8 +59,6 @@ import org.apache.giraph.graph.AddressesAndPartitionsWritable; import org.apache.giraph.graph.FinishedSuperstepStats; import org.apache.giraph.graph.GlobalStats; import org.apache.giraph.graph.GraphTaskManager; -import org.apache.giraph.graph.InputSplitEvents; -import org.apache.giraph.graph.InputSplitPaths; import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexEdgeCount; import org.apache.giraph.io.EdgeOutputFormat; @@ -177,10 +175,8 @@ public class BspServiceWorker<I extends WritableComparable, /** Time spent waiting on requests to finish */ private GiraphTimer waitRequestsTimer; - /** InputSplit handlers used in INPUT_SUPERSTEP for vertex splits */ - private InputSplitsHandler vertexSplitsHandler; - /** InputSplit handlers used in INPUT_SUPERSTEP for edge splits */ - private InputSplitsHandler edgeSplitsHandler; + /** InputSplit handlers used in INPUT_SUPERSTEP */ + private WorkerInputSplitsHandler inputSplitsHandler; /** * Constructor for setting up the worker. @@ -237,8 +233,9 @@ public class BspServiceWorker<I extends WritableComparable, null; GiraphMetrics.get().addSuperstepResetObserver(this); - vertexSplitsHandler = null; - edgeSplitsHandler = null; + + inputSplitsHandler = new WorkerInputSplitsHandler( + workerInfo, masterInfo.getTaskId(), workerClient); } @Override @@ -295,26 +292,20 @@ public class BspServiceWorker<I extends WritableComparable, * * Use one or more threads to do the loading. 
* - * @param inputSplitPathList List of input split paths * @param inputSplitsCallableFactory Factory for {@link InputSplitsCallable}s * @return Statistics of the vertices and edges loaded * @throws InterruptedException * @throws KeeperException */ private VertexEdgeCount loadInputSplits( - List<String> inputSplitPathList, CallableFactory<VertexEdgeCount> inputSplitsCallableFactory) throws KeeperException, InterruptedException { VertexEdgeCount vertexEdgeCount = new VertexEdgeCount(); - // Determine how many threads to use based on the number of input splits - int maxInputSplitThreads = (inputSplitPathList.size() - 1) / - getConfiguration().getMaxWorkers() + 1; - int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(), - maxInputSplitThreads); + int numThreads = getConfiguration().getNumInputSplitsThreads(); if (LOG.isInfoEnabled()) { LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " + "originally " + getConfiguration().getNumInputSplitsThreads() + - " threads(s) for " + inputSplitPathList.size() + " total splits."); + " threads(s)"); } List<VertexEdgeCount> results = @@ -336,46 +327,21 @@ public class BspServiceWorker<I extends WritableComparable, */ private long loadMapping() throws KeeperException, InterruptedException { - List<String> inputSplitPathList = - getZkExt().getChildrenExt(mappingInputSplitsPaths.getPath(), - false, false, true); - - InputSplitPathOrganizer splitOrganizer = - new InputSplitPathOrganizer(getZkExt(), - inputSplitPathList, getWorkerInfo().getHostname(), - getConfiguration().useInputSplitLocality()); - MappingInputSplitsCallableFactory<I, V, E, ? 
extends Writable> - mappingInputSplitsCallableFactory = + inputSplitsCallableFactory = new MappingInputSplitsCallableFactory<>( getConfiguration().createWrappedMappingInputFormat(), - splitOrganizer, getContext(), getConfiguration(), this, - getZkExt()); + inputSplitsHandler); - long entriesLoaded = 0; - // Determine how many threads to use based on the number of input splits - int maxInputSplitThreads = inputSplitPathList.size(); - int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(), - maxInputSplitThreads); - if (LOG.isInfoEnabled()) { - LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " + - "originally " + getConfiguration().getNumInputSplitsThreads() + - " threads(s) for " + inputSplitPathList.size() + " total splits."); - } + long mappingsLoaded = + loadInputSplits(inputSplitsCallableFactory).getMappingCount(); - List<Integer> results = - ProgressableUtils.getResultsWithNCallables( - mappingInputSplitsCallableFactory, - numThreads, "load-mapping-%d", getContext()); - for (Integer result : results) { - entriesLoaded += result; - } // after all threads finish loading - call postFilling localData.getMappingStore().postFilling(); - return entriesLoaded; + return mappingsLoaded; } /** @@ -386,31 +352,15 @@ public class BspServiceWorker<I extends WritableComparable, */ private VertexEdgeCount loadVertices() throws KeeperException, InterruptedException { - List<String> inputSplitPathList = - getZkExt().getChildrenExt(vertexInputSplitsPaths.getPath(), - false, false, true); - - InputSplitPathOrganizer splitOrganizer = - new InputSplitPathOrganizer(getZkExt(), - inputSplitPathList, getWorkerInfo().getHostname(), - getConfiguration().useInputSplitLocality()); - vertexSplitsHandler = new InputSplitsHandler( - splitOrganizer, - getZkExt(), - getContext(), - BspService.VERTEX_INPUT_SPLIT_RESERVED_NODE, - BspService.VERTEX_INPUT_SPLIT_FINISHED_NODE); - VertexInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory = new 
VertexInputSplitsCallableFactory<I, V, E>( getConfiguration().createWrappedVertexInputFormat(), getContext(), getConfiguration(), this, - vertexSplitsHandler, - getZkExt()); + inputSplitsHandler); - return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory); + return loadInputSplits(inputSplitsCallableFactory); } /** @@ -420,32 +370,15 @@ public class BspServiceWorker<I extends WritableComparable, * @return Number of edges loaded */ private long loadEdges() throws KeeperException, InterruptedException { - List<String> inputSplitPathList = - getZkExt().getChildrenExt(edgeInputSplitsPaths.getPath(), - false, false, true); - - InputSplitPathOrganizer splitOrganizer = - new InputSplitPathOrganizer(getZkExt(), - inputSplitPathList, getWorkerInfo().getHostname(), - getConfiguration().useInputSplitLocality()); - edgeSplitsHandler = new InputSplitsHandler( - splitOrganizer, - getZkExt(), - getContext(), - BspService.EDGE_INPUT_SPLIT_RESERVED_NODE, - BspService.EDGE_INPUT_SPLIT_FINISHED_NODE); - EdgeInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory = new EdgeInputSplitsCallableFactory<I, V, E>( getConfiguration().createWrappedEdgeInputFormat(), getContext(), getConfiguration(), this, - edgeSplitsHandler, - getZkExt()); + inputSplitsHandler); - return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory). 
- getEdgeCount(); + return loadInputSplits(inputSplitsCallableFactory).getEdgeCount(); } @Override @@ -459,46 +392,12 @@ public class BspServiceWorker<I extends WritableComparable, } /** - * Ensure the input splits are ready for processing - * - * @param inputSplitPaths Input split paths - * @param inputSplitEvents Input split events - */ - private void ensureInputSplitsReady(InputSplitPaths inputSplitPaths, - InputSplitEvents inputSplitEvents) { - while (true) { - Stat inputSplitsReadyStat; - try { - inputSplitsReadyStat = getZkExt().exists( - inputSplitPaths.getAllReadyPath(), true); - } catch (KeeperException e) { - throw new IllegalStateException("ensureInputSplitsReady: " + - "KeeperException waiting on input splits", e); - } catch (InterruptedException e) { - throw new IllegalStateException("ensureInputSplitsReady: " + - "InterruptedException waiting on input splits", e); - } - if (inputSplitsReadyStat != null) { - break; - } - inputSplitEvents.getAllReadyChanged().waitForever(); - inputSplitEvents.getAllReadyChanged().reset(); - } - } - - /** * Mark current worker as done and then wait for all workers * to finish processing input splits. 
- * - * @param inputSplitPaths Input split paths - * @param inputSplitEvents Input split events */ - private void markCurrentWorkerDoneThenWaitForOthers( - InputSplitPaths inputSplitPaths, - InputSplitEvents inputSplitEvents) { + private void markCurrentWorkerDoneReadingThenWaitForOthers() { String workerInputSplitsDonePath = - inputSplitPaths.getDonePath() + "/" + - getWorkerInfo().getHostnameId(); + inputSplitsWorkerDonePath + "/" + getWorkerInfo().getHostnameId(); try { getZkExt().createExt(workerInputSplitsDonePath, null, @@ -508,32 +407,31 @@ public class BspServiceWorker<I extends WritableComparable, } catch (KeeperException e) { throw new IllegalStateException( "markCurrentWorkerDoneThenWaitForOthers: " + - "KeeperException creating worker done splits", e); + "KeeperException creating worker done splits", e); } catch (InterruptedException e) { throw new IllegalStateException( "markCurrentWorkerDoneThenWaitForOthers: " + - "InterruptedException creating worker done splits", e); + "InterruptedException creating worker done splits", e); } while (true) { Stat inputSplitsDoneStat; try { inputSplitsDoneStat = - getZkExt().exists(inputSplitPaths.getAllDonePath(), - true); + getZkExt().exists(inputSplitsAllDonePath, true); } catch (KeeperException e) { throw new IllegalStateException( "markCurrentWorkerDoneThenWaitForOthers: " + - "KeeperException waiting on worker done splits", e); + "KeeperException waiting on worker done splits", e); } catch (InterruptedException e) { throw new IllegalStateException( "markCurrentWorkerDoneThenWaitForOthers: " + - "InterruptedException waiting on worker done splits", e); + "InterruptedException waiting on worker done splits", e); } if (inputSplitsDoneStat != null) { break; } - inputSplitEvents.getAllDoneChanged().waitForever(); - inputSplitEvents.getAllDoneChanged().reset(); + getInputSplitsAllDoneEvent().waitForever(); + getInputSplitsAllDoneEvent().reset(); } } @@ -597,8 +495,6 @@ else[HADOOP_NON_SECURE]*/ long entriesLoaded; if 
(getConfiguration().hasMappingInputFormat()) { - // Ensure the mapping InputSplits are ready for processing - ensureInputSplitsReady(mappingInputSplitsPaths, mappingInputSplitsEvents); getContext().progress(); try { entriesLoaded = loadMapping(); @@ -618,17 +514,12 @@ else[HADOOP_NON_SECURE]*/ entriesLoaded + " entries from inputSplits"); } - // Workers wait for each other to finish, coordinated by master - markCurrentWorkerDoneThenWaitForOthers(mappingInputSplitsPaths, - mappingInputSplitsEvents); // Print stats for data stored in localData once mapping is fully // loaded on all the workers localData.printStats(); } if (getConfiguration().hasVertexInputFormat()) { - // Ensure the vertex InputSplits are ready for processing - ensureInputSplitsReady(vertexInputSplitsPaths, vertexInputSplitsEvents); getContext().progress(); try { vertexEdgeCount = loadVertices(); @@ -646,8 +537,6 @@ else[HADOOP_NON_SECURE]*/ WorkerProgress.get().finishLoadingVertices(); if (getConfiguration().hasEdgeInputFormat()) { - // Ensure the edge InputSplits are ready for processing - ensureInputSplitsReady(edgeInputSplitsPaths, edgeInputSplitsEvents); getContext().progress(); try { vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(0, loadEdges()); @@ -666,17 +555,7 @@ else[HADOOP_NON_SECURE]*/ LOG.info("setup: Finally loaded a total of " + vertexEdgeCount); } - if (getConfiguration().hasVertexInputFormat()) { - // Workers wait for each other to finish, coordinated by master - markCurrentWorkerDoneThenWaitForOthers(vertexInputSplitsPaths, - vertexInputSplitsEvents); - } - - if (getConfiguration().hasEdgeInputFormat()) { - // Workers wait for each other to finish, coordinated by master - markCurrentWorkerDoneThenWaitForOthers(edgeInputSplitsPaths, - edgeInputSplitsEvents); - } + markCurrentWorkerDoneReadingThenWaitForOthers(); // Create remaining partitions owned by this worker. 
for (PartitionOwner partitionOwner : masterSetPartitionOwners) { @@ -898,13 +777,6 @@ else[HADOOP_NON_SECURE]*/ if (getSuperstep() != INPUT_SUPERSTEP) { postSuperstepCallbacks(); - } else { - if (getConfiguration().hasVertexInputFormat()) { - vertexSplitsHandler.setDoneReadingGraph(true); - } - if (getConfiguration().hasEdgeInputFormat()) { - edgeSplitsHandler.setDoneReadingGraph(true); - } } globalCommHandler.finishSuperstep(workerAggregatorRequestProcessor); @@ -1692,7 +1564,7 @@ else[HADOOP_NON_SECURE]*/ workerClient.setup(getConfiguration().authenticate()); /*end[HADOOP_NON_SECURE]*/ return new VertexEdgeCount(globalStats.getVertexCount(), - globalStats.getEdgeCount()); + globalStats.getEdgeCount(), 0); } catch (IOException e) { throw new RuntimeException( @@ -1963,4 +1835,9 @@ else[HADOOP_NON_SECURE]*/ } return count; } + + @Override + public WorkerInputSplitsHandler getInputSplitsHandler() { + return inputSplitsHandler; + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java index 89f74b3..b7f1eb6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java @@ -26,9 +26,9 @@ import org.apache.giraph.graph.VertexEdgeCount; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.EdgeReader; import org.apache.giraph.io.filters.EdgeInputFilter; +import org.apache.giraph.io.InputType; import org.apache.giraph.utils.LoggerUtils; import org.apache.giraph.utils.MemoryUtils; -import org.apache.giraph.zk.ZooKeeperExt; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.mapreduce.InputSplit; @@ -89,17 +89,14 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, * @param configuration Configuration * @param bspServiceWorker service worker * @param splitsHandler Handler for input splits - * @param zooKeeperExt Handle to ZooKeeperExt */ public EdgeInputSplitsCallable( EdgeInputFormat<I, E> edgeInputFormat, Mapper<?, ?, ?, ?>.Context context, ImmutableClassesGiraphConfiguration<I, V, E> configuration, BspServiceWorker<I, V, E> bspServiceWorker, - InputSplitsHandler splitsHandler, - ZooKeeperExt zooKeeperExt) { - super(context, configuration, bspServiceWorker, splitsHandler, - zooKeeperExt); + WorkerInputSplitsHandler splitsHandler) { + super(context, configuration, bspServiceWorker, splitsHandler); this.edgeInputFormat = edgeInputFormat; this.bspServiceWorker = bspServiceWorker; @@ -126,6 +123,11 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, return edgeInputFormat; } + @Override + public InputType getInputType() { + return InputType.EDGE; + } + /** * Read edges from input split. If testing, the user may request a * maximum number of edges to be read from an input split. 
@@ -226,6 +228,6 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD); WorkerProgress.get().incrementEdgeInputSplitsLoaded(); - return new VertexEdgeCount(0, inputSplitEdgesLoaded); + return new VertexEdgeCount(0, inputSplitEdgesLoaded, 0); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java index f68ac93..d4bc1fc 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java @@ -22,7 +22,6 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.VertexEdgeCount; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.utils.CallableFactory; -import org.apache.giraph.zk.ZooKeeperExt; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; @@ -46,9 +45,7 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable, /** {@link BspServiceWorker} we're running on. */ private final BspServiceWorker<I, V, E> bspServiceWorker; /** Handler for input splits */ - private final InputSplitsHandler splitsHandler; - /** {@link ZooKeeperExt} for this worker. */ - private final ZooKeeperExt zooKeeperExt; + private final WorkerInputSplitsHandler splitsHandler; /** * Constructor. 
@@ -58,20 +55,17 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable, * @param configuration Configuration * @param bspServiceWorker Calling {@link BspServiceWorker} * @param splitsHandler Handler for input splits - * @param zooKeeperExt {@link ZooKeeperExt} for this worker */ public EdgeInputSplitsCallableFactory( EdgeInputFormat<I, E> edgeInputFormat, Mapper<?, ?, ?, ?>.Context context, ImmutableClassesGiraphConfiguration<I, V, E> configuration, BspServiceWorker<I, V, E> bspServiceWorker, - InputSplitsHandler splitsHandler, - ZooKeeperExt zooKeeperExt) { + WorkerInputSplitsHandler splitsHandler) { this.edgeInputFormat = edgeInputFormat; this.context = context; this.configuration = configuration; this.bspServiceWorker = bspServiceWorker; - this.zooKeeperExt = zooKeeperExt; this.splitsHandler = splitsHandler; } @@ -82,7 +76,6 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable, context, configuration, bspServiceWorker, - splitsHandler, - zooKeeperExt); + splitsHandler); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java deleted file mode 100644 index 4e93ce0..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.worker; - -import com.google.common.collect.Lists; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.io.GiraphInputFormat; -import org.apache.giraph.time.SystemTime; -import org.apache.giraph.time.Time; -import org.apache.giraph.time.Times; -import org.apache.giraph.zk.ZooKeeperExt; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.log4j.Logger; -import org.apache.zookeeper.KeeperException; - -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.IOException; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * InputSplitCallable to read all the splits - * - * @param <I> vertexId type - * @param <V> vertexValue type - * @param <E> edgeValue type - */ -public abstract class FullInputSplitCallable<I extends WritableComparable, - V extends Writable, E extends Writable> - implements Callable<Integer> { - /** Class logger */ - private static final Logger LOG = Logger.getLogger( - FullInputSplitCallable.class); - /** Class time object */ - private static final Time TIME = SystemTime.get(); - /** Configuration */ - protected final ImmutableClassesGiraphConfiguration<I, V, E> configuration; - /** Context */ - protected final Mapper<?, ?, ?, ?>.Context context; - - /** The List of InputSplit znode paths */ - private final List<String> 
pathList; - /** Current position in the path list */ - private final AtomicInteger currentIndex; - /** ZooKeeperExt handle */ - private final ZooKeeperExt zooKeeperExt; - /** Get the start time in nanos */ - private final long startNanos = TIME.getNanoseconds(); - - // CHECKSTYLE: stop ParameterNumberCheck - /** - * Constructor. - - * @param splitOrganizer Input splits organizer - * @param context Context - * @param configuration Configuration - * @param zooKeeperExt Handle to ZooKeeperExt - * @param currentIndex Atomic Integer to get splitPath from list - */ - public FullInputSplitCallable(InputSplitPathOrganizer splitOrganizer, - Mapper<?, ?, ?, ?>.Context context, - ImmutableClassesGiraphConfiguration<I, V, E> configuration, - ZooKeeperExt zooKeeperExt, - AtomicInteger currentIndex) { - this.pathList = Lists.newArrayList(splitOrganizer.getPathList()); - this.currentIndex = currentIndex; - this.zooKeeperExt = zooKeeperExt; - this.context = context; - this.configuration = configuration; - } - // CHECKSTYLE: resume ParameterNumberCheck - - /** - * Get input format - * - * @return Input format - */ - public abstract GiraphInputFormat getInputFormat(); - - /** - * Load mapping entries from all the given input splits - * - * @param inputSplit Input split to load - * @return Count of vertices and edges loaded - * @throws java.io.IOException - * @throws InterruptedException - */ - protected abstract Integer readInputSplit(InputSplit inputSplit) - throws IOException, InterruptedException; - - @Override - public Integer call() { - int entries = 0; - String inputSplitPath; - int inputSplitsProcessed = 0; - try { - while (true) { - int pos = currentIndex.getAndIncrement(); - if (pos >= pathList.size()) { - break; - } - inputSplitPath = pathList.get(pos); - entries += loadInputSplit(inputSplitPath); - context.progress(); - ++inputSplitsProcessed; - } - } catch (InterruptedException e) { - throw new IllegalStateException("call: InterruptedException", e); - } catch 
(IOException e) { - throw new IllegalStateException("call: IOException", e); - } catch (ClassNotFoundException e) { - throw new IllegalStateException("call: ClassNotFoundException", e); - } catch (InstantiationException e) { - throw new IllegalStateException("call: InstantiationException", e); - } catch (IllegalAccessException e) { - throw new IllegalStateException("call: IllegalAccessException", e); - } - - if (LOG.isInfoEnabled()) { - float seconds = Times.getNanosSince(TIME, startNanos) / - Time.NS_PER_SECOND_AS_FLOAT; - float entriesPerSecond = entries / seconds; - LOG.info("call: Loaded " + inputSplitsProcessed + " " + - "input splits in " + seconds + " secs, " + entries + - " " + entriesPerSecond + " entries/sec"); - } - return entries; - } - - /** - * Extract entries from input split, saving them into mapping store. - * Mark the input split finished when done. - * - * @param inputSplitPath ZK location of input split - * @return Number of entries read in this input split - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - * @throws InstantiationException - * @throws IllegalAccessException - */ - private Integer loadInputSplit( - String inputSplitPath) - throws IOException, ClassNotFoundException, InterruptedException, - InstantiationException, IllegalAccessException { - InputSplit inputSplit = getInputSplit(inputSplitPath); - Integer entriesRead = readInputSplit(inputSplit); - if (LOG.isInfoEnabled()) { - LOG.info("loadFromInputSplit: Finished loading " + - inputSplitPath + " " + entriesRead); - } - return entriesRead; - } - - /** - * Talk to ZooKeeper to convert the input split path to the actual - * InputSplit. 
- * - * @param inputSplitPath Location in ZK of input split - * @return instance of InputSplit - * @throws IOException - * @throws ClassNotFoundException - */ - protected InputSplit getInputSplit(String inputSplitPath) - throws IOException, ClassNotFoundException { - byte[] splitList; - try { - splitList = zooKeeperExt.getData(inputSplitPath, false, null); - } catch (KeeperException e) { - throw new IllegalStateException( - "getInputSplit: KeeperException on " + inputSplitPath, e); - } catch (InterruptedException e) { - throw new IllegalStateException( - "getInputSplit: IllegalStateException on " + inputSplitPath, e); - } - context.progress(); - - DataInputStream inputStream = - new DataInputStream(new ByteArrayInputStream(splitList)); - InputSplit inputSplit = getInputFormat().readInputSplit(inputStream); - - if (LOG.isInfoEnabled()) { - LOG.info("getInputSplit: Processing " + inputSplitPath + - " from ZooKeeper and got input split '" + - inputSplit.toString() + "'"); - } - return inputSplit; - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java deleted file mode 100644 index 463601c..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.worker; - -import com.google.common.collect.Lists; - -import org.apache.giraph.zk.ZooKeeperExt; -import org.apache.hadoop.io.Text; -import org.apache.zookeeper.KeeperException; - -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; - -/** - * Utility class to extract the list of InputSplits from the - * ZooKeeper tree of "claimable splits" the master created, - * and to sort the list to favor local data blocks. - * - * This class provides an Iterator for the list the worker will - * claim splits from, making all sorting and data-code locality - * processing done here invisible to callers. The aim is to cut - * down on the number of ZK reads workers perform before locating - * an unclaimed InputSplit. 
- */ -public class InputSplitPathOrganizer { - /** The worker's local ZooKeeperExt ref */ - private final ZooKeeperExt zooKeeper; - /** The List of InputSplit znode paths */ - private final List<String> pathList; - /** The worker's hostname */ - private final String hostName; - - /** - * Constructor - * - * @param zooKeeper the worker's ZkExt - * @param zkPathList the path to read from - * @param hostName the worker's host name (for matching) - * @param useLocality whether to prioritize local input splits - */ - public InputSplitPathOrganizer(final ZooKeeperExt zooKeeper, - final String zkPathList, final String hostName, - final boolean useLocality) throws KeeperException, InterruptedException { - this(zooKeeper, zooKeeper.getChildrenExt(zkPathList, false, false, true), - hostName, useLocality); - } - - /** - * Constructor - * - * @param zooKeeper the worker's ZkExt - * @param inputSplitPathList path of input splits to read from - * @param hostName the worker's host name (for matching) - * @param useLocality whether to prioritize local input splits - */ - public InputSplitPathOrganizer( - final ZooKeeperExt zooKeeper, final List<String> inputSplitPathList, - final String hostName, final boolean useLocality) { - this.zooKeeper = zooKeeper; - this.pathList = Lists.newArrayList(inputSplitPathList); - this.hostName = hostName; - // Shuffle input splits in case several workers exist on this host - Collections.shuffle(pathList); - if (useLocality) { - prioritizeLocalInputSplits(); - } - } - - /** - * Re-order list of InputSplits so files local to this worker node's - * disk are the first it will iterate over when attempting to claim - * a split to read. This will increase locality of data reads with greater - * probability as the % of total nodes in the cluster hosting data and workers - * BOTH increase towards 100%. Replication increases our chances of a "hit." 
- */ - private void prioritizeLocalInputSplits() { - List<String> sortedList = new ArrayList<String>(); - String hosts; - for (Iterator<String> iterator = pathList.iterator(); iterator.hasNext();) { - final String path = iterator.next(); - try { - hosts = getLocationsFromZkInputSplitData(path); - } catch (IOException ioe) { - hosts = null; // no problem, just don't sort this entry - } catch (KeeperException ke) { - hosts = null; - } catch (InterruptedException ie) { - hosts = null; - } - if (hosts != null && hosts.contains(hostName)) { - sortedList.add(path); // collect the local block - iterator.remove(); // remove local block from list - } - } - pathList.addAll(0, sortedList); - } - - /** - * Utility for extracting locality data from an InputSplit ZNode. - * - * @param zkSplitPath the input split path to attempt to read - * ZNode locality data from for this InputSplit. - * @return a String of hostnames from ZNode data, or throws - */ - private String getLocationsFromZkInputSplitData(String zkSplitPath) - throws IOException, KeeperException, InterruptedException { - byte[] locationData = zooKeeper.getData(zkSplitPath, false, null); - DataInputStream inputStream = - new DataInputStream(new ByteArrayInputStream(locationData)); - // only read the "first" entry in the znode data, the locations - return Text.readString(inputStream); - } - - /** - * Get the ordered input splits paths. 
- * - * @return Ordered input splits paths - */ - public Iterable<String> getPathList() { - return pathList; - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java index 7b2fc0f..92b23bd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java @@ -28,6 +28,7 @@ import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.VertexEdgeCount; import org.apache.giraph.io.GiraphInputFormat; +import org.apache.giraph.io.InputType; import org.apache.giraph.metrics.GiraphMetrics; import org.apache.giraph.metrics.GiraphMetricsRegistry; import org.apache.giraph.metrics.MeterDesc; @@ -35,14 +36,11 @@ import org.apache.giraph.metrics.MetricNames; import org.apache.giraph.time.SystemTime; import org.apache.giraph.time.Time; import org.apache.giraph.time.Times; -import org.apache.giraph.zk.ZooKeeperExt; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Mapper; import org.apache.log4j.Logger; -import org.apache.zookeeper.KeeperException; import com.yammer.metrics.core.Counter; import com.yammer.metrics.core.Meter; @@ -75,9 +73,7 @@ public abstract class InputSplitsCallable<I extends WritableComparable, * Stores and processes the list of InputSplits advertised * in a tree of child znodes by the master. 
*/ - private final InputSplitsHandler splitsHandler; - /** ZooKeeperExt handle */ - private final ZooKeeperExt zooKeeperExt; + private final WorkerInputSplitsHandler splitsHandler; /** Get the start time in nanos */ private final long startNanos = TIME.getNanoseconds(); /** Whether to prioritize local input splits. */ @@ -91,15 +87,12 @@ public abstract class InputSplitsCallable<I extends WritableComparable, * @param configuration Configuration * @param bspServiceWorker service worker * @param splitsHandler Handler for input splits - * @param zooKeeperExt Handle to ZooKeeperExt */ public InputSplitsCallable( Mapper<?, ?, ?, ?>.Context context, ImmutableClassesGiraphConfiguration<I, V, E> configuration, BspServiceWorker<I, V, E> bspServiceWorker, - InputSplitsHandler splitsHandler, - ZooKeeperExt zooKeeperExt) { - this.zooKeeperExt = zooKeeperExt; + WorkerInputSplitsHandler splitsHandler) { this.context = context; this.workerClientRequestProcessor = new NettyWorkerClientRequestProcessor<I, V, E>( @@ -119,6 +112,13 @@ public abstract class InputSplitsCallable<I extends WritableComparable, public abstract GiraphInputFormat getInputFormat(); /** + * Get input type + * + * @return Input type + */ + public abstract InputType getInputType(); + + /** * Get Meter tracking edges loaded * * @return Meter tracking edges loaded @@ -205,27 +205,22 @@ public abstract class InputSplitsCallable<I extends WritableComparable, @Override public VertexEdgeCount call() { VertexEdgeCount vertexEdgeCount = new VertexEdgeCount(); - String inputSplitPath; + byte[] serializedInputSplit; int inputSplitsProcessed = 0; try { - while ((inputSplitPath = splitsHandler.reserveInputSplit()) != null) { - vertexEdgeCount = - vertexEdgeCount.incrVertexEdgeCount(loadInputSplit(inputSplitPath)); + while ((serializedInputSplit = + splitsHandler.reserveInputSplit(getInputType())) != null) { + vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount( + loadInputSplit(serializedInputSplit)); context.progress(); 
++inputSplitsProcessed; } - } catch (KeeperException e) { - throw new IllegalStateException("call: KeeperException", e); } catch (InterruptedException e) { throw new IllegalStateException("call: InterruptedException", e); } catch (IOException e) { throw new IllegalStateException("call: IOException", e); } catch (ClassNotFoundException e) { throw new IllegalStateException("call: ClassNotFoundException", e); - } catch (InstantiationException e) { - throw new IllegalStateException("call: InstantiationException", e); - } catch (IllegalAccessException e) { - throw new IllegalStateException("call: IllegalAccessException", e); } if (LOG.isInfoEnabled()) { @@ -252,25 +247,19 @@ public abstract class InputSplitsCallable<I extends WritableComparable, * reached in readVerticeFromInputSplit. * Mark the input split finished when done. * - * @param inputSplitPath ZK location of input split + * @param serializedInputSplit Serialized input split * @return Mapping of vertex indices and statistics, or null if no data read * @throws IOException * @throws ClassNotFoundException * @throws InterruptedException - * @throws InstantiationException - * @throws IllegalAccessException */ - private VertexEdgeCount loadInputSplit( - String inputSplitPath) - throws IOException, ClassNotFoundException, InterruptedException, - InstantiationException, IllegalAccessException { - InputSplit inputSplit = getInputSplit(inputSplitPath); + private VertexEdgeCount loadInputSplit(byte[] serializedInputSplit) + throws IOException, ClassNotFoundException, InterruptedException { + InputSplit inputSplit = getInputSplit(serializedInputSplit); VertexEdgeCount vertexEdgeCount = readInputSplit(inputSplit); if (LOG.isInfoEnabled()) { - LOG.info("loadFromInputSplit: Finished loading " + - inputSplitPath + " " + vertexEdgeCount); + LOG.info("loadFromInputSplit: Finished loading " + vertexEdgeCount); } - splitsHandler.markInputSplitPathFinished(inputSplitPath); return vertexEdgeCount; } @@ -278,35 +267,19 @@ public 
abstract class InputSplitsCallable<I extends WritableComparable, * Talk to ZooKeeper to convert the input split path to the actual * InputSplit. * - * @param inputSplitPath Location in ZK of input split + * @param serializedInputSplit Serialized input split * @return instance of InputSplit * @throws IOException * @throws ClassNotFoundException */ - protected InputSplit getInputSplit(String inputSplitPath) - throws IOException, ClassNotFoundException { - byte[] splitList; - try { - splitList = zooKeeperExt.getData(inputSplitPath, false, null); - } catch (KeeperException e) { - throw new IllegalStateException( - "getInputSplit: KeeperException on " + inputSplitPath, e); - } catch (InterruptedException e) { - throw new IllegalStateException( - "getInputSplit: IllegalStateException on " + inputSplitPath, e); - } - context.progress(); - + protected InputSplit getInputSplit(byte[] serializedInputSplit) + throws IOException, ClassNotFoundException { DataInputStream inputStream = - new DataInputStream(new ByteArrayInputStream(splitList)); - if (useLocality) { - Text.readString(inputStream); // location data unused here, skip - } + new DataInputStream(new ByteArrayInputStream(serializedInputSplit)); InputSplit inputSplit = getInputFormat().readInputSplit(inputStream); if (LOG.isInfoEnabled()) { - LOG.info("getInputSplit: Reserved " + inputSplitPath + - " from ZooKeeper and got input split '" + + LOG.info("getInputSplit: Reserved input split '" + inputSplit.toString() + "'"); } return inputSplit; http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java deleted file mode 100644 index e2099eb..0000000 --- 
a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.worker; - -import org.apache.giraph.zk.ZooKeeperExt; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.log4j.Logger; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.Stat; - -import com.google.common.collect.Lists; - -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Stores the list of input split paths, and provides thread-safe way for - * reserving input splits. 
- */ -public class InputSplitsHandler implements Watcher { - /** Class logger */ - private static final Logger LOG = - Logger.getLogger(InputSplitsHandler.class); - - /** The List of InputSplit znode paths */ - private final List<String> pathList; - /** Current position in the path list */ - private final AtomicInteger currentIndex; - /** The worker's local ZooKeeperExt ref */ - private final ZooKeeperExt zooKeeper; - /** Context for reporting progress */ - private final Mapper<?, ?, ?, ?>.Context context; - /** ZooKeeper input split reserved node. */ - private final String inputSplitReservedNode; - /** ZooKeeper input split finished node. */ - private final String inputSplitFinishedNode; - /** Specifies if we finished execution of INPUT_SUPERSTEP. The variable may - * be accessed via different threads. */ - private volatile boolean doneReadingGraph; - - /** - * Constructor - * - * @param splitOrganizer Input splits organizer - * @param zooKeeper The worker's local ZooKeeperExt ref - * @param context Context for reporting progress - * @param inputSplitReservedNode ZooKeeper input split reserved node - * @param inputSplitFinishedNode ZooKeeper input split finished node - */ - public InputSplitsHandler(InputSplitPathOrganizer splitOrganizer, - ZooKeeperExt zooKeeper, Mapper<?, ?, ?, ?>.Context context, - String inputSplitReservedNode, String inputSplitFinishedNode) { - this.pathList = Lists.newArrayList(splitOrganizer.getPathList()); - this.currentIndex = new AtomicInteger(0); - this.zooKeeper = zooKeeper; - this.context = context; - this.inputSplitReservedNode = inputSplitReservedNode; - this.inputSplitFinishedNode = inputSplitFinishedNode; - this.doneReadingGraph = false; - } - - public void setDoneReadingGraph(boolean doneReadingGraph) { - this.doneReadingGraph = doneReadingGraph; - } - - /** - * Try to reserve an InputSplit for loading. While InputSplits exists that - * are not finished, wait until they are. 
- * - * NOTE: iterations on the InputSplit list only halt for each worker when it - * has scanned the entire list once and found every split marked RESERVED. - * When a worker fails, its Ephemeral RESERVED znodes will disappear, - * allowing other iterating workers to claim it's previously read splits. - * Only when the last worker left iterating on the list fails can a danger - * of data loss occur. Since worker failure in INPUT_SUPERSTEP currently - * causes job failure, this is OK. As the failure model evolves, this - * behavior might need to change. We could add watches on - * inputSplitFinishedNodes and stop iterating only when all these nodes - * have been created. - * - * @return reserved InputSplit or null if no unfinished InputSplits exist - * @throws KeeperException - * @throws InterruptedException - */ - public String reserveInputSplit() throws KeeperException, - InterruptedException { - String reservedInputSplitPath; - Stat reservedStat; - while (true) { - int splitToTry = currentIndex.getAndIncrement(); - if (splitToTry >= pathList.size()) { - return null; - } - String nextSplitToClaim = pathList.get(splitToTry); - context.progress(); - String tmpInputSplitReservedPath = - nextSplitToClaim + inputSplitReservedNode; - reservedStat = - zooKeeper.exists(tmpInputSplitReservedPath, this); - if (reservedStat == null) { - try { - // Attempt to reserve this InputSplit - zooKeeper.createExt(tmpInputSplitReservedPath, - null, - ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.EPHEMERAL, - false); - reservedInputSplitPath = nextSplitToClaim; - if (LOG.isInfoEnabled()) { - float percentFinished = - splitToTry * 100.0f / pathList.size(); - LOG.info("reserveInputSplit: Reserved input " + - "split path " + reservedInputSplitPath + - ", overall roughly " + - +percentFinished + - "% input splits reserved"); - } - return reservedInputSplitPath; - } catch (KeeperException.NodeExistsException e) { - LOG.info("reserveInputSplit: Couldn't reserve " + - "(already reserved) 
inputSplit" + - " at " + tmpInputSplitReservedPath); - } catch (KeeperException e) { - throw new IllegalStateException( - "reserveInputSplit: KeeperException on reserve", e); - } catch (InterruptedException e) { - throw new IllegalStateException( - "reserveInputSplit: InterruptedException " + - "on reserve", e); - } - } - } - } - - /** - * Mark an input split path as completed by this worker. This notifies - * the master and the other workers that this input split has not only - * been reserved, but also marked processed. - * - * @param inputSplitPath Path to the input split. - */ - public void markInputSplitPathFinished(String inputSplitPath) { - String inputSplitFinishedPath = - inputSplitPath + inputSplitFinishedNode; - try { - zooKeeper.createExt(inputSplitFinishedPath, - null, - ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT, - true); - } catch (KeeperException.NodeExistsException e) { - LOG.warn("markInputSplitPathFinished: " + inputSplitFinishedPath + - " already exists!"); - } catch (KeeperException e) { - throw new IllegalStateException( - "markInputSplitPathFinished: KeeperException on " + - inputSplitFinishedPath, e); - } catch (InterruptedException e) { - throw new IllegalStateException( - "markInputSplitPathFinished: InterruptedException on " + - inputSplitFinishedPath, e); - } - } - - @Override - public void process(WatchedEvent event) { - if (event.getPath() == null) { - LOG.warn("process: Problem with zookeeper, got event with path null, " + - "state " + event.getState() + ", event type " + event.getType()); - return; - } - // Check if the reservation for the input split was lost in INPUT_SUPERSTEP - // (some worker died). If INPUT_SUPERSTEP has already completed, we ignore - // this event. 
- if (event.getPath().endsWith(inputSplitReservedNode) && - event.getType() == Watcher.Event.EventType.NodeDeleted && - !doneReadingGraph) { - synchronized (pathList) { - String split = event.getPath(); - split = split.substring(0, split.indexOf(inputSplitReservedNode)); - pathList.add(split); - if (LOG.isInfoEnabled()) { - LOG.info("process: Input split " + split + " lost reservation"); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java index f6dca25..5ab3ba9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java @@ -19,15 +19,15 @@ package org.apache.giraph.worker; import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.graph.VertexEdgeCount; import org.apache.giraph.io.GiraphInputFormat; import org.apache.giraph.io.MappingInputFormat; import org.apache.giraph.io.MappingReader; import org.apache.giraph.mapping.MappingEntry; import org.apache.giraph.mapping.MappingStore; -import org.apache.giraph.zk.ZooKeeperExt; +import org.apache.giraph.io.InputType; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; @@ -46,7 +46,7 @@ import org.apache.hadoop.mapreduce.Mapper; @SuppressWarnings("unchecked") public class MappingInputSplitsCallable<I extends WritableComparable, V extends Writable, E extends Writable, B extends Writable> - extends FullInputSplitCallable<I, V, E> { + extends 
InputSplitsCallable<I, V, E> { /** User supplied mappingInputFormat */ private final MappingInputFormat<I, V, E, B> mappingInputFormat; /** Link to bspServiceWorker */ @@ -56,23 +56,18 @@ public class MappingInputSplitsCallable<I extends WritableComparable, * Constructor * * @param mappingInputFormat mappingInputFormat - * @param splitOrganizer Input splits organizer * @param context Context * @param configuration Configuration - * @param zooKeeperExt Handle to ZooKeeperExt - * @param currentIndex Atomic Integer to get splitPath from list * @param bspServiceWorker bsp service worker + * @param splitsHandler Splits handler */ public MappingInputSplitsCallable( MappingInputFormat<I, V, E, B> mappingInputFormat, - InputSplitPathOrganizer splitOrganizer, Mapper<?, ?, ?, ?>.Context context, ImmutableClassesGiraphConfiguration<I, V, E> configuration, - ZooKeeperExt zooKeeperExt, - AtomicInteger currentIndex, - BspServiceWorker<I, V, E> bspServiceWorker) { - super(splitOrganizer, context, - configuration, zooKeeperExt, currentIndex); + BspServiceWorker<I, V, E> bspServiceWorker, + WorkerInputSplitsHandler splitsHandler) { + super(context, configuration, bspServiceWorker, splitsHandler); this.mappingInputFormat = mappingInputFormat; this.bspServiceWorker = bspServiceWorker; } @@ -83,7 +78,12 @@ public class MappingInputSplitsCallable<I extends WritableComparable, } @Override - protected Integer readInputSplit(InputSplit inputSplit) + public InputType getInputType() { + return InputType.MAPPING; + } + + @Override + protected VertexEdgeCount readInputSplit(InputSplit inputSplit) throws IOException, InterruptedException { MappingReader<I, V, E, B> mappingReader = mappingInputFormat.createMappingReader(inputSplit, context); @@ -104,6 +104,6 @@ public class MappingInputSplitsCallable<I extends WritableComparable, entriesLoaded += 1; mappingStore.addEntry(entry.getVertexId(), entry.getMappingTarget()); } - return entriesLoaded; + return new VertexEdgeCount(0, 0, entriesLoaded); 
} } http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java index 21a981e..6cf702a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java @@ -19,15 +19,13 @@ package org.apache.giraph.worker; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.graph.VertexEdgeCount; import org.apache.giraph.io.MappingInputFormat; import org.apache.giraph.utils.CallableFactory; -import org.apache.giraph.zk.ZooKeeperExt; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; -import java.util.concurrent.atomic.AtomicInteger; - /** * Factory for {@link org.apache.giraph.worker.MappingInputSplitsCallable}s. * @@ -38,59 +36,47 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class MappingInputSplitsCallableFactory<I extends WritableComparable, V extends Writable, E extends Writable, B extends Writable> - implements CallableFactory<Integer> { + implements CallableFactory<VertexEdgeCount> { /** Mapping input format */ private final MappingInputFormat<I, V, E, B> mappingInputFormat; - /** Input split organizer */ - private final InputSplitPathOrganizer splitOrganizer; /** Mapper context. */ private final Mapper<?, ?, ?, ?>.Context context; /** Configuration. */ private final ImmutableClassesGiraphConfiguration<I, V, E> configuration; /** {@link BspServiceWorker} we're running on. 
*/ private final BspServiceWorker<I, V, E> bspServiceWorker; - /** {@link ZooKeeperExt} for this worker. */ - private final ZooKeeperExt zooKeeperExt; - /** Current position in the path list */ - private final AtomicInteger currentIndex; - + /** Handler for input splits */ + private final WorkerInputSplitsHandler splitsHandler; /** * Constructor. * * @param mappingInputFormat Mapping input format - * @param splitOrganizer Input split organizer * @param context Mapper context * @param configuration Configuration * @param bspServiceWorker Calling {@link BspServiceWorker} - * @param zooKeeperExt {@link org.apache.giraph.zk.ZooKeeperExt} - * for this worker + * @param splitsHandler Splits handler */ public MappingInputSplitsCallableFactory( MappingInputFormat<I, V, E, B> mappingInputFormat, - InputSplitPathOrganizer splitOrganizer, Mapper<?, ?, ?, ?>.Context context, ImmutableClassesGiraphConfiguration<I, V, E> configuration, BspServiceWorker<I, V, E> bspServiceWorker, - ZooKeeperExt zooKeeperExt) { + WorkerInputSplitsHandler splitsHandler) { this.mappingInputFormat = mappingInputFormat; - this.splitOrganizer = splitOrganizer; this.context = context; this.configuration = configuration; this.bspServiceWorker = bspServiceWorker; - this.zooKeeperExt = zooKeeperExt; - this.currentIndex = new AtomicInteger(0); + this.splitsHandler = splitsHandler; } @Override - public FullInputSplitCallable<I, V, E> newCallable(int threadId) { + public InputSplitsCallable<I, V, E> newCallable(int threadId) { return new MappingInputSplitsCallable<>( mappingInputFormat, - splitOrganizer, context, configuration, - zooKeeperExt, - currentIndex, - bspServiceWorker); + bspServiceWorker, + splitsHandler); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java ---------------------------------------------------------------------- diff --git 
a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java index 00a2781..540a6b4 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java @@ -30,10 +30,10 @@ import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexReader; import org.apache.giraph.io.filters.VertexInputFilter; import org.apache.giraph.mapping.translate.TranslateEdge; +import org.apache.giraph.io.InputType; import org.apache.giraph.partition.PartitionOwner; import org.apache.giraph.utils.LoggerUtils; import org.apache.giraph.utils.MemoryUtils; -import org.apache.giraph.zk.ZooKeeperExt; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; @@ -99,17 +99,14 @@ public class VertexInputSplitsCallable<I extends WritableComparable, * @param configuration Configuration * @param bspServiceWorker service worker * @param splitsHandler Handler for input splits - * @param zooKeeperExt Handle to ZooKeeperExt */ public VertexInputSplitsCallable( VertexInputFormat<I, V, E> vertexInputFormat, Mapper<?, ?, ?, ?>.Context context, ImmutableClassesGiraphConfiguration<I, V, E> configuration, BspServiceWorker<I, V, E> bspServiceWorker, - InputSplitsHandler splitsHandler, - ZooKeeperExt zooKeeperExt) { - super(context, configuration, bspServiceWorker, splitsHandler, - zooKeeperExt); + WorkerInputSplitsHandler splitsHandler) { + super(context, configuration, bspServiceWorker, splitsHandler); this.vertexInputFormat = vertexInputFormat; inputSplitMaxVertices = configuration.getInputSplitMaxVertices(); @@ -136,6 +133,11 @@ public class VertexInputSplitsCallable<I extends WritableComparable, return vertexInputFormat; } + @Override + public InputType getInputType() { + return InputType.VERTEX; + 
} + /** * Read vertices from input split. If testing, the user may request a * maximum number of vertices to be read from an input split. @@ -274,7 +276,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable, WorkerProgress.get().incrementVertexInputSplitsLoaded(); return new VertexEdgeCount(inputSplitVerticesLoaded, - inputSplitEdgesLoaded + edgesSinceLastUpdate); + inputSplitEdgesLoaded + edgesSinceLastUpdate, 0); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java index c9893d2..7aef3a7 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java @@ -22,7 +22,6 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.VertexEdgeCount; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.utils.CallableFactory; -import org.apache.giraph.zk.ZooKeeperExt; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; @@ -46,9 +45,7 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable, /** {@link BspServiceWorker} we're running on. */ private final BspServiceWorker<I, V, E> bspServiceWorker; /** Handler for input splits */ - private final InputSplitsHandler splitsHandler; - /** {@link ZooKeeperExt} for this worker. */ - private final ZooKeeperExt zooKeeperExt; + private final WorkerInputSplitsHandler splitsHandler; /** * Constructor. 
@@ -58,20 +55,17 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable, * @param configuration Configuration * @param bspServiceWorker Calling {@link BspServiceWorker} * @param splitsHandler Handler for input splits - * @param zooKeeperExt {@link ZooKeeperExt} for this worker */ public VertexInputSplitsCallableFactory( VertexInputFormat<I, V, E> vertexInputFormat, Mapper<?, ?, ?, ?>.Context context, ImmutableClassesGiraphConfiguration<I, V, E> configuration, BspServiceWorker<I, V, E> bspServiceWorker, - InputSplitsHandler splitsHandler, - ZooKeeperExt zooKeeperExt) { + WorkerInputSplitsHandler splitsHandler) { this.vertexInputFormat = vertexInputFormat; this.context = context; this.configuration = configuration; this.bspServiceWorker = bspServiceWorker; - this.zooKeeperExt = zooKeeperExt; this.splitsHandler = splitsHandler; } @@ -82,7 +76,6 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable, context, configuration, bspServiceWorker, - splitsHandler, - zooKeeperExt); + splitsHandler); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInputSplitsHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInputSplitsHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInputSplitsHandler.java new file mode 100644 index 0000000..0dc42b3 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInputSplitsHandler.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.worker; + +import org.apache.giraph.comm.WorkerClient; +import org.apache.giraph.comm.requests.AskForInputSplitRequest; +import org.apache.giraph.io.InputType; + +import java.util.EnumMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Requests splits from master and keeps track of them + */ +public class WorkerInputSplitsHandler { + /** Worker info of this worker */ + private final WorkerInfo workerInfo; + /** Task id of master */ + private final int masterTaskId; + /** Worker client, used for communication */ + private final WorkerClient workerClient; + /** Map with currently available splits received from master */ + private final Map<InputType, BlockingQueue<byte[]>> availableInputSplits; + + /** + * Constructor + * + * @param workerInfo Worker info of this worker + * @param masterTaskId Task id of master + * @param workerClient Worker client, used for communication + */ + public WorkerInputSplitsHandler(WorkerInfo workerInfo, int masterTaskId, + WorkerClient workerClient) { + this.workerInfo = workerInfo; + this.masterTaskId = masterTaskId; + this.workerClient = workerClient; + availableInputSplits = new EnumMap<>(InputType.class); + for (InputType inputType : InputType.values()) { + availableInputSplits.put( + inputType, new LinkedBlockingQueue<byte[]>()); + } + } + + /** + * Called when an input split has been received from master, adding it to + * the map + * + * @param splitType Type of split + * @param serializedInputSplit Split 
+ */ + public void receivedInputSplit(InputType splitType, + byte[] serializedInputSplit) { + try { + availableInputSplits.get(splitType).put(serializedInputSplit); + } catch (InterruptedException e) { + throw new IllegalStateException("Interrupted", e); + } + } + + /** + * Try to reserve an InputSplit for loading. While InputSplits exist that + * are not finished, wait until they are. + * + * NOTE: iterations on the InputSplit list only halt for each worker when it + * has scanned the entire list once and found every split marked RESERVED. + * When a worker fails, its Ephemeral RESERVED znodes will disappear, + * allowing other iterating workers to claim its previously read splits. + * Only when the last worker left iterating on the list fails can a danger + * of data loss occur. Since worker failure in INPUT_SUPERSTEP currently + * causes job failure, this is OK. As the failure model evolves, this + * behavior might need to change. We could add watches on + * inputSplitFinishedNodes and stop iterating only when all these nodes + * have been created. + * + * @param splitType Type of split + * @return reserved InputSplit or null if no unfinished InputSplits exist + */ + public byte[] reserveInputSplit(InputType splitType) { + // Send request + workerClient.sendWritableRequest(masterTaskId, + new AskForInputSplitRequest(splitType, workerInfo.getTaskId())); + try { + // Wait for some split to become available + byte[] serializedInputSplit = availableInputSplits.get(splitType).take(); + return serializedInputSplit.length == 0 ? null : serializedInputSplit; + } catch (InterruptedException e) { + throw new IllegalStateException("Interrupted", e); + } + } +}
