http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreIOScheduler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreIOScheduler.java b/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreIOScheduler.java new file mode 100644 index 0000000..2cb002f --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreIOScheduler.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.ooc; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.ooc.io.IOCommand; +import org.apache.giraph.ooc.io.LoadPartitionIOCommand; +import org.apache.giraph.ooc.io.StoreDataBufferIOCommand; +import org.apache.giraph.ooc.io.StoreIncomingMessageIOCommand; +import org.apache.giraph.ooc.io.StorePartitionIOCommand; +import org.apache.giraph.ooc.io.WaitIOCommand; +import org.apache.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.google.common.base.Preconditions.checkState; + +/** + * IO Scheduler for out-of-core mechanism with fixed number of partitions in + * memory + */ +public class FixedOutOfCoreIOScheduler extends OutOfCoreIOScheduler { + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(FixedOutOfCoreIOScheduler.class); + /** Maximum number of partitions to be kept in memory */ + private final int maxPartitionsInMemory; + /** + * Number of partitions to be added (loaded) or removed (stored) to/from + * memory. Each outstanding load partition counts +1 and each outstanding + * store partition counts -1 toward this counter. + */ + private final AtomicInteger deltaNumPartitionsInMemory = + new AtomicInteger(0); + /** Queue of IO commands for loading partitions to memory */ + private final List<Queue<IOCommand>> threadLoadCommandQueue; + /** Queue of IO commands for storing partition on disk */ + private final List<Queue<IOCommand>> threadStoreCommandQueue; + /** Whether IO threads should terminate */ + private volatile boolean shouldTerminate; + + /** + * Constructor + * @param maxPartitionsInMemory maximum number of partitions can be kept in + * memory + * @param numThreads number of available IO threads (i.e. 
disks) + * @param oocEngine out-of-core engine + * @param conf configuration + */ + public FixedOutOfCoreIOScheduler(int maxPartitionsInMemory, int numThreads, + OutOfCoreEngine oocEngine, + ImmutableClassesGiraphConfiguration conf) { + super(conf, oocEngine, numThreads); + this.maxPartitionsInMemory = maxPartitionsInMemory; + threadLoadCommandQueue = new ArrayList<>(numThreads); + threadStoreCommandQueue = new ArrayList<>(numThreads); + for (int i = 0; i < numThreads; ++i) { + threadLoadCommandQueue.add( + new ConcurrentLinkedQueue<IOCommand>()); + threadStoreCommandQueue.add( + new ConcurrentLinkedQueue<IOCommand>()); + } + shouldTerminate = false; + } + + @Override + public IOCommand getNextIOCommand(int threadId) { + if (shouldTerminate) { + return null; + } + int numPartitionsInMemory = + oocEngine.getMetaPartitionManager().getNumInMemoryPartitions(); + IOCommand command = null; + if (LOG.isInfoEnabled()) { + LOG.info("getNextIOCommand with " + numPartitionsInMemory + + " partitions in memory, " + deltaNumPartitionsInMemory.get() + + " on the fly"); + } + // Check if we have to store a partition on disk + if (numPartitionsInMemory + deltaNumPartitionsInMemory.getAndDecrement() > + maxPartitionsInMemory) { + command = threadStoreCommandQueue.get(threadId).poll(); + if (command == null) { + Integer partitionId = oocEngine.getMetaPartitionManager() + .getOffloadPartitionId(threadId); + if (partitionId != null) { + command = new StorePartitionIOCommand(oocEngine, partitionId); + } else { + deltaNumPartitionsInMemory.getAndIncrement(); + } + } else { + checkState(command instanceof StorePartitionIOCommand, + "getNextIOCommand: Illegal command type in store command queue!"); + } + } else { + // Roll back the decrement in delta counter. 
+ deltaNumPartitionsInMemory.getAndIncrement(); + } + + // Check if there is any buffers/messages of current out-of-core partitions + // in memory + if (command == null) { + Integer partitionId = oocEngine.getMetaPartitionManager() + .getOffloadPartitionBufferId(threadId); + if (partitionId != null) { + command = new StoreDataBufferIOCommand(oocEngine, partitionId, + StoreDataBufferIOCommand.DataBufferType.PARTITION); + } else { + partitionId = oocEngine.getMetaPartitionManager() + .getOffloadMessageBufferId(threadId); + if (partitionId != null) { + command = new StoreDataBufferIOCommand(oocEngine, partitionId, + StoreDataBufferIOCommand.DataBufferType.MESSAGE); + } else { + partitionId = oocEngine.getMetaPartitionManager() + .getOffloadMessageId(threadId); + if (partitionId != null) { + command = new StoreIncomingMessageIOCommand(oocEngine, partitionId); + } + } + } + } + + // Check if we can load/prefetch a partition to memory + if (command == null) { + if (numPartitionsInMemory + + deltaNumPartitionsInMemory.getAndIncrement() <= + maxPartitionsInMemory) { + command = threadLoadCommandQueue.get(threadId).poll(); + if (command == null) { + Integer partitionId = oocEngine.getMetaPartitionManager() + .getPrefetchPartitionId(threadId); + if (partitionId != null) { + command = new LoadPartitionIOCommand(oocEngine, partitionId, + oocEngine.getServiceWorker().getSuperstep()); + } else { + deltaNumPartitionsInMemory.getAndDecrement(); + } + } + } else { + // Roll back the increment in delta counter. 
+ deltaNumPartitionsInMemory.getAndDecrement(); + } + } + + // Check if no appropriate IO command is found + if (command == null) { + command = new WaitIOCommand(oocEngine, waitInterval); + } + return command; + } + + @Override + public void ioCommandCompleted(IOCommand command) { + if (command instanceof LoadPartitionIOCommand) { + deltaNumPartitionsInMemory.getAndDecrement(); + } else if (command instanceof StorePartitionIOCommand) { + deltaNumPartitionsInMemory.getAndIncrement(); + } + oocEngine.ioCommandCompleted(command); + } + + @Override + public void addIOCommand(IOCommand ioCommand) { + int ownerThread = getOwnerThreadId(ioCommand.getPartitionId()); + if (ioCommand instanceof LoadPartitionIOCommand) { + threadLoadCommandQueue.get(ownerThread).offer(ioCommand); + } else if (ioCommand instanceof StorePartitionIOCommand) { + threadStoreCommandQueue.get(ownerThread).offer(ioCommand); + } else { + throw new IllegalStateException("addIOCommand: IO command type is not " + + "supported for addition"); + } + } + + @Override + public void shutdown() { + super.shutdown(); + shouldTerminate = true; + } + + /** + * Clear store command queue (should happen at the beginning of each iteration + * to eliminate eager store commands generated by OOC engine) + */ + public void clearStoreCommandQueue() { + for (int i = 0; i < threadStoreCommandQueue.size(); ++i) { + threadStoreCommandQueue.get(i).clear(); + } + } +}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/JVMMemoryEstimator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/JVMMemoryEstimator.java b/giraph-core/src/main/java/org/apache/giraph/ooc/JVMMemoryEstimator.java deleted file mode 100644 index 46d989a..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/JVMMemoryEstimator.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.ooc; - -import org.apache.giraph.bsp.CentralizedServiceWorker; -import org.apache.giraph.utils.MemoryUtils; - -/** - * Memory estimator class using JVM runtime methods to estimate the - * free/available memory. 
- */ -public class JVMMemoryEstimator implements MemoryEstimator { - /** - * Constructor for reflection - */ - public JVMMemoryEstimator() { } - - @Override - public void initialize(CentralizedServiceWorker serviceWorker) { } - - @Override - public double freeMemoryMB() { - return MemoryUtils.freePlusUnallocatedMemoryMB(); - } - - @Override public double maxMemoryMB() { - return MemoryUtils.maxMemoryMB(); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/MemoryEstimator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/MemoryEstimator.java b/giraph-core/src/main/java/org/apache/giraph/ooc/MemoryEstimator.java deleted file mode 100644 index 74a2f7b..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/MemoryEstimator.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.ooc; - -import org.apache.giraph.bsp.CentralizedServiceWorker; - -/** - * Interface for memory estimator. Estimated memory is used in adaptive - * out-of-core mechanism. 
- */ -public interface MemoryEstimator { - /** - * Initialize the memory estimator. - * - * @param serviceWorker Worker service - */ - void initialize(CentralizedServiceWorker serviceWorker); - - /** - * @return amount of free memory in MB - */ - double freeMemoryMB(); - - /** - * @return amount of available memory in MB - */ - double maxMemoryMB(); -} http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java index 218a10b..bc0ece4 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java @@ -18,26 +18,189 @@ package org.apache.giraph.ooc; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.comm.ServerData; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.ooc.data.MetaPartitionManager; +import org.apache.giraph.ooc.io.IOCommand; +import org.apache.log4j.Logger; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static com.google.common.base.Preconditions.checkState; /** - * Engine for out-of-core mechanism. The engine may attach to a partition store - * capable of handling disk transfer of partitions. - * - * @param <I> Vertex id - * @param <V> Vertex data - * @param <E> Edge data + * Class to represent an out-of-core engine. */ -public interface OutOfCoreEngine<I extends WritableComparable, - V extends Writable, E extends Writable> { +public abstract class OutOfCoreEngine { + /** Class logger. 
*/ + private static final Logger LOG = Logger.getLogger(OutOfCoreEngine.class); + /** Service worker */ + protected final CentralizedServiceWorker<?, ?, ?> service; + /** Scheduler for IO threads */ + protected OutOfCoreIOScheduler ioScheduler; + /** Data structure to keep meta partition information */ + protected final MetaPartitionManager metaPartitionManager; + /** How many disk (i.e. IO threads) do we have? */ + protected final int numIOThreads; + /** + * Whether the job should fail due to IO threads terminating because of + * exceptions + */ + protected volatile boolean jobFailed = false; + /** Whether the out-of-core engine has initialized */ + private final AtomicBoolean isInitialized = new AtomicBoolean(false); + /** + * Global lock for entire superstep. This lock helps to avoid overlapping of + * out-of-core decisions (what to do next to help the out-of-core mechanism) + * with out-of-core operations (actual IO operations). + */ + private final ReadWriteLock superstepLock = new ReentrantReadWriteLock(); + /** Callable factory for IO threads */ + private final OutOfCoreIOCallableFactory oocIOCallableFactory; + + /** + * Constructor + * + * @param conf Configuration + * @param service Service worker + */ + public OutOfCoreEngine(ImmutableClassesGiraphConfiguration<?, ?, ?> conf, + CentralizedServiceWorker<?, ?, ?> service) { + this.service = service; + this.oocIOCallableFactory = new OutOfCoreIOCallableFactory(conf, this); + this.numIOThreads = oocIOCallableFactory.getNumDisks(); + this.metaPartitionManager = new MetaPartitionManager(numIOThreads, this); + oocIOCallableFactory.createCallable(); + } + /** * Initialize/Start the out-of-core engine. */ - void initialize(); + @edu.umd.cs.findbugs.annotations.SuppressWarnings( + "JLM_JSR166_UTILCONCURRENT_MONITORENTER") + public void initialize() { + synchronized (isInitialized) { + isInitialized.set(true); + isInitialized.notifyAll(); + } + } /** * Shutdown/Stop the out-of-core engine. 
*/ - void shutdown(); + public void shutdown() { + if (LOG.isInfoEnabled()) { + LOG.info("shutdown: out-of-core engine shutting down, signalling IO " + + "threads to shutdown"); + } + ioScheduler.shutdown(); + oocIOCallableFactory.shutdown(); + } + + /** + * Get a reference to the server data + * + * @return ServerData + */ + public ServerData getServerData() { + return service.getServerData(); + } + + /** + * Get a reference to the service worker + * + * @return CentralizedServiceWorker + */ + public CentralizedServiceWorker getServiceWorker() { + return service; + } + + /** + * Get a reference to IO scheduler + * + * @return OutOfCoreIOScheduler + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings( + "JLM_JSR166_UTILCONCURRENT_MONITORENTER") + public OutOfCoreIOScheduler getIOScheduler() { + synchronized (isInitialized) { + while (!isInitialized.get()) { + try { + isInitialized.wait(); + } catch (InterruptedException e) { + throw new IllegalStateException("getIOScheduler: " + + "InterruptedException while waiting for out-of-core engine to " + + "be initialized!"); + } + } + } + return ioScheduler; + } + + /** + * Get a reference to meta partition information + * + * @return MetaPartitionManager + */ + public MetaPartitionManager getMetaPartitionManager() { + checkState(isInitialized.get()); + return metaPartitionManager; + } + + /** + * Get a refernce to superstep lock + * + * @return read/write lock used for global superstep lock + */ + public ReadWriteLock getSuperstepLock() { + return superstepLock; + } + + /** + * Get the id of the next partition to process in the current iteration over + * all the partitions. If all partitions are already processed, this method + * returns null. + * + * @return id of a partition to process. 'null' if all partitions are + * processed in current iteration over partitions. 
+ */ + public abstract Integer getNextPartition(); + + /** + * Notify out-of-core engine that processing of a particular partition is done + * + * @param partitionId id of the partition that its processing is done + */ + public abstract void doneProcessingPartition(int partitionId); + + /** + * Notify out=of-core engine that iteration cycle over all partitions is about + * to begin. + */ + public abstract void startIteration(); + + /** + * Retrieve a particular partition. After this method is complete the + * requested partition should be in memory. + * + * @param partitionId id of the partition to retrieve + */ + public abstract void retrievePartition(int partitionId); + + /** + * Notify out-of-core engine that an IO command is competed by an IO thread + * + * @param command the IO command that is completed + */ + public abstract void ioCommandCompleted(IOCommand command); + + /** + * Set a flag to fail the job. + */ + public void failTheJob() { + jobFailed = true; + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java new file mode 100644 index 0000000..6c6e4ff --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.ooc; + +import org.apache.giraph.ooc.io.IOCommand; +import org.apache.giraph.ooc.io.WaitIOCommand; +import org.apache.log4j.Logger; + +import java.util.concurrent.Callable; + +/** + * IO threads for out-of-core mechanism. + */ +public class OutOfCoreIOCallable implements Callable<Void> { + /** Class logger. */ + private static final Logger LOG = Logger.getLogger(OutOfCoreIOCallable.class); + /** Out-of-core engine */ + private final OutOfCoreEngine oocEngine; + /** Base path that this thread will write to/read from */ + private final String basePath; + /** Thread id/Disk id */ + private final int diskId; + + /** + * Constructor + * + * @param oocEngine out-of-core engine + * @param basePath base path this thread will be using + * @param diskId thread id/disk id + */ + public OutOfCoreIOCallable(OutOfCoreEngine oocEngine, String basePath, + int diskId) { + this.oocEngine = oocEngine; + this.basePath = basePath; + this.diskId = diskId; + } + + @Override + public Void call() { + while (true) { + oocEngine.getSuperstepLock().readLock().lock(); + IOCommand command = oocEngine.getIOScheduler().getNextIOCommand(diskId); + if (LOG.isInfoEnabled()) { + LOG.info("call: thread " + diskId + "'s next IO command is: " + + command); + } + if (command == null) { + oocEngine.getSuperstepLock().readLock().unlock(); + break; + } + if (command instanceof WaitIOCommand) { + oocEngine.getSuperstepLock().readLock().unlock(); + } + // CHECKSTYLE: stop IllegalCatch + try { + command.execute(basePath); + } catch (Exception e) { + 
oocEngine.failTheJob(); + LOG.info("call: execution of IO command " + command + " failed!"); + throw new RuntimeException(e); + } + // CHECKSTYLE: resume IllegalCatch + if (!(command instanceof WaitIOCommand)) { + oocEngine.getSuperstepLock().readLock().unlock(); + } + oocEngine.getIOScheduler().ioCommandCompleted(command); + } + if (LOG.isInfoEnabled()) { + LOG.info("call: out-of-core IO thread " + diskId + " terminating!"); + } + return null; + } +} + http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java new file mode 100644 index 0000000..d4fea22 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.ooc; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.utils.CallableFactory; +import org.apache.giraph.utils.LogStacktraceCallable; +import org.apache.giraph.utils.ThreadUtils; +import org.apache.log4j.Logger; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static com.google.common.base.Preconditions.checkState; +import static org.apache.giraph.conf.GiraphConstants.PARTITIONS_DIRECTORY; + +/** + * Factory class to create IO threads for out-of-core engine. + */ +public class OutOfCoreIOCallableFactory { + /** Class logger. */ + private static final Logger LOG = + Logger.getLogger(OutOfCoreIOCallableFactory.class); + /** Out-of-core engine */ + private final OutOfCoreEngine oocEngine; + /** Result of IO threads at the end of the computation */ + private final List<Future> results; + /** How many disks (i.e. IO threads) do we have? 
*/ + private int numDisks; + /** Path prefix for different disks */ + private final String[] basePaths; + /** Executor service for IO threads */ + private ExecutorService outOfCoreIOExecutor; + /** + * Constructor + * + * @param conf Configuration + * @param oocEngine Out-of-core engine + */ + public OutOfCoreIOCallableFactory( + ImmutableClassesGiraphConfiguration<?, ?, ?> conf, + OutOfCoreEngine oocEngine) { + this.oocEngine = oocEngine; + this.results = new ArrayList<>(); + // Take advantage of multiple disks + String[] userPaths = PARTITIONS_DIRECTORY.getArray(conf); + this.numDisks = userPaths.length; + this.basePaths = new String[numDisks]; + int ptr = 0; + for (String path : userPaths) { + File file = new File(path); + if (!file.exists()) { + checkState(file.mkdirs(), "OutOfCoreIOCallableFactory: cannot create " + + "directory " + file.getAbsolutePath()); + } + basePaths[ptr] = path + "/" + conf.get("mapred.job.id", "Unknown Job"); + ptr++; + } + } + + /** + * Creates/Launches IO threads + */ + public void createCallable() { + CallableFactory<Void> outOfCoreIOCallableFactory = + new CallableFactory<Void>() { + @Override + public Callable<Void> newCallable(int callableId) { + return new OutOfCoreIOCallable(oocEngine, basePaths[callableId], + callableId); + } + }; + outOfCoreIOExecutor = new ThreadPoolExecutor(numDisks, numDisks, 0L, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), + ThreadUtils.createThreadFactory("ooc-io-%d")) { + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + if (t == null && r instanceof Future<?>) { + try { + Future<?> future = (Future<?>) r; + if (future.isDone()) { + future.get(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + t = e; + } + if (t != null) { + LOG.info("afterExecute: an out-of-core thread terminated " + + "unexpectedly with " + t); + oocEngine.failTheJob(); + } + } + } + }; + + for (int i = 
0; i < numDisks; ++i) { + Future<Void> future = outOfCoreIOExecutor.submit( + new LogStacktraceCallable<>( + outOfCoreIOCallableFactory.newCallable(i))); + results.add(future); + } + // Notify executor to not accept any more tasks + outOfCoreIOExecutor.shutdown(); + } + + /** + * How many disks do we have? + * + * @return number of disks (IO threads) + */ + public int getNumDisks() { + return numDisks; + } + + /** + * Check whether all IO threads terminated gracefully. + */ + public void shutdown() { + boolean threadsTerminated = false; + while (!threadsTerminated) { + if (LOG.isInfoEnabled()) { + LOG.info("shutdown: waiting for IO threads to finish!"); + } + try { + threadsTerminated = + outOfCoreIOExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new IllegalStateException("shutdown: caught " + + "InterruptedException while waiting for IO threads to finish"); + } + } + for (int i = 0; i < numDisks; ++i) { + try { + // Check whether the tread terminated gracefully + results.get(i).get(); + } catch (InterruptedException e) { + LOG.error("shutdown: IO thread " + i + " was interrupted during its " + + "execution"); + throw new IllegalStateException(e); + } catch (ExecutionException e) { + LOG.error("shutdown: IO thread " + i + " threw an exception during " + + "its execution"); + throw new IllegalStateException(e); + } + } + for (String path : basePaths) { + File file = new File(path).getParentFile(); + for (String subFileName : file.list()) { + File subFile = new File(file.getPath(), subFileName); + checkState(subFile.delete(), "shutdown: cannot delete file %s", + subFile.getAbsoluteFile()); + } + checkState(file.delete(), "shutdown: cannot delete directory %s", + file.getAbsoluteFile()); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java ---------------------------------------------------------------------- diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java new file mode 100644 index 0000000..dee632d --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.ooc; + +import com.google.common.hash.Hashing; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.IntConfOption; +import org.apache.giraph.ooc.io.IOCommand; +import org.apache.log4j.Logger; + +/** + * Representation of IO thread scheduler for out-of-core mechanism + */ +public abstract class OutOfCoreIOScheduler { + /** + * If an IO thread does not have any command to do, it waits for certain a + * period and check back again to see if there exist any command to perform. + * This constant determines this wait period in milliseconds. + */ + public static final IntConfOption OOC_WAIT_INTERVAL = + new IntConfOption("giraph.oocWaitInterval", 1000, + "Duration (in milliseconds) which IO threads in out-of-core " + + "mechanism would wait until a command becomes available"); + /** Class logger. 
*/ + private static final Logger LOG = + Logger.getLogger(OutOfCoreIOScheduler.class); + /** Out-of-core engine */ + protected final OutOfCoreEngine oocEngine; + /** How much an IO thread should wait if there is no IO command */ + protected final int waitInterval; + /** How many disks (i.e. IO threads) do we have? */ + private final int numDisks; + + /** + * Constructor + * + * @param conf configuration + * @param oocEngine out-of-core engine + * @param numDisks number of disks (IO threads) + */ + OutOfCoreIOScheduler(final ImmutableClassesGiraphConfiguration conf, + OutOfCoreEngine oocEngine, int numDisks) { + this.oocEngine = oocEngine; + this.numDisks = numDisks; + this.waitInterval = OOC_WAIT_INTERVAL.get(conf); + } + + /** + * Get the thread id that is responsible for a particular partition + * + * @param partitionId id of the given partition + * @return id of the thread responsible for the given partition + */ + public int getOwnerThreadId(int partitionId) { + int result = Hashing.murmur3_32().hashInt(partitionId).asInt() % numDisks; + return (result >= 0) ? 
result : (result + numDisks); + } + + /** + * Generate and return the next appropriate IO command for a given thread + * + * @param threadId id of the thread ready to execute the next IO command + * @return next IO command to be executed by the given thread + */ + public abstract IOCommand getNextIOCommand(int threadId); + + /** + * Notify IO scheduler that the IO command is completed + * + * @param command completed command + */ + public abstract void ioCommandCompleted(IOCommand command); + + /** + * Add an IO command to the scheduling queue of the IO scheduler + * + * @param ioCommand IO command to add to the scheduler + */ + public abstract void addIOCommand(IOCommand ioCommand); + + /** + * Shutdown/Terminate the IO scheduler, and notify all IO threads to halt + */ + public void shutdown() { + if (LOG.isInfoEnabled()) { + LOG.info("shutdown: OutOfCoreIOScheduler shutting down!"); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreProcessorCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreProcessorCallable.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreProcessorCallable.java deleted file mode 100644 index 7c4d8df..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreProcessorCallable.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
package org.apache.giraph.ooc;

import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Class to implement slaves for adaptive out-of-core brain. Basically the brain
 * decides on when to offload and what data to offload to disk and generates
 * commands for offloading. Slaves just execute the commands. Commands can be:
 * 1) offloading vertex buffer of partitions in INPUT_SUPERSTEP,
 * 2) offloading edge buffer of partitions in INPUT_SUPERSTEP,
 * 3) offloading partitions.
 *
 * @param <I> Vertex id
 * @param <V> Vertex value
 * @param <E> Edge value
 */
public class OutOfCoreProcessorCallable<I extends WritableComparable,
    V extends Writable, E extends Writable> implements Callable<Void> {
  /** Class logger */
  private static final Logger LOG =
      Logger.getLogger(OutOfCoreProcessorCallable.class);
  /** Partition store */
  private final DiskBackedPartitionStore<I, V, E> partitionStore;
  /** Adaptive out-of-core engine */
  private final AdaptiveOutOfCoreEngine<I, V, E> oocEngine;

  /**
   * Constructor for out-of-core processor threads.
   *
   * @param oocEngine out-of-core engine
   * @param serviceWorker worker service
   */
  public OutOfCoreProcessorCallable(AdaptiveOutOfCoreEngine<I, V, E> oocEngine,
      CentralizedServiceWorker<I, V, E> serviceWorker) {
    this.oocEngine = oocEngine;
    this.partitionStore =
        (DiskBackedPartitionStore<I, V, E>) serviceWorker.getPartitionStore();
  }

  @Override
  public Void call() {
    while (true) {
      // First wait on a gate to be opened by memory-check thread. Memory-check
      // thread opens the gate once there are data available to be spilled to
      // disk.
      try {
        oocEngine.waitOnGate();
      } catch (InterruptedException e) {
        // Restore the interrupt status so callers up the stack still see it.
        Thread.currentThread().interrupt();
        throw new IllegalStateException("call: Caught InterruptedException " +
            "while waiting on memory check thread signal on available " +
            "partitions to put on disk", e);
      } catch (BrokenBarrierException e) {
        throw new IllegalStateException("call Caught BrokenBarrierException. " +
            "Looks like some other threads broke while waiting on barrier", e);
      }

      // The value of 'done' is true iff it is set right before the gate
      // at end of check-memory thread. In such case, the computation is done
      // and OOC processing threads should terminate gracefully.
      if (oocEngine.isDone()) {
        break;
      }

      BlockingQueue<Integer> partitionsWithInputVertices =
          oocEngine.getPartitionsWithInputVertices();
      BlockingQueue<Integer> partitionsWithInputEdges =
          oocEngine.getPartitionsWithInputEdges();
      BlockingQueue<Integer> partitionsWithPendingMessages =
          oocEngine.getPartitionsWithPendingMessages();
      AtomicInteger numPartitionsToSpill =
          oocEngine.getNumPartitionsToSpill();

      // poll() returning null is the single loop-exit condition; the queues
      // are drained cooperatively by all processor threads.
      Integer partitionId;
      while ((partitionId = partitionsWithInputVertices.poll()) != null) {
        if (LOG.isInfoEnabled()) {
          LOG.info("call: spilling vertex buffer of partition " + partitionId);
        }
        try {
          partitionStore.spillPartitionInputVertexBuffer(partitionId);
        } catch (IOException e) {
          throw new IllegalStateException("call: caught IOException while " +
              "spilling vertex buffers to disk", e);
        }
      }

      while ((partitionId = partitionsWithInputEdges.poll()) != null) {
        if (LOG.isInfoEnabled()) {
          LOG.info("call: spilling edge buffer of partition " + partitionId);
        }
        try {
          partitionStore.spillPartitionInputEdgeStore(partitionId);
        } catch (IOException e) {
          throw new IllegalStateException("call: caught IOException while " +
              "spilling edge buffers/store to disk", e);
        }
      }

      while ((partitionId = partitionsWithPendingMessages.poll()) != null) {
        if (LOG.isInfoEnabled()) {
          LOG.info(
              "call: spilling message buffers of partition " + partitionId);
        }
        try {
          partitionStore.spillPartitionMessages(partitionId);
        } catch (IOException e) {
          // NOTE: previous message wrongly said "edge buffers/store"
          // (copy-paste); this loop spills message buffers.
          throw new IllegalStateException("call: caught IOException while " +
              "spilling message buffers to disk", e);
        }
      }

      // Put partitions on disk
      while (numPartitionsToSpill.getAndDecrement() > 0) {
        if (LOG.isInfoEnabled()) {
          LOG.info("call: start offloading a partition");
        }
        partitionStore.spillOnePartition();
      }

      // Signal memory check thread that I am done putting data on disk
      try {
        oocEngine.waitOnOocSignal();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("call: Caught InterruptedException " +
            "while waiting to notify memory check thread that I am done", e);
      } catch (BrokenBarrierException e) {
        throw new IllegalStateException("call: Caught BrokenBarrierException " +
            "while waiting to notify memory check thread that I am done", e);
      }
    }
    return null;
  }
}
package org.apache.giraph.ooc.data;

import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.EdgeStore;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.utils.ByteArrayVertexIdEdges;
import org.apache.giraph.utils.VertexIdEdges;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import static com.google.common.base.Preconditions.checkState;

/**
 * Implementation of an edge-store used for out-of-core mechanism.
 *
 * @param <I> Vertex id
 * @param <V> Vertex data
 * @param <E> Edge data
 */
public class DiskBackedEdgeStore<I extends WritableComparable,
    V extends Writable, E extends Writable>
    extends OutOfCoreDataManager<VertexIdEdges<I, E>>
    implements EdgeStore<I, V, E> {
  /** Class logger. */
  private static final Logger LOG = Logger.getLogger(DiskBackedEdgeStore.class);
  /** In-memory edge store this out-of-core store wraps */
  private final EdgeStore<I, V, E> edgeStore;
  /** Configuration */
  private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
  /** Out-of-core engine */
  private final OutOfCoreEngine oocEngine;

  /**
   * Constructor
   *
   * @param edgeStore In-memory edge store for which out-of-core edge store
   *                  would be a wrapper
   * @param conf Configuration
   * @param oocEngine Out-of-core engine
   */
  public DiskBackedEdgeStore(
      EdgeStore<I, V, E> edgeStore,
      ImmutableClassesGiraphConfiguration<I, V, E> conf,
      OutOfCoreEngine oocEngine) {
    super(conf);
    this.edgeStore = edgeStore;
    this.conf = conf;
    this.oocEngine = oocEngine;
  }

  @Override
  public void addPartitionEdges(int partitionId, VertexIdEdges<I, E> edges) {
    addEntry(partitionId, edges);
  }

  @Override
  public void moveEdgesToVertices() {
    edgeStore.moveEdgesToVertices();
  }

  @Override
  public void writePartitionEdgeStore(int partitionId, DataOutput output)
      throws IOException {
    // This method is only called (should only be called) on in-memory edge
    // stores
    throw new IllegalStateException("writePartitionEdgeStore: this method " +
        "should not be called for DiskBackedEdgeStore!");
  }

  @Override
  public void readPartitionEdgeStore(int partitionId, DataInput input)
      throws IOException {
    // This method is only called (should only be called) on in-memory edge
    // stores
    throw new IllegalStateException("readPartitionEdgeStore: this method " +
        "should not be called for DiskBackedEdgeStore!");
  }

  @Override
  public boolean hasEdgesForPartition(int partitionId) {
    // This method is only called (should only be called) on in-memory edge
    // stores
    throw new IllegalStateException("hasEdgesForPartition: this method " +
        "should not be called for DiskBackedEdgeStore!");
  }

  /**
   * Gets the path that should be used specifically for edge data.
   *
   * @param basePath path prefix to build the actual path from
   * @return path to files specific for edge data
   */
  private static String getPath(String basePath) {
    return basePath + "_edge_store";
  }

  @Override
  public void loadPartitionData(int partitionId, String basePath)
      throws IOException {
    super.loadPartitionData(partitionId, getPath(basePath));
  }

  @Override
  public void offloadPartitionData(int partitionId, String basePath)
      throws IOException {
    super.offloadPartitionData(partitionId, getPath(basePath));
  }

  @Override
  public void offloadBuffers(int partitionId, String basePath)
      throws IOException {
    super.offloadBuffers(partitionId, getPath(basePath));
  }

  @Override
  protected void writeEntry(VertexIdEdges<I, E> edges, DataOutput out)
      throws IOException {
    edges.write(out);
  }

  @Override
  protected VertexIdEdges<I, E> readNextEntry(DataInput in) throws IOException {
    VertexIdEdges<I, E> vertexIdEdges = new ByteArrayVertexIdEdges<>();
    vertexIdEdges.setConf(conf);
    vertexIdEdges.readFields(in);
    return vertexIdEdges;
  }

  @Override
  protected void loadInMemoryPartitionData(int partitionId, String path)
      throws IOException {
    File file = new File(path);
    if (file.exists()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("loadInMemoryPartitionData: loading edge data for " +
            "partition " + partitionId + " from " + file.getAbsolutePath());
      }
      // try-with-resources closes the stream even if the read throws,
      // avoiding a file-handle leak (close() was previously skipped on error).
      try (DataInputStream dis = new DataInputStream(
          new BufferedInputStream(new FileInputStream(file)))) {
        edgeStore.readPartitionEdgeStore(partitionId, dis);
      }
      checkState(file.delete(), "loadInMemoryPartitionData: failed to delete " +
          "%s.", file.getAbsoluteFile());
    }
  }

  @Override
  protected void offloadInMemoryPartitionData(int partitionId, String path)
      throws IOException {
    if (edgeStore.hasEdgesForPartition(partitionId)) {
      File file = new File(path);
      checkState(!file.exists(), "offloadInMemoryPartitionData: edge store " +
          "file %s already exist", file.getAbsoluteFile());
      checkState(file.createNewFile(),
          "offloadInMemoryPartitionData: cannot create edge store file %s",
          file.getAbsoluteFile());
      // try-with-resources closes (and flushes) the stream even if the
      // write throws part-way.
      try (DataOutputStream dos = new DataOutputStream(
          new BufferedOutputStream(new FileOutputStream(file)))) {
        edgeStore.writePartitionEdgeStore(partitionId, dos);
      }
    }
  }

  @Override
  protected int entrySerializedSize(VertexIdEdges<I, E> edges) {
    return edges.getSerializedSize();
  }

  @Override
  protected void addEntryToImMemoryPartitionData(int partitionId,
                                                 VertexIdEdges<I, E> edges) {
    oocEngine.getMetaPartitionManager().addPartition(partitionId);
    edgeStore.addPartitionEdges(partitionId, edges);
  }
}
package org.apache.giraph.ooc.data;

import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.VertexIdMessages;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import static com.google.common.base.Preconditions.checkState;

/**
 * Implementation of a message store used for out-of-core mechanism.
 *
 * @param <I> Vertex id
 * @param <M> Message data
 */
public class DiskBackedMessageStore<I extends WritableComparable,
    M extends Writable> extends OutOfCoreDataManager<VertexIdMessages<I, M>>
    implements MessageStore<I, M> {
  /** Class logger. */
  private static final Logger LOG =
      Logger.getLogger(DiskBackedMessageStore.class);
  /** Configuration */
  private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
  /** In-memory message store */
  private final MessageStore<I, M> messageStore;
  /** Whether the message store uses message combiner or not */
  private final boolean useMessageCombiner;
  /** Which superstep this message store is used for */
  private final long superstep;

  /**
   * Type of VertexIdMessage class (container for serialized messages) received
   * for a particular message. If we write the received messages to disk before
   * adding them to message store, we need this type when we want to read the
   * messages back from disk (so that we know how to read the messages from
   * disk).
   */
  private enum SerializedMessageClass {
    /** ByteArrayVertexIdMessages */
    BYTE_ARRAY_VERTEX_ID_MESSAGES,
    /** ByteArrayOneMEssageToManyIds */
    BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS
  }

  /**
   * Constructor
   *
   * @param config Configuration
   * @param messageStore In-memory message store for which out-of-core message
   *                     store would be wrapper
   * @param useMessageCombiner Whether message combiner is used for this message
   *                           store
   * @param superstep superstep number this messages store is used for
   */
  public DiskBackedMessageStore(ImmutableClassesGiraphConfiguration<I, ?, ?>
                                    config,
                                MessageStore<I, M> messageStore,
                                boolean useMessageCombiner, long superstep) {
    super(config);
    this.config = config;
    this.messageStore = messageStore;
    this.useMessageCombiner = useMessageCombiner;
    this.superstep = superstep;
  }

  @Override
  public boolean isPointerListEncoding() {
    return messageStore.isPointerListEncoding();
  }

  @Override
  public Iterable<M> getVertexMessages(I vertexId) throws IOException {
    return messageStore.getVertexMessages(vertexId);
  }

  @Override
  public void clearVertexMessages(I vertexId) throws IOException {
    messageStore.clearVertexMessages(vertexId);
  }

  @Override
  public void clearAll() throws IOException {
    messageStore.clearAll();
  }

  @Override
  public boolean hasMessagesForVertex(I vertexId) {
    return messageStore.hasMessagesForVertex(vertexId);
  }

  @Override
  public boolean hasMessagesForPartition(int partitionId) {
    return messageStore.hasMessagesForPartition(partitionId);
  }

  @Override
  public void addPartitionMessages(
      int partitionId, VertexIdMessages<I, M> messages) throws IOException {
    // With a combiner the in-memory footprint stays bounded, so messages go
    // straight to the in-memory store; otherwise they go through the
    // out-of-core data manager.
    if (useMessageCombiner) {
      messageStore.addPartitionMessages(partitionId, messages);
    } else {
      addEntry(partitionId, messages);
    }
  }

  /**
   * Gets the path that should be used specifically for message data.
   *
   * @param basePath path prefix to build the actual path from
   * @param superstep superstep for which message data should be stored
   * @return path to files specific for message data
   */
  private static String getPath(String basePath, long superstep) {
    return basePath + "_messages-S" + superstep;
  }

  @Override
  public void loadPartitionData(int partitionId, String basePath)
      throws IOException {
    if (!useMessageCombiner) {
      super.loadPartitionData(partitionId, getPath(basePath, superstep));
    }
  }

  @Override
  public void offloadPartitionData(int partitionId, String basePath)
      throws IOException {
    if (!useMessageCombiner) {
      super.offloadPartitionData(partitionId, getPath(basePath, superstep));
    }
  }

  @Override
  public void offloadBuffers(int partitionId, String basePath)
      throws IOException {
    if (!useMessageCombiner) {
      super.offloadBuffers(partitionId, getPath(basePath, superstep));
    }
  }

  @Override
  public void finalizeStore() {
    messageStore.finalizeStore();
  }

  @Override
  public Iterable<I> getPartitionDestinationVertices(int partitionId) {
    return messageStore.getPartitionDestinationVertices(partitionId);
  }

  @Override
  public void clearPartition(int partitionId) throws IOException {
    messageStore.clearPartition(partitionId);
  }

  @Override
  public void writePartition(DataOutput out, int partitionId)
      throws IOException {
    messageStore.writePartition(out, partitionId);
  }

  @Override
  public void readFieldsForPartition(DataInput in, int partitionId)
      throws IOException {
    messageStore.readFieldsForPartition(in, partitionId);
  }

  @Override
  protected void writeEntry(VertexIdMessages<I, M> messages, DataOutput out)
      throws IOException {
    // Tag the concrete container type so readNextEntry knows how to
    // deserialize.
    SerializedMessageClass messageClass;
    if (messages instanceof ByteArrayVertexIdMessages) {
      messageClass = SerializedMessageClass.BYTE_ARRAY_VERTEX_ID_MESSAGES;
    } else if (messages instanceof ByteArrayOneMessageToManyIds) {
      messageClass = SerializedMessageClass.BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS;
    } else {
      throw new IllegalStateException("writeEntry: serialized message " +
          "type is not supported");
    }
    out.writeInt(messageClass.ordinal());
    messages.write(out);
  }

  @Override
  protected VertexIdMessages<I, M> readNextEntry(DataInput in)
      throws IOException {
    int messageType = in.readInt();
    SerializedMessageClass messageClass =
        SerializedMessageClass.values()[messageType];
    VertexIdMessages<I, M> vim;
    switch (messageClass) {
    case BYTE_ARRAY_VERTEX_ID_MESSAGES:
      vim = new ByteArrayVertexIdMessages<>(
          config.<M>createOutgoingMessageValueFactory());
      vim.setConf(config);
      break;
    case BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS:
      vim = new ByteArrayOneMessageToManyIds<>(
          config.<M>createOutgoingMessageValueFactory());
      vim.setConf(config);
      break;
    default:
      throw new IllegalStateException("readNextEntry: unsupported " +
          "serialized message type!");
    }
    vim.readFields(in);
    return vim;
  }

  @Override
  protected void loadInMemoryPartitionData(int partitionId, String basePath)
      throws IOException {
    File file = new File(basePath);
    if (file.exists()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("loadInMemoryPartitionData: loading message data for " +
            "partition " + partitionId + " from " + file.getAbsolutePath());
      }
      // try-with-resources closes the stream even if the read throws,
      // avoiding a file-handle leak (close() was previously skipped on error).
      try (DataInputStream dis = new DataInputStream(
          new BufferedInputStream(new FileInputStream(file)))) {
        messageStore.readFieldsForPartition(dis, partitionId);
      }
      checkState(file.delete(), "loadInMemoryPartitionData: failed to delete " +
          "%s.", file.getAbsoluteFile());
    }
  }

  @Override
  protected void offloadInMemoryPartitionData(int partitionId, String basePath)
      throws IOException {
    if (messageStore.hasMessagesForPartition(partitionId)) {
      File file = new File(basePath);
      checkState(!file.exists(), "offloadInMemoryPartitionData: message store" +
          " file %s already exist", file.getAbsoluteFile());
      checkState(file.createNewFile(),
          "offloadInMemoryPartitionData: cannot create message store file %s",
          file.getAbsoluteFile());
      // try-with-resources guarantees close/flush even if the write throws;
      // clearPartition stays inside the block to preserve original ordering
      // (write, clear, close).
      try (DataOutputStream outputStream = new DataOutputStream(
          new BufferedOutputStream(new FileOutputStream(file)))) {
        messageStore.writePartition(outputStream, partitionId);
        messageStore.clearPartition(partitionId);
      }
    }
  }

  @Override
  protected int entrySerializedSize(VertexIdMessages<I, M> messages) {
    return messages.getSerializedSize();
  }

  @Override
  protected void addEntryToImMemoryPartitionData(int partitionId,
                                                 VertexIdMessages<I, M>
                                                     messages) {
    try {
      messageStore.addPartitionMessages(partitionId, messages);
    } catch (IOException e) {
      // Chain the cause so the original failure is not lost.
      throw new IllegalStateException("Caught IOException while adding a new " +
          "message to in-memory message store", e);
    }
  }
}
a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java new file mode 100644 index 0000000..5854c8d --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.ooc.data; + +import com.google.common.collect.Maps; +import org.apache.giraph.bsp.BspService; +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.OutEdges; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.ooc.OutOfCoreEngine; +import org.apache.giraph.partition.Partition; +import org.apache.giraph.partition.PartitionStore; +import org.apache.giraph.utils.ExtendedDataOutput; +import org.apache.giraph.utils.WritableUtils; +import org.apache.giraph.worker.BspServiceWorker; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.log4j.Logger; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Map; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +/** + * Implementation of a partition-store used for out-of-core mechanism. + * Partition store is responsible for partition data, as well as data buffers in + * INPUT_SUPERSTEP ("raw data buffer" -- defined in OutOfCoreDataManager -- + * refers to vertex buffers in INPUT_SUPERSTEP). + * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + */ +public class DiskBackedPartitionStore<I extends WritableComparable, + V extends Writable, E extends Writable> + extends OutOfCoreDataManager<ExtendedDataOutput> + implements PartitionStore<I, V, E> { + /** Class logger. 
*/ + private static final Logger LOG = + Logger.getLogger(DiskBackedPartitionStore.class); + /** Configuration */ + private final ImmutableClassesGiraphConfiguration<I, V, E> conf; + /** Job context (for progress) */ + private final Mapper<?, ?, ?, ?>.Context context; + /** Service worker */ + private final CentralizedServiceWorker<I, V, E> serviceWorker; + /** Out-of-core engine */ + private final OutOfCoreEngine oocEngine; + /** In-memory partition store */ + private final PartitionStore<I, V, E> partitionStore; + /** + * Keeps number of vertices in partitions, right when they are last spilled + * to the disk. This value may be inaccurate for in-memory partitions, but + * is accurate for out-of-core partitions. + */ + private final Map<Integer, Long> partitionVertexCount = + Maps.newConcurrentMap(); + /** + * Keeps number of edges in partitions, right when they are last spilled + * to the disk. This value may be inaccurate for in-memory partitions, but + * is accurate for out-of-core partitions. + */ + private final Map<Integer, Long> partitionEdgeCount = + Maps.newConcurrentMap(); + + /** + * Constructor. 
+ * + * @param partitionStore In-memory partition store for which out-of-code + * partition store would be a wrapper + * @param conf Configuration + * @param context Job context + * @param serviceWorker Service worker + * @param oocEngine Out-of-core engine + */ + public DiskBackedPartitionStore( + PartitionStore<I, V, E> partitionStore, + ImmutableClassesGiraphConfiguration<I, V, E> conf, + Mapper<?, ?, ?, ?>.Context context, + CentralizedServiceWorker<I, V, E> serviceWorker, + OutOfCoreEngine oocEngine) { + super(conf); + this.partitionStore = partitionStore; + this.conf = conf; + this.context = context; + this.serviceWorker = serviceWorker; + this.oocEngine = oocEngine; + } + + @Override + public boolean addPartition(Partition<I, V, E> partition) { + boolean added = partitionStore.addPartition(partition); + if (added) { + oocEngine.getMetaPartitionManager() + .addPartition(partition.getId()); + } + return added; + } + + @Override + public Partition<I, V, E> removePartition(Integer partitionId) { + // Set the partition as 'in process' so its data and messages do not get + // spilled to disk until the remove is complete. 
+ oocEngine.getMetaPartitionManager().makePartitionInaccessible(partitionId); + oocEngine.retrievePartition(partitionId); + Partition<I, V, E> partition = partitionStore.removePartition(partitionId); + checkNotNull(partition, "removePartition: partition " + partitionId + + " is not in memory for removal!"); + oocEngine.getMetaPartitionManager().removePartition(partitionId); + return partition; + } + + @Override + public boolean hasPartition(Integer partitionId) { + return oocEngine.getMetaPartitionManager().hasPartition(partitionId); + } + + @Override + public Iterable<Integer> getPartitionIds() { + return oocEngine.getMetaPartitionManager().getPartitionIds(); + } + + @Override + public int getNumPartitions() { + return oocEngine.getMetaPartitionManager().getNumPartitions(); + } + + @Override + public long getPartitionVertexCount(Integer partitionId) { + if (partitionStore.hasPartition(partitionId)) { + return partitionStore.getPartitionVertexCount(partitionId); + } else { + return partitionVertexCount.get(partitionId); + } + } + + @Override + public long getPartitionEdgeCount(Integer partitionId) { + if (partitionStore.hasPartition(partitionId)) { + return partitionStore.getPartitionEdgeCount(partitionId); + } else { + return partitionEdgeCount.get(partitionId); + } + } + + @Override + public boolean isEmpty() { + return getNumPartitions() == 0; + } + + @Override + public void startIteration() { + oocEngine.startIteration(); + } + + @Override + public Partition<I, V, E> getNextPartition() { + Integer partitionId = oocEngine.getNextPartition(); + if (partitionId == null) { + return null; + } + Partition<I, V, E> partition = partitionStore.removePartition(partitionId); + if (partition == null) { + if (LOG.isInfoEnabled()) { + LOG.info("getNextPartition: partition " + partitionId + " is not in " + + "the partition store. 
Creating an empty partition for it."); + } + partition = conf.createPartition(partitionId, context); + } + partitionStore.addPartition(partition); + return partition; + } + + @Override + public void putPartition(Partition<I, V, E> partition) { + oocEngine.doneProcessingPartition(partition.getId()); + } + + @Override + public void addPartitionVertices(Integer partitionId, + ExtendedDataOutput extendedDataOutput) { + addEntry(partitionId, extendedDataOutput); + } + + @Override + public void shutdown() { + oocEngine.shutdown(); + } + + @Override + public void initialize() { + oocEngine.initialize(); + } + + /** + * Gets the path that should be used specifically for partition data. + * + * @param basePath path prefix to build the actual path from + * @return path to files specific for partition data + */ + private static String getPath(String basePath) { + return basePath + "_partition"; + } + + /** + * Get the path to the file where vertices are stored. + * + * @param basePath path prefix to build the actual path from + * @return The path to the vertices file + */ + private static String getVerticesPath(String basePath) { + return basePath + "_vertices"; + } + + /** + * Get the path to the file where edges are stored. + * + * @param basePath path prefix to build the actual path from + * @return The path to the edges file + */ + private static String getEdgesPath(String basePath) { + return basePath + "_edges"; + } + + /** + * Read vertex data from an input and initialize the vertex. 
+ * + * @param in The input stream + * @param vertex The vertex to initialize + * @throws IOException + */ + private void readVertexData(DataInput in, Vertex<I, V, E> vertex) + throws IOException { + I id = conf.createVertexId(); + id.readFields(in); + V value = conf.createVertexValue(); + value.readFields(in); + OutEdges<I, E> edges = conf.createAndInitializeOutEdges(0); + vertex.initialize(id, value, edges); + if (in.readBoolean()) { + vertex.voteToHalt(); + } else { + vertex.wakeUp(); + } + } + + /** + * Read vertex edges from an input and set them to the vertex. + * + * @param in The input stream + * @param partition The partition owning the vertex + * @throws IOException + */ + private void readOutEdges(DataInput in, Partition<I, V, E> partition) + throws IOException { + I id = conf.createVertexId(); + id.readFields(in); + Vertex<I, V, E> v = partition.getVertex(id); + OutEdges<I, E> edges = (OutEdges<I, E>) v.getEdges(); + edges.readFields(in); + partition.saveVertex(v); + } + + @Override + protected void loadInMemoryPartitionData(int partitionId, String path) + throws IOException { + // Load vertices + File file = new File(getVerticesPath(path)); + if (file.exists()) { + Partition<I, V, E> partition = conf.createPartition(partitionId, context); + if (LOG.isDebugEnabled()) { + LOG.debug("loadInMemoryPartitionData: loading partition vertices " + + partitionId + " from " + file.getAbsolutePath()); + } + + FileInputStream fis = new FileInputStream(file); + BufferedInputStream bis = new BufferedInputStream(fis); + DataInputStream inputStream = new DataInputStream(bis); + long numVertices = inputStream.readLong(); + for (long i = 0; i < numVertices; ++i) { + Vertex<I, V, E> vertex = conf.createVertex(); + readVertexData(inputStream, vertex); + partition.putVertex(vertex); + } + inputStream.close(); + checkState(file.delete(), "loadInMemoryPartitionData: failed to delete " + + "%s", file.getAbsolutePath()); + + // Load edges + file = new File(getEdgesPath(path)); + 
if (LOG.isDebugEnabled()) { + LOG.debug("loadInMemoryPartitionData: loading partition edges " + + partitionId + " from " + file.getAbsolutePath()); + } + + fis = new FileInputStream(file); + bis = new BufferedInputStream(fis); + inputStream = new DataInputStream(bis); + for (int i = 0; i < numVertices; ++i) { + readOutEdges(inputStream, partition); + } + inputStream.close(); + // If the graph is static and it is not INPUT_SUPERSTEP, keep the file + // around. + if (!conf.isStaticGraph() || + serviceWorker.getSuperstep() == BspService.INPUT_SUPERSTEP) { + checkState(file.delete(), "loadPartition: failed to delete %s", + file.getAbsolutePath()); + } + partitionStore.addPartition(partition); + } + } + + @Override + protected ExtendedDataOutput readNextEntry(DataInput in) throws IOException { + return WritableUtils.readExtendedDataOutput(in, conf); + } + + @Override + protected void addEntryToImMemoryPartitionData(int partitionId, + ExtendedDataOutput vertices) { + if (!partitionStore.hasPartition(partitionId)) { + oocEngine.getMetaPartitionManager().addPartition(partitionId); + } + partitionStore.addPartitionVertices(partitionId, vertices); + } + + @Override + public void loadPartitionData(int partitionId, String basePath) + throws IOException { + super.loadPartitionData(partitionId, getPath(basePath)); + } + + @Override + public void offloadPartitionData(int partitionId, String basePath) + throws IOException { + super.offloadPartitionData(partitionId, getPath(basePath)); + } + + /** + * Writes vertex data (Id, value and halted state) to stream. + * + * @param output The output stream + * @param vertex The vertex to serialize + * @throws IOException + */ + private void writeVertexData(DataOutput output, Vertex<I, V, E> vertex) + throws IOException { + vertex.getId().write(output); + vertex.getValue().write(output); + output.writeBoolean(vertex.isHalted()); + } + + /** + * Writes vertex edges (Id, edges) to stream. 
 *
 * @param output The output stream
 * @param vertex The vertex to serialize
 * @throws IOException on write failure
 */
private void writeOutEdges(DataOutput output, Vertex<I, V, E> vertex)
    throws IOException {
  vertex.getId().write(output);
  OutEdges<I, E> edges = (OutEdges<I, E>) vertex.getEdges();
  edges.write(output);
}

/**
 * Offloads the in-memory data of a partition to disk: caches its vertex and
 * edge counts, removes it from the in-memory store, writes the vertices
 * file, and writes the edges file unless it can be proven reusable (static
 * graph, unchanged vertex count, file already present).  Mirrors
 * loadInMemoryPartitionData.
 *
 * @param partitionId id of the partition to offload
 * @param path base path to write the partition files under
 * @throws IOException on any file write failure
 */
@Override
protected void offloadInMemoryPartitionData(int partitionId, String path)
    throws IOException {
  if (partitionStore.hasPartition(partitionId)) {
    // Cache counts so getPartitionVertexCount/getPartitionEdgeCount keep
    // working while the partition is on disk.
    partitionVertexCount.put(partitionId,
        partitionStore.getPartitionVertexCount(partitionId));
    partitionEdgeCount.put(partitionId,
        partitionStore.getPartitionEdgeCount(partitionId));
    Partition<I, V, E> partition =
        partitionStore.removePartition(partitionId);
    File file = new File(getVerticesPath(path));
    if (LOG.isDebugEnabled()) {
      LOG.debug("offloadInMemoryPartitionData: writing partition vertices " +
          partitionId + " to " + file.getAbsolutePath());
    }
    checkState(!file.exists(), "offloadInMemoryPartitionData: partition " +
        "store file %s already exist", file.getAbsoluteFile());
    checkState(file.createNewFile(),
        "offloadInMemoryPartitionData: file %s already exists.",
        file.getAbsolutePath());

    // NOTE(review): streams are not closed if a write throws; consider
    // try-with-resources if the code base's Java level allows it.
    FileOutputStream fileout = new FileOutputStream(file);
    BufferedOutputStream bufferout = new BufferedOutputStream(fileout);
    DataOutputStream outputStream = new DataOutputStream(bufferout);
    // Vertex count header first, then one (id, value, halted) record per
    // vertex -- the format loadInMemoryPartitionData expects.
    outputStream.writeLong(partition.getVertexCount());
    for (Vertex<I, V, E> vertex : partition) {
      writeVertexData(outputStream, vertex);
    }
    outputStream.close();

    // Avoid writing back edges if we have already written them once and
    // the graph is not changing.
    // If we are in the input superstep, we need to write the files
    // at least the first time, even though the graph is static.
    file = new File(getEdgesPath(path));
    // NOTE(review): the vertex-count comparison below reads the value this
    // method stored into partitionVertexCount a few lines above, so it
    // compares the current count with itself rather than with the count at
    // the previous offload -- looks ineffective; confirm intent.
    if (serviceWorker.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP ||
        partitionVertexCount.get(partitionId) == null ||
        partitionVertexCount.get(partitionId) != partition.getVertexCount() ||
        !conf.isStaticGraph() || !file.exists()) {
      checkState(file.createNewFile(), "offloadInMemoryPartitionData: file " +
          "%s already exists.", file.getAbsolutePath());
      if (LOG.isDebugEnabled()) {
        LOG.debug("offloadInMemoryPartitionData: writing partition edges " +
            partitionId + " to " + file.getAbsolutePath());
      }
      fileout = new FileOutputStream(file);
      bufferout = new BufferedOutputStream(fileout);
      outputStream = new DataOutputStream(bufferout);
      for (Vertex<I, V, E> vertex : partition) {
        writeOutEdges(outputStream, vertex);
      }
      outputStream.close();
    }
  }
}

/**
 * Serializes one raw vertex-buffer entry to the given output.  Mirrors
 * readNextEntry.
 *
 * @param vertices the entry to serialize
 * @param out The output stream
 * @throws IOException on write failure
 */
@Override
protected void writeEntry(ExtendedDataOutput vertices, DataOutput out)
    throws IOException {
  WritableUtils.writeExtendedDataOutput(vertices, out);
}

/**
 * Offloads buffered entries of a partition, rooted at the
 * partition-specific path (see getPath).
 *
 * @param partitionId id of the partition whose buffers to offload
 * @param basePath path prefix to build the actual path from
 * @throws IOException on write failure
 */
@Override
public void offloadBuffers(int partitionId, String basePath)
    throws IOException {
  super.offloadBuffers(partitionId, getPath(basePath));
}

/**
 * Returns the serialized size of one buffered entry -- the number of bytes
 * currently written into the ExtendedDataOutput.
 *
 * @param vertices the entry to measure
 * @return size in bytes
 */
@Override
protected int entrySerializedSize(ExtendedDataOutput vertices) {
  return vertices.getPos();
}
}
