http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java deleted file mode 100644 index ce24fe2..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.ooc.io; - -import com.google.common.base.Preconditions; -import org.apache.giraph.bsp.BspService; -import org.apache.giraph.comm.messages.MessageStore; -import org.apache.giraph.ooc.OutOfCoreEngine; -import org.apache.giraph.ooc.data.DiskBackedEdgeStore; -import org.apache.giraph.ooc.data.DiskBackedMessageStore; -import org.apache.giraph.ooc.data.DiskBackedPartitionStore; - -import java.io.IOException; - -/** - * IOCommand to load partition data, edge data (if in INPUT_SUPERSTEP), and - * message data (if in compute supersteps). 
Also, this command can be used to - * prefetch a partition to be processed in the next superstep. - */ -public class LoadPartitionIOCommand extends IOCommand { - /** - * Which superstep this partition should be loaded for? (can be current - * superstep or next superstep -- in case of prefetching). - */ - private final long superstep; - - /** - * Constructor - * - * @param oocEngine out-of-core engine - * @param partitionId id of the partition to be loaded - * @param superstep superstep to load the partition for - */ - public LoadPartitionIOCommand(OutOfCoreEngine oocEngine, int partitionId, - long superstep) { - super(oocEngine, partitionId); - this.superstep = superstep; - } - - @Override - public boolean execute(String basePath) throws IOException { - boolean executed = false; - if (oocEngine.getMetaPartitionManager() - .startLoadingPartition(partitionId, superstep)) { - long currentSuperstep = oocEngine.getSuperstep(); - DiskBackedPartitionStore partitionStore = - (DiskBackedPartitionStore) - oocEngine.getServerData().getPartitionStore(); - numBytesTransferred += - partitionStore.loadPartitionData(partitionId, basePath); - if (currentSuperstep == BspService.INPUT_SUPERSTEP && - superstep == currentSuperstep) { - DiskBackedEdgeStore edgeStore = - (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore(); - numBytesTransferred += - edgeStore.loadPartitionData(partitionId, basePath); - } - MessageStore messageStore; - if (currentSuperstep == superstep) { - messageStore = oocEngine.getServerData().getCurrentMessageStore(); - } else { - Preconditions.checkState(superstep == currentSuperstep + 1); - messageStore = oocEngine.getServerData().getIncomingMessageStore(); - } - if (messageStore != null) { - numBytesTransferred += ((DiskBackedMessageStore) messageStore) - .loadPartitionData(partitionId, basePath); - } - oocEngine.getMetaPartitionManager() - .doneLoadingPartition(partitionId, superstep); - executed = true; - } - return executed; - } - - @Override - 
public IOCommandType getType() { - return IOCommandType.LOAD_PARTITION; - } - - @Override - public String toString() { - return "LoadPartitionIOCommand: (partitionId = " + partitionId + ", " + - "superstep = " + superstep + ")"; - } -}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java deleted file mode 100644 index f1769dd..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.ooc.io; - -import org.apache.giraph.ooc.OutOfCoreEngine; -import org.apache.giraph.ooc.data.DiskBackedEdgeStore; -import org.apache.giraph.ooc.data.DiskBackedMessageStore; -import org.apache.giraph.ooc.data.DiskBackedPartitionStore; - -import java.io.IOException; - -/** - * IOCommand to store raw data buffers on disk. - */ -public class StoreDataBufferIOCommand extends IOCommand { - /** - * Types of raw data buffer to offload to disk (either vertices/edges buffer - * in INPUT_SUPERSTEP or incoming message buffer). 
- */ - public enum DataBufferType { PARTITION, MESSAGE }; - /** - * Type of the buffer to store on disk. - */ - private final DataBufferType type; - - /** - * Constructor - * - * @param oocEngine out-of-core engine - * @param partitionId id of the partition to offload its buffers - * @param type type of the buffer to store on disk - */ - public StoreDataBufferIOCommand(OutOfCoreEngine oocEngine, - int partitionId, - DataBufferType type) { - super(oocEngine, partitionId); - this.type = type; - } - - @Override - public boolean execute(String basePath) throws IOException { - boolean executed = false; - if (oocEngine.getMetaPartitionManager() - .startOffloadingBuffer(partitionId)) { - switch (type) { - case PARTITION: - DiskBackedPartitionStore partitionStore = - (DiskBackedPartitionStore) - oocEngine.getServerData().getPartitionStore(); - numBytesTransferred += - partitionStore.offloadBuffers(partitionId, basePath); - DiskBackedEdgeStore edgeStore = - (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore(); - numBytesTransferred += edgeStore.offloadBuffers(partitionId, basePath); - break; - case MESSAGE: - DiskBackedMessageStore messageStore = - (DiskBackedMessageStore) - oocEngine.getServerData().getIncomingMessageStore(); - numBytesTransferred += - messageStore.offloadBuffers(partitionId, basePath); - break; - default: - throw new IllegalStateException("execute: requested data buffer type " + - "does not exist!"); - } - oocEngine.getMetaPartitionManager().doneOffloadingBuffer(partitionId); - executed = true; - } - return executed; - } - - @Override - public IOCommandType getType() { - return IOCommandType.STORE_BUFFER; - } - - @Override - public String toString() { - return "StoreDataBufferIOCommand: (partitionId = " + partitionId + ", " + - "type = " + type.name() + ")"; - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java 
---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java deleted file mode 100644 index c9d8829..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.ooc.io; - -import org.apache.giraph.ooc.OutOfCoreEngine; -import org.apache.giraph.ooc.data.DiskBackedMessageStore; - -import java.io.IOException; - -import static com.google.common.base.Preconditions.checkState; - -/** - * IOCommand to store incoming message of a particular partition. 
- */ -public class StoreIncomingMessageIOCommand extends IOCommand { - /** - * Constructor - * - * @param oocEngine out-of-core engine - * @param partitionId id of the partition to store its incoming messages - */ - public StoreIncomingMessageIOCommand(OutOfCoreEngine oocEngine, - int partitionId) { - super(oocEngine, partitionId); - } - - @Override - public boolean execute(String basePath) throws IOException { - boolean executed = false; - if (oocEngine.getMetaPartitionManager() - .startOffloadingMessages(partitionId)) { - DiskBackedMessageStore messageStore = - (DiskBackedMessageStore) - oocEngine.getServerData().getIncomingMessageStore(); - checkState(messageStore != null); - numBytesTransferred += - messageStore.offloadPartitionData(partitionId, basePath); - oocEngine.getMetaPartitionManager().doneOffloadingMessages(partitionId); - executed = true; - } - return executed; - } - - @Override - public IOCommandType getType() { - return IOCommandType.STORE_MESSAGE; - } - - @Override - public String toString() { - return "StoreIncomingMessageIOCommand: (partitionId = " + partitionId + ")"; - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java deleted file mode 100644 index 797ac9d..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.ooc.io; - -import org.apache.giraph.bsp.BspService; -import org.apache.giraph.comm.messages.MessageStore; -import org.apache.giraph.ooc.data.DiskBackedEdgeStore; -import org.apache.giraph.ooc.data.DiskBackedMessageStore; -import org.apache.giraph.ooc.data.DiskBackedPartitionStore; -import org.apache.giraph.ooc.OutOfCoreEngine; - -import java.io.IOException; - -/** - * IOCommand to store partition data, edge data (if in INPUT_SUPERSTEP), and - * message data (if in compute supersteps). 
- */ -public class StorePartitionIOCommand extends IOCommand { - /** - * Constructor - * - * @param oocEngine out-of-core engine - * @param partitionId id of the partition to store its data - */ - public StorePartitionIOCommand(OutOfCoreEngine oocEngine, - int partitionId) { - super(oocEngine, partitionId); - } - - @Override - public boolean execute(String basePath) throws IOException { - boolean executed = false; - if (oocEngine.getMetaPartitionManager() - .startOffloadingPartition(partitionId)) { - DiskBackedPartitionStore partitionStore = - (DiskBackedPartitionStore) - oocEngine.getServerData().getPartitionStore(); - numBytesTransferred += - partitionStore.offloadPartitionData(partitionId, basePath); - if (oocEngine.getSuperstep() != BspService.INPUT_SUPERSTEP) { - MessageStore messageStore = - oocEngine.getServerData().getCurrentMessageStore(); - if (messageStore != null) { - numBytesTransferred += ((DiskBackedMessageStore) messageStore) - .offloadPartitionData(partitionId, basePath); - } - } else { - DiskBackedEdgeStore edgeStore = - (DiskBackedEdgeStore) - oocEngine.getServerData().getEdgeStore(); - numBytesTransferred += - edgeStore.offloadPartitionData(partitionId, basePath); - } - oocEngine.getMetaPartitionManager().doneOffloadingPartition(partitionId); - executed = true; - } - return executed; - } - - @Override - public IOCommandType getType() { - return IOCommandType.STORE_PARTITION; - } - - @Override - public String toString() { - return "StorePartitionIOCommand: (partitionId = " + partitionId + ")"; - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java deleted file mode 100644 index 74e72eb..0000000 --- 
a/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.ooc.io; - -import org.apache.giraph.ooc.OutOfCoreEngine; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -/** - * IOCommand to do nothing regarding moving data to/from disk. - */ -public class WaitIOCommand extends IOCommand { - /** How long should the disk be idle? 
(in milliseconds) */ - private final long waitDuration; - - /** - * Constructor - * - * @param oocEngine out-of-core engine - * @param waitDuration duration of wait - */ - public WaitIOCommand(OutOfCoreEngine oocEngine, long waitDuration) { - super(oocEngine, -1); - this.waitDuration = waitDuration; - } - - @Override - public boolean execute(String basePath) throws IOException { - try { - TimeUnit.MILLISECONDS.sleep(waitDuration); - } catch (InterruptedException e) { - throw new IllegalStateException("execute: caught InterruptedException " + - "while IO thread is waiting!"); - } - return true; - } - - @Override - public IOCommandType getType() { - return IOCommandType.WAIT; - } - - @Override - public String toString() { - return "WaitIOCommand: (duration = " + waitDuration + "ms)"; - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/io/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/package-info.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/package-info.java deleted file mode 100644 index 2230ec4..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Package of classes related to IO operations in out-of-core mechanism - */ -package org.apache.giraph.ooc.io; http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/DataIndex.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/DataIndex.java b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/DataIndex.java new file mode 100644 index 0000000..d44204b --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/DataIndex.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.ooc.persistence; + +import java.util.ArrayList; +import java.util.List; + +/** + * Index chain used in out-of-core data accessor object (DAO) to access + * serialized data. + */ +public class DataIndex { + /** Chain of data indices */ + private final List<DataIndexEntry> indexList = new ArrayList<>(5); + + /** + * Add an index to the index chain + * + * @param entry the entry to add to the chain + * @return the index chain itself + */ + public DataIndex addIndex(DataIndexEntry entry) { + indexList.add(entry); + return this; + } + + /** + * Remove/Pop the last index in the index chain + * + * @return the index chain itself + */ + public DataIndex removeLastIndex() { + indexList.remove(indexList.size() - 1); + return this; + } + + /** + * Create a copy of the existing DataIndex + * + * @return a copy of the existing index chain + */ + public DataIndex copy() { + DataIndex index = new DataIndex(); + for (DataIndexEntry entry : indexList) { + index.indexList.add(entry); + } + return index; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof DataIndex)) { + return false; + } + DataIndex dataIndex = (DataIndex) obj; + return indexList.equals(dataIndex.indexList); + } + + @Override + public int hashCode() { + return indexList.hashCode(); + } + + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + for (DataIndexEntry entry : indexList) { + sb.append(entry); + } + return sb.toString(); + } + + /** Interface to unify different types of entries used as index chain */ + public interface DataIndexEntry { } + + /** + * Different static types of index chain entry + */ + public enum TypeIndexEntry implements DataIndexEntry { + /** The whole partition */ + PARTITION("_partition"), + /** Partition vertices */ + PARTITION_VERTICES("_vertices"), + /** Partition edges */ + PARTITION_EDGES("_edges"), + /** Partition messages */ + MESSAGE("_messages"), + /** Edges stored in edge store for a 
partition */ + EDGE_STORE("_edge_store"), + /** Raw data buffer (refer to DiskBackedDataStore) */ + BUFFER("_buffer"); + + /** String realization of entry type */ + private final String name; + + /** + * Constructor + * + * @param name name of the type + */ + TypeIndexEntry(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + @Override + public String toString() { + return name; + } + } + + /** + * Class representing any index chain that depends on something with id. + * Generally this is used for identifying indices in two types: + * - Index entry based on superstep id ('S' and the superstep number) + * - Index entry based on partition id ('P' and the partition id) + */ + public static final class NumericIndexEntry implements DataIndexEntry { + /** Type of index */ + private final char type; + /** Id of the index associated with the specified type */ + private final long id; + + /** + * Constructor + * + * @param type type of index (for now 'S' for superstep, or 'P' for + * partition) + * @param id id of the index associated with the given type + */ + private NumericIndexEntry(char type, long id) { + this.type = type; + this.id = id; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof NumericIndexEntry)) { + return false; + } + NumericIndexEntry index = (NumericIndexEntry) obj; + return index.type == type && index.id == id; + } + + @Override + public int hashCode() { + int result = 17; + result = result * 37 + type; + result = result * 37 + (int) id; + result = result * 37 + (int) (id >> 32); + return result; + } + + @Override + public String toString() { + return String.format("_%c%d", type, id); + } + + /** + * Create a data index entry for a given partition + * + * @param partitionId id of the partition + * @return data index entry for a given partition + */ + public static NumericIndexEntry createPartitionEntry(int partitionId) { + return new NumericIndexEntry('P', partitionId); + } + + /** + * 
Create a data index entry for a given superstep + * + * @param superstep the superstep number + * @return data index entry for a given superstep + */ + public static NumericIndexEntry createSuperstepEntry(long superstep) { + return new NumericIndexEntry('S', superstep); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java new file mode 100644 index 0000000..2e42906 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.ooc.persistence; + +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.KryoDataInput; +import com.esotericsoftware.kryo.io.KryoDataOutput; +import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.kryo.io.UnsafeInput; +import com.esotericsoftware.kryo.io.UnsafeOutput; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.IntConfOption; +import org.apache.log4j.Logger; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.RandomAccessFile; + +import static com.google.common.base.Preconditions.checkState; +import static org.apache.giraph.conf.GiraphConstants.ONE_MB; + +/** + * Data accessor object to read/write data in local disk. + * Note: This class assumes that the data are partitioned across IO threads, + * i.e. each part of data can be accessed by one and only one IO thread + * throughout the execution. Also, each IO thread reads a particular + * type of data completely and, only then, it can read other type of data; + * i.e. an IO thread cannot be used to read two different files at the + * same time. These assumptions are based on the assumptions that the + * current out-of-core mechanism is designed for. 
+ */ +public class LocalDiskDataAccessor implements OutOfCoreDataAccessor { + /** + * Size of the buffer used for (de)serializing data when reading/writing + * from/to disk + */ + public static final IntConfOption OOC_DISK_BUFFER_SIZE = + new IntConfOption("graph.oocDiskBufferSize", 4 * ONE_MB, + "size of the buffer when (de)serializing data for reading/writing " + + "from/to disk"); + + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(LocalDiskDataAccessor.class); + /** + * In-memory buffer used for (de)serializing data when reading/writing + * from/to disk using Kryo + */ + private final byte[][] perThreadBuffers; + /** Path prefix for different disks */ + private final String[] basePaths; + /** How many disks (i.e. IO threads) do we have? */ + private final int numDisks; + + /** + * Constructor + * + * @param conf Configuration + */ + public LocalDiskDataAccessor( + ImmutableClassesGiraphConfiguration<?, ?, ?> conf) { + // Take advantage of multiple disks + String[] userPaths = GiraphConstants.PARTITIONS_DIRECTORY.getArray(conf); + this.numDisks = userPaths.length; + if (!GiraphConstants.NUM_OUT_OF_CORE_THREADS.isDefaultValue(conf) || + GiraphConstants.NUM_OUT_OF_CORE_THREADS.get(conf) != numDisks) { + LOG.warn("LocalDiskDataAccessor: with this data accessor, number of " + + "out-of-core threads is only specified by the number of " + + "directories given by 'giraph.partitionsDirectory' flag! 
Now using " + + numDisks + " IO threads!"); + } + this.basePaths = new String[numDisks]; + int ptr = 0; + String jobId = conf.getJobId(); + for (String path : userPaths) { + File file = new File(path); + if (!file.exists()) { + checkState(file.mkdirs(), "LocalDiskDataAccessor: cannot create " + + "directory " + file.getAbsolutePath()); + } + basePaths[ptr] = path + "/" + jobId; + ptr++; + } + final int diskBufferSize = OOC_DISK_BUFFER_SIZE.get(conf); + this.perThreadBuffers = new byte[numDisks][diskBufferSize]; + } + + @Override + public void initialize() { } + + @Override + public void shutdown() { + for (String path : basePaths) { + File file = new File(path).getParentFile(); + for (String subFileName : file.list()) { + File subFile = new File(file.getPath(), subFileName); + checkState(subFile.delete(), "shutdown: cannot delete file %s", + subFile.getAbsoluteFile()); + } + checkState(file.delete(), "shutdown: cannot delete directory %s", + file.getAbsoluteFile()); + } + } + + @Override + public int getNumAccessorThreads() { + return numDisks; + } + + @Override + public DataInputWrapper prepareInput(int threadId, DataIndex index) + throws IOException { + return new LocalDiskDataInputWrapper(basePaths[threadId] + index.toString(), + perThreadBuffers[threadId]); + } + + @Override + public DataOutputWrapper prepareOutput( + int threadId, DataIndex index, boolean shouldAppend) throws IOException { + return new LocalDiskDataOutputWrapper( + basePaths[threadId] + index.toString(), shouldAppend, + perThreadBuffers[threadId]); + } + + @Override + public boolean dataExist(int threadId, DataIndex index) { + return new File(basePaths[threadId] + index.toString()).exists(); + } + + /** Implementation of <code>DataInput</code> wrapper for local disk reader */ + private static class LocalDiskDataInputWrapper implements DataInputWrapper { + /** File used to read the data from */ + private final File file; + /** Kryo's handle to read the data */ + private final Input input; + + 
/** + * Constructor + * + * @param fileName file name + * @param buffer reusable byte buffer that will be used in Kryo's Input + * reader + * @throws IOException + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings( + "OBL_UNSATISFIED_OBLIGATION") + LocalDiskDataInputWrapper(String fileName, byte[] buffer) + throws IOException { + file = new File(fileName); + LOG.info("LocalDiskDataInputWrapper: obtaining a data input from local " + + "file " + file.getAbsolutePath()); + if (LOG.isDebugEnabled()) { + LOG.debug("LocalDiskDataInputWrapper: obtaining a data input from " + + "local file " + file.getAbsolutePath()); + } + input = new UnsafeInput(buffer); + input.setInputStream(new FileInputStream( + new RandomAccessFile(file, "r").getFD())); + } + + @Override + public DataInput getDataInput() { + return new KryoDataInput(input); + } + + @Override + public long finalizeInput(boolean deleteOnClose) { + input.close(); + long count = input.total(); + checkState(!deleteOnClose || file.delete(), + "finalizeInput: failed to delete %s.", file.getAbsoluteFile()); + return count; + } + } + + /** Implementation of <code>DataOutput</code> wrapper for local disk writer */ + private static class LocalDiskDataOutputWrapper implements DataOutputWrapper { + /** File used to write the data to */ + private final File file; + /** Kryo's handle to write the data */ + private final Output output; + + /** + * Constructor + * + * @param fileName file name + * @param shouldAppend whether the <code>DataOutput</code> should be used + * for appending to already existing files + * @param buffer reusable byte buffer that will be used in Kryo's Output + * writer + * @throws IOException + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings( + "OBL_UNSATISFIED_OBLIGATION") + LocalDiskDataOutputWrapper(String fileName, boolean shouldAppend, + byte[] buffer) throws IOException { + file = new File(fileName); + LOG.info("LocalDiskDataOutputWrapper: obtaining a data output from " + + "local file " + 
file.getAbsolutePath()); + if (LOG.isDebugEnabled()) { + LOG.debug("LocalDiskDataOutputWrapper: obtaining a data output from " + + "local file " + file.getAbsolutePath()); + if (!shouldAppend) { + checkState(!file.exists(), "LocalDiskDataOutputWrapper: file %s " + + "already exist", file.getAbsoluteFile()); + checkState(file.createNewFile(), "LocalDiskDataOutputWrapper: " + + "cannot create file %s", file.getAbsolutePath()); + } + } + output = new UnsafeOutput(buffer); + RandomAccessFile raf = new RandomAccessFile(file, "rw"); + if (shouldAppend) { + raf.seek(file.length()); + } + output.setOutputStream(new FileOutputStream(raf.getFD())); + } + + @Override + public DataOutput getDataOutput() { + return new KryoDataOutput(output); + } + + + @Override + public long finalizeOutput() { + output.close(); + long count = output.total(); + return count; + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java new file mode 100644 index 0000000..d4ddc62 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.ooc.persistence; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Interface representing data accessor object (DAO) used as persistence layer + * in out-of-core mechanism. + * Note: any class implementing this interface should have one and only one + * constructor taking one and only one argument of type + * <code>ImmutableClassesGiraphConfiguration</code> + */ +public interface OutOfCoreDataAccessor { + /** Initialize the DAO */ + void initialize(); + + /** Shut down the DAO */ + void shutdown(); + + /** + * @return the number of threads involved in data persistence + */ + int getNumAccessorThreads(); + + /** + * Prepare a wrapper containing <code>DataInput</code> representation for a + * given thread involved in persistence for a given index chain for data. + * + * @param threadId id of the thread involved in persistence + * @param index index chain of the data to access the serialized data form + * @return the wrapper for <code>DataInput</code> representation of data + * @throws IOException + */ + DataInputWrapper prepareInput(int threadId, DataIndex index) + throws IOException; + + /** + * Prepare a wrapper containing <code>DataOutput</code> representation for a + * given thread involved in persistence for a given index chain for data. 
+ * + * @param threadId id of the thread involved in persistence + * @param index index chain of the data to access the serialized data form + * @param shouldAppend whether the <code>DataOutput</code> should be used for + * appending to already existing data for the given index + * or the <code>DataOutput</code> should create new + * instance to store serialized data + * @return the wrapper for <code>DataOutput</code> representation of data + * @throws IOException + */ + DataOutputWrapper prepareOutput(int threadId, DataIndex index, + boolean shouldAppend) throws IOException; + + /** + * Whether the data for the given thread and index chain exists? + * + * @param threadId id of the thread involved in persistence + * @param index index chain used to access the data + * @return True if the data exists for the given index chain for the given + * thread, False otherwise + */ + boolean dataExist(int threadId, DataIndex index); + + /** Interface to wrap <code>DataInput</code> */ + interface DataInputWrapper { + /** + * @return the <code>DataInput</code> + */ + DataInput getDataInput(); + + /** + * Finalize and close the <code>DataInput</code> used for persistence. + * + * @param deleteOnClose whether the source of <code>DataInput</code> + * should be deleted on closing/finalizing + * @return number of bytes read from <code>DataInput</code> since it was + * opened + */ + long finalizeInput(boolean deleteOnClose); + } + + /** Interface to wrap <code>DataOutput</code> */ + interface DataOutputWrapper { + /** + * @return the <code>DataOutput</code> + */ + DataOutput getDataOutput(); + + /** + * Finalize and close the <code>DataOutput</code> used for persistence. 
+ * + * @return number of bytes written to <code>DataOutput</code> since it was + * opened + */ + long finalizeOutput(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/package-info.java b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/package-info.java new file mode 100644 index 0000000..adf8dba --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +/** + * Package of classes related to IO abstraction or persistence layer used for + * out-of-core mechanism + */ +package org.apache.giraph.ooc.persistence; http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java new file mode 100644 index 0000000..ffc5f7f --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.ooc.policy; + +import com.sun.management.GarbageCollectionNotificationInfo; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.ooc.OutOfCoreEngine; +import org.apache.giraph.ooc.command.IOCommand; +import org.apache.giraph.ooc.command.LoadPartitionIOCommand; +import org.apache.giraph.ooc.command.StorePartitionIOCommand; +import org.apache.log4j.Logger; + +import java.util.concurrent.atomic.AtomicInteger; + +/** Oracle for fixed out-of-core mechanism */ +public class FixedPartitionsOracle implements OutOfCoreOracle { + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(FixedPartitionsOracle.class); + /** Maximum number of partitions to be kept in memory */ + private final int maxPartitionsInMemory; + /** + * Number of partitions to be added (loaded) or removed (stored) to/from + * memory. Each outstanding load partition counts +1 and each outstanding + * store partition counts -1 toward this counter. 
+ */ + private final AtomicInteger deltaNumPartitionsInMemory = + new AtomicInteger(0); + /** Out-of-core engine */ + private final OutOfCoreEngine oocEngine; + + /** + * Constructor + * + * @param conf configuration + * @param oocEngine out-of-core engine + */ + public FixedPartitionsOracle(ImmutableClassesGiraphConfiguration conf, + OutOfCoreEngine oocEngine) { + this.maxPartitionsInMemory = + GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf); + this.oocEngine = oocEngine; + } + + @Override + public IOAction[] getNextIOActions() { + int numPartitionsInMemory = + oocEngine.getMetaPartitionManager().getNumInMemoryPartitions(); + if (LOG.isInfoEnabled()) { + LOG.info("getNextIOActions: calling with " + numPartitionsInMemory + + " partitions in memory, " + deltaNumPartitionsInMemory.get() + + " to be loaded"); + } + int numPartitions = + numPartitionsInMemory + deltaNumPartitionsInMemory.get(); + // Fixed out-of-core policy: + // - if the number of partitions in memory is less than the max number of + // partitions in memory, we should load a partition to memory. This + // basically means we are prefetching partition's data either for the + // current superstep, or for the next superstep. + // - if the number of partitions in memory is equal to the max number + // of partitions in memory, we do a 'soft store', meaning, we store + // processed partition to disk only if there is an unprocessed partition + // on disk. This basically makes room for unprocessed partitions on disk + // to be prefetched. + // - if the number of partitions in memory is more than the max number of + // partitions in memory, we do a 'hard store', meaning we store a + // partition to disk, regardless of its processing state. 
+ if (numPartitions < maxPartitionsInMemory) { + return new IOAction[]{ + IOAction.LOAD_PARTITION, + IOAction.STORE_MESSAGES_AND_BUFFERS}; + } else if (numPartitions > maxPartitionsInMemory) { + LOG.warn("getNextIOActions: number of partitions in memory passed the " + + "specified threshold!"); + return new IOAction[]{ + IOAction.STORE_PARTITION, + IOAction.STORE_MESSAGES_AND_BUFFERS}; + } else { + return new IOAction[]{ + IOAction.STORE_MESSAGES_AND_BUFFERS, + IOAction.LOAD_TO_SWAP_PARTITION}; + } + } + + @Override + public boolean approve(IOCommand command) { + int numPartitionsInMemory = oocEngine.getMetaPartitionManager() + .getNumInMemoryPartitions(); + // If loading a partition results in having more partitions in memory, the + // command should be denied. Also, if number of partitions in memory is + // already less than the max number of partitions, any command for storing + // a partition should be denied. + if (command instanceof LoadPartitionIOCommand && + numPartitionsInMemory + deltaNumPartitionsInMemory.getAndIncrement() > + maxPartitionsInMemory) { + deltaNumPartitionsInMemory.getAndDecrement(); + return false; + + } else if (command instanceof StorePartitionIOCommand && + numPartitionsInMemory + deltaNumPartitionsInMemory.getAndDecrement() < + maxPartitionsInMemory) { + deltaNumPartitionsInMemory.getAndIncrement(); + return false; + } + return true; + } + + @Override + public void commandCompleted(IOCommand command) { + if (command instanceof LoadPartitionIOCommand) { + deltaNumPartitionsInMemory.getAndDecrement(); + } else if (command instanceof StorePartitionIOCommand) { + deltaNumPartitionsInMemory.getAndIncrement(); + } + } + + @Override + public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) { } + + @Override + public void shutdown() { } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/policy/OutOfCoreOracle.java 
---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/OutOfCoreOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/OutOfCoreOracle.java new file mode 100644 index 0000000..45b9914 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/OutOfCoreOracle.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.ooc.policy; + +import com.sun.management.GarbageCollectionNotificationInfo; +import org.apache.giraph.ooc.command.IOCommand; + +/** + * Interface for any out-of-core oracle. An out-of-core oracle is the brain of + * the out-of-core mechanism, determining/deciding on out-of-core actions (load + * or store) that should happen. + * Note: any class implementing this interface should have one and only one + * constructor taking only two arguments of types + * <code>ImmutableClassesGiraphConfiguration</code> and + * <code>OutOfCoreEngine</code> + */ +public interface OutOfCoreOracle { + /** + * Different types of IO actions that can potentially lead to a more desired + * state of computation for out-of-core mechanism. 
These actions are issued + * based on the status of the memory (memory pressure, rate of data transfer + * to memory, etc.) + */ + enum IOAction { + /** + * Either of: + * - storing incoming messages of any partition currently on disk, or + * - storing incoming messages' raw data buffer of any partition + * currently on disk, or + * - storing partitions' raw data buffer for those partitions that are + * currently on disk. + */ + STORE_MESSAGES_AND_BUFFERS, + /** + * Storing a partition that is *processed* in the current iteration cycle. + * This action is also known as "soft store" + */ + STORE_PROCESSED_PARTITION, + /** + * Storing a partition from memory on disk, prioritizing to *processed* + * partitions on memory. However, if there is no *processed* partition, + * store should happen at any cost, even if an *unprocessed* partition has + * to be stored. This action is also known as "hard store". + */ + STORE_PARTITION, + /** + * Loading an *unprocessed* partition from disk to memory, only if there are + * *processed* partitions in memory. This action basically initiates a swap + * operation. + */ + LOAD_TO_SWAP_PARTITION, + /** + * Loading an *unprocessed* partition from disk to memory. This action is + * also known as "soft load". + */ + LOAD_UNPROCESSED_PARTITION, + /** + * Loading a partition (prioritizing *unprocessed* over *processed*) from + * disk to memory. Loading a *processed* partition to memory is a prefetch + * of that partition to be processed in the next superstep. This action is + * also known as "hard load". + */ + LOAD_PARTITION, + /** + * Loading a partition regardless of the memory situation. An out-of-core + * mechanism may use this action to signal IO threads that it is allowed to + * load a partition that is specifically requested. + */ + URGENT_LOAD_PARTITION + } + + /** + * Get the next set of viable IO actions to help bring memory to a more + * desired state. 
+ * + * @return an array of viable IO actions, sorted from highest priority to + * lowest priority + */ + IOAction[] getNextIOActions(); + + /** + * Whether a command is appropriate to bring the memory to a more desired + * state. A command is not executed unless it is approved by the oracle. This + * method is specially important where there are multiple IO threads + * performing IO operations for the out-of-core mechanism. The approval + * becomes significantly important to prevent all IO threads from performing + * identical command type, if that is a necessity. For instance, execution of + * a particular command type by only one thread may bring the memory to a + * desired state, and the rest of IO threads may perform other types of + * commands. + * + * @param command the IO command that is about to execute + * @return 'true' if the command is approved for execution. 'false' if the + * command should not be executed + */ + boolean approve(IOCommand command); + + /** + * Notification of command completion. Oracle may update its status and commit + * the changes a command may cause. + * + * @param command the IO command that is completed + */ + void commandCompleted(IOCommand command); + + /** + * Notification of GC completion. Oracle may take certain decisions based on + * GC information (such as amount of time it took, memory it reclaimed, etc.) + * + * @param gcInfo GC information + */ + void gcCompleted(GarbageCollectionNotificationInfo gcInfo); + + /** + * Shut down the out-of-core oracle. Necessary specifically for cases where + * out-of-core oracle is using additional monitoring threads. 
+ */ + void shutdown(); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/policy/SimpleGCMonitoringOracle.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/SimpleGCMonitoringOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/SimpleGCMonitoringOracle.java new file mode 100644 index 0000000..477b3ec --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/SimpleGCMonitoringOracle.java @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.ooc.policy; + +import com.google.common.collect.Maps; +import com.sun.management.GarbageCollectionNotificationInfo; +import org.apache.giraph.conf.FloatConfOption; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.ooc.OutOfCoreEngine; +import org.apache.giraph.ooc.OutOfCoreIOStatistics; +import org.apache.giraph.ooc.command.IOCommand; +import org.apache.giraph.ooc.command.LoadPartitionIOCommand; +import org.apache.giraph.ooc.command.WaitIOCommand; +import org.apache.log4j.Logger; + +import java.lang.management.MemoryUsage; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Out-of-core oracle to adaptively control data kept in memory, with the goal + * of keeping the memory state constantly at a desired state. This oracle + * monitors GC behavior to keep track of memory pressure. + * + * After each GC is done, this oracle retrieves statistics about the memory + * pressure (memory used, max memory, and how far away memory is compared to a + * max optimal pressure). Based on the past 2 recent memory statistics, + * the oracle predicts the status of the memory, and sets the rate of load/store + * of data from/to disk. If the rate of loading data from disk is 'l', and the + * rate of storing data to disk is 's', the rate of data injection to memory + * from disk can be denoted as 'l-s'. This oracle determines what 'l-s' should + * be based on the prediction of memory status. + * + * Assume that based on the previous GC call the memory usage at time t_0 is + * m_0, and based on the most recent GC call the memory usage at time t_1 is + * m_1. So, the rate of memory increase is alpha = (m_1 - m_0) / (t_1 - t_0). + * Assume that the ideal memory pressure happens when the memory usage is + * m_ideal. So, at time 't_2 = t_1 + (t_1 - t_0)', we want m_ideal. 
That means + * the ideal rate would be beta = (m_ideal - m_1) / (t_2 - t_1). If the data + * injection rate to memory so far was i, the new injection rate should be: + * i_new = i - (alpha - beta) + */ +public class SimpleGCMonitoringOracle implements OutOfCoreOracle { + /** + * The optimal memory pressure at which GC behavior is close to ideal. This + * fraction may be dependent on the GC strategy used for running a job, but + * generally should not be dependent on the graph processing application. + */ + public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE = + new FloatConfOption("giraph.optimalMemoryPressure", 0.8f, + "The memory pressure (fraction of used memory) at which the job " + + "shows the optimal GC behavior. This fraction may be dependent " + + "on the GC strategy used in running the job."); + + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(SimpleGCMonitoringOracle.class); + /** Cached value for OPTIMAL_MEMORY_PRESSURE */ + private final float optimalMemoryPressure; + /** Out-of-core engine */ + private final OutOfCoreEngine oocEngine; + /** Status of memory from the last GC call */ + private GCObservation lastGCObservation; + /** Desired rate of data injection to memory */ + private final AtomicLong desiredDiskToMemoryDataRate = + new AtomicLong(0); + /** Number of on the fly (outstanding) IO commands for each command type */ + private final Map<IOCommand.IOCommandType, AtomicInteger> commandOccurrences = + Maps.newConcurrentMap(); + + /** + * Constructor + * + * @param conf configuration + * @param oocEngine out-of-core engine + */ + public SimpleGCMonitoringOracle(ImmutableClassesGiraphConfiguration conf, + OutOfCoreEngine oocEngine) { + this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf); + this.oocEngine = oocEngine; + this.lastGCObservation = new GCObservation(-1, 0, 0); + for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) { + commandOccurrences.put(type, new AtomicInteger(0)); + } 
+ } + + @Override + public synchronized void gcCompleted(GarbageCollectionNotificationInfo + gcInfo) { + long time = System.currentTimeMillis(); + Map<String, MemoryUsage> memAfter = gcInfo.getGcInfo() + .getMemoryUsageAfterGc(); + long usedMemory = 0; + long maxMemory = 0; + for (MemoryUsage memDetail : memAfter.values()) { + usedMemory += memDetail.getUsed(); + maxMemory += memDetail.getMax(); + } + GCObservation observation = new GCObservation(time, usedMemory, maxMemory); + if (LOG.isInfoEnabled()) { + LOG.info("gcCompleted: GC completed with: " + observation); + } + // Whether this is not the first GC call in the application + if (lastGCObservation.isValid()) { + long deltaDataRate = + lastGCObservation.getDesiredDeltaDataRate(observation); + long diskBandwidthEstimate = + oocEngine.getIOStatistics().getDiskBandwidth(); + // Update the desired data injection rate to memory. The data injection + // rate cannot be less than -disk_bandwidth (the extreme case happens if + // we only do 'store'), and cannot be more than disk_bandwidth (the + // extreme case happens if we only do 'load'). + long dataInjectionRate = desiredDiskToMemoryDataRate.get(); + desiredDiskToMemoryDataRate.set(Math.max( + Math.min(desiredDiskToMemoryDataRate.get() - deltaDataRate, + diskBandwidthEstimate), -diskBandwidthEstimate)); + if (LOG.isInfoEnabled()) { + LOG.info("gcCompleted: changing data injection rate from " + + String.format("%.2f", dataInjectionRate / 1024.0 / 1024.0) + + " to " + String.format("%.2f", desiredDiskToMemoryDataRate.get() / + 1024.0 / 1024.0)); + } + } + lastGCObservation = observation; + } + + /** + * Get the current data injection rate to memory based on the commands ran + * in the history (retrieved from statistics collector), and outstanding + * commands issued by the IO scheduler. 
+ * + * @return the current data injection rate to memory + */ + private long getCurrentDataInjectionRate() { + long effectiveBytesTransferred = 0; + long effectiveDuration = 0; + for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) { + OutOfCoreIOStatistics.BytesDuration stats = + oocEngine.getIOStatistics().getCommandTypeStats(type); + int occurrence = commandOccurrences.get(type).get(); + long typeBytesTransferred = stats.getBytes(); + long typeDuration = stats.getDuration(); + // If there is an outstanding command, we still do not know how many bytes + // it will transfer, and how long it will take. So, we guesstimate these + // numbers based on other similar commands that happened in the history. We + // simply take the average number of bytes transferred for the particular + // command, and we take average duration for the particular command. We + // should multiply these numbers by the number of outstanding commands of + // this particular command type. + if (stats.getOccurrence() != 0) { + typeBytesTransferred += stats.getBytes() / stats.getOccurrence() * + occurrence; + typeDuration += stats.getDuration() / stats.getOccurrence() * + occurrence; + } + if (type == IOCommand.IOCommandType.LOAD_PARTITION) { + effectiveBytesTransferred += typeBytesTransferred; + } else { + // Store (data going out of memory), or wait (no data transferred) + effectiveBytesTransferred -= typeBytesTransferred; + } + effectiveDuration += typeDuration; + } + if (effectiveDuration == 0) { + return 0; + } else { + return effectiveBytesTransferred / effectiveDuration; + } + } + + @Override + public IOAction[] getNextIOActions() { + long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 0.05); + long desiredRate = desiredDiskToMemoryDataRate.get(); + long currentRate = getCurrentDataInjectionRate(); + if (desiredRate > error) { + // 'l-s' is positive, we should do more load than store. + if (currentRate > desiredRate + error) { + // We should decrease 'l-s'. 
This can be done either by increasing 's' + // or issuing wait command. We prioritize wait over hard store. + return new IOAction[]{ + IOAction.STORE_MESSAGES_AND_BUFFERS, + IOAction.STORE_PROCESSED_PARTITION}; + } else if (currentRate < desiredRate - error) { + // We should increase 'l-s'. We can simply load partitions/data. + return new IOAction[]{IOAction.LOAD_PARTITION}; + } else { + // We are in a proper state and we should keep up with the rate. We can + // either soft store data or load data (hard load, since we desired rate + // is positive). + return new IOAction[]{ + IOAction.STORE_MESSAGES_AND_BUFFERS, + IOAction.STORE_PROCESSED_PARTITION, + IOAction.LOAD_PARTITION}; + } + } else if (desiredRate < -error) { + // 'l-s' is negative, we should do more store than load. + if (currentRate < desiredRate - error) { + // We should increase 'l-s', but we should be cautious. We only do soft + // load, or wait. + return new IOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION}; + } else if (currentRate > desiredRate + error) { + // We should reduce 'l-s', we do hard store. + return new IOAction[]{ + IOAction.STORE_MESSAGES_AND_BUFFERS, + IOAction.STORE_PARTITION}; + } else { + // We should keep up with the rate. We can either soft store data, or + // soft load data. + return new IOAction[]{ + IOAction.STORE_MESSAGES_AND_BUFFERS, + IOAction.STORE_PROCESSED_PARTITION, + IOAction.LOAD_UNPROCESSED_PARTITION}; + } + } else { + // 'l-s' is almost zero. If current rate is over the desired rate, we do + // soft store. If the current rate is below the desired rate, we do soft + // load. 
+ if (currentRate > desiredRate + error) { + return new IOAction[]{ + IOAction.STORE_MESSAGES_AND_BUFFERS, + IOAction.STORE_PROCESSED_PARTITION}; + } else if (currentRate < desiredRate - error) { + return new IOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION}; + } else { + return new IOAction[]{ + IOAction.STORE_MESSAGES_AND_BUFFERS, + IOAction.STORE_PROCESSED_PARTITION, + IOAction.LOAD_UNPROCESSED_PARTITION}; + } + } + } + + @Override + public synchronized boolean approve(IOCommand command) { + long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 0.05); + long desiredRate = desiredDiskToMemoryDataRate.get(); + long currentRate = getCurrentDataInjectionRate(); + // The command is denied iff the current rate is above the desired rate and + // we are doing load (instead of store), or the current rate is below the + // desired rate and we are doing store (instead of loading). + if (currentRate > desiredRate + error && + command instanceof LoadPartitionIOCommand) { + return false; + } + if (currentRate < desiredRate - error && + !(command instanceof LoadPartitionIOCommand) && + !(command instanceof WaitIOCommand)) { + return false; + } + commandOccurrences.get(command.getType()).getAndIncrement(); + return true; + } + + @Override + public void commandCompleted(IOCommand command) { + commandOccurrences.get(command.getType()).getAndDecrement(); + } + + @Override + public void shutdown() { } + + /** Helper class to record memory status after GC calls */ + private class GCObservation { + /** The time at which the GC happened (in milliseconds) */ + private long time; + /** Amount of memory used after the GC call */ + private long usedMemory; + /** Maximum amounts of memory reported by GC listener */ + private long maxMemory; + + /** + * Constructor + * + * @param time time of GC + * @param usedMemory amount of used memory after GC + * @param maxMemory amount of all available memory based on GC observation + */ + public GCObservation(long time, long usedMemory, 
long maxMemory) { + this.time = time; + this.usedMemory = usedMemory; + this.maxMemory = maxMemory; + } + + /** + * Is this a valid observation? + * + * @return true iff it is a valid observation + */ + public boolean isValid() { + return time > 0; + } + + /** + * Considering a new observation of memory status after the most recent GC, + * what is the desired rate for data injection to memory. + * + * @param newObservation the most recent GC observation + * @return desired rate of data injection to memory + */ + public long getDesiredDeltaDataRate(GCObservation newObservation) { + long newUsedMemory = newObservation.usedMemory; + long newMaxMemory = newObservation.maxMemory; + long lastUsedMemory = usedMemory; + long lastMaxMemory = maxMemory; + // Scale the memory status of two GC observation to be the same + long scaledMaxMemory = Math.min(lastMaxMemory, newMaxMemory); + newUsedMemory = + (long) (((double) scaledMaxMemory / newMaxMemory) * newUsedMemory); + lastUsedMemory = + (long) (((double) scaledMaxMemory / lastMaxMemory) * lastUsedMemory); + long desiredUsedMemory = (long) (optimalMemoryPressure * scaledMaxMemory); + if (LOG.isInfoEnabled()) { + LOG.info("getDesiredDeltaDataRate: " + String.format("previous usage " + + "= %.2f MB, ", lastUsedMemory / 1024.0 / 1024.0) + String.format( + "current usage = %.2f MB, ", newUsedMemory / 1024.0 / 1024.0) + + String.format("ideal usage = %.2f MB", desiredUsedMemory / 1024.0 / + 1024.0)); + } + long interval = newObservation.time - time; + if (interval == 0) { + interval = 1; + LOG.warn("getDesiredDeltaRate: two GC happened almost at the same " + + "time!"); + } + long currentDataRate = (long) ((double) (newUsedMemory - + lastUsedMemory) / interval * 1000); + long desiredDataRate = (long) ((double) (desiredUsedMemory - + newUsedMemory) / interval * 1000); + return currentDataRate - desiredDataRate; + } + + @Override + public String toString() { + return String.format("(usedMemory: %.2f MB, maxMemory: %.2f MB at " + + 
"time: %d ms)", usedMemory / 1024.0 / 1024.0, + maxMemory / 1024.0 / 1024.0, time); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java new file mode 100644 index 0000000..ff2b3f7 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.ooc.policy; + +import com.sun.management.GarbageCollectionNotificationInfo; +import org.apache.giraph.comm.flow_control.CreditBasedFlowControl; +import org.apache.giraph.comm.flow_control.FlowControl; +import org.apache.giraph.comm.netty.NettyClient; +import org.apache.giraph.conf.FloatConfOption; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.LongConfOption; +import org.apache.giraph.ooc.OutOfCoreEngine; +import org.apache.giraph.ooc.command.IOCommand; +import org.apache.giraph.utils.CallableFactory; +import org.apache.giraph.utils.LogStacktraceCallable; +import org.apache.giraph.utils.MemoryUtils; +import org.apache.giraph.utils.ThreadUtils; +import org.apache.log4j.Logger; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static com.google.common.base.Preconditions.checkState; + +/** + * Out-of-core oracle to adaptively control data kept in memory, with the goal + * of keeping the memory usage at a desired state. Out-of-core policy in this + * oracle is based on several user-defined thresholds. Also, this oracle spawns + * a thread to periodically check the memory usage. This thread would issue + * manual GC calls if JVM fails to call major/full GC for a while and the amount + * of used memory is about to cause high-memory pressure. This oracle, also, + * monitors GC activities. The monitoring mechanism looks for major/full GC + * calls, and updates out-of-core decisions based on the amount of available + * memory after such GCs. 
There are three out-of-core decisions: + * - Which IO operations should be done (load/offload of partitions and + * messages) + * - What the incoming messages rate should be (updating credits announced by + * this worker in credit-based flow-control mechanism) + * - How many processing threads should remain active (tethering rate of + * data generation) + * + * The following table shows the relationship of these decisions and + * user-defined thresholds. + * -------------------------------------------------------------- + * Memory Pressure | Manual | IO | Credit | Active | + * (memory usage) | GC? | Action | | Threads | + * -------------------------------------------------------------- + * | Yes | hard | 0 | 0 | + * | | store | | | + * failPressure ------------------------------------------------- + * | Yes | hard | 0 | fraction | + * | | store | | | + * emergencyPressure -------------------------------------------- + * | Yes | hard | fraction | max | + * | | store | | | + * highPressure ------------------------------------------------- + * | No | soft | fraction | max | + * | | store | | | + * optimalPressure ---------------------------------------------- + * | No | soft | max | max | + * | | load | | | + * lowPressure -------------------------------------------------- + * | No | hard | max | max | + * | | load | | | + * -------------------------------------------------------------- + * + */ +public class ThresholdBasedOracle implements OutOfCoreOracle { + /** The memory pressure at/above which the job would fail */ + public static final FloatConfOption FAIL_MEMORY_PRESSURE = + new FloatConfOption("giraph.memory.failPressure", 0.975f, + "The memory pressure (fraction of used memory) at/above which the " + + "job would fail."); + /** + * The memory pressure at which the job is close to failing, even though we were + * using maximal disk bandwidth and minimal network rate. We should reduce + * job processing rate.
+ */ + public static final FloatConfOption EMERGENCY_MEMORY_PRESSURE = + new FloatConfOption("giraph.memory.emergencyPressure", 0.925f, + "The memory pressure (fraction of used memory) at which the job " + + "is close to fail, hence we should reduce its processing rate " + + "as much as possible."); + /** The memory pressure at which the job is suffering from GC overhead. */ + public static final FloatConfOption HIGH_MEMORY_PRESSURE = + new FloatConfOption("giraph.memory.highPressure", 0.875f, + "The memory pressure (fraction of used memory) at which the job " + + "is suffering from GC overhead."); + /** + * The memory pressure at which we expect GC to perform optimally for a + * memory intensive job. + */ + public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE = + new FloatConfOption("giraph.memory.optimalPressure", 0.8f, + "The memory pressure (fraction of used memory) at which a " + + "memory-intensive job shows the optimal GC behavior."); + /** + * The memory pressure at/below which the job can use more memory without + * suffering from GC overhead. + */ + public static final FloatConfOption LOW_MEMORY_PRESSURE = + new FloatConfOption("giraph.memory.lowPressure", 0.7f, + "The memory pressure (fraction of used memory) at/below which the " + + "job can use more memory without suffering the performance."); + /** The interval at which memory observer thread wakes up. */ + public static final LongConfOption CHECK_MEMORY_INTERVAL = + new LongConfOption("giraph.checkMemoryInterval", 2500, + "The interval/period where memory observer thread wakes up and " + + "monitors memory footprint (in milliseconds)"); + /** + * Memory observer thread would manually call GC if major/full GC has not + * been called for a while. 
The period within which we expect a GC to have happened in + * the past is specified by this parameter + */ + public static final LongConfOption LAST_GC_CALL_INTERVAL = + new LongConfOption("giraph.lastGcCallInterval", 10 * 1000, + "How long after last major/full GC should we call manual GC?"); + + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(ThresholdBasedOracle.class); + /** Cached value for FAIL_MEMORY_PRESSURE */ + private final float failMemoryPressure; + /** Cached value for EMERGENCY_MEMORY_PRESSURE */ + private final float emergencyMemoryPressure; + /** Cached value for HIGH_MEMORY_PRESSURE */ + private final float highMemoryPressure; + /** Cached value for OPTIMAL_MEMORY_PRESSURE */ + private final float optimalMemoryPressure; + /** Cached value for LOW_MEMORY_PRESSURE */ + private final float lowMemoryPressure; + /** Cached value for CHECK_MEMORY_INTERVAL */ + private final long checkMemoryInterval; + /** Cached value for LAST_GC_CALL_INTERVAL */ + private final long lastGCCallInterval; + /** + * Cached value for + * CreditBasedFlowControl.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER (max + * credit used for credit-based flow-control mechanism) + */ + private final short maxRequestsCredit; + /** + * Whether the job is shutting down. Used for terminating the memory + * observer thread.
+ */ + private final CountDownLatch shouldTerminate; + /** Result of memory observer thread */ + private final Future<Void> checkMemoryThreadResult; + /** Out-of-core engine */ + private final OutOfCoreEngine oocEngine; + /** Last time a major/full GC has been called (in milliseconds) */ + private volatile long lastMajorGCTime; + /** Last time a non major/full GC has been called (in milliseconds) */ + private volatile long lastMinorGCTime; + + /** + * Constructor + * + * @param conf configuration + * @param oocEngine out-of-core engine + */ + public ThresholdBasedOracle(ImmutableClassesGiraphConfiguration conf, + OutOfCoreEngine oocEngine) { + this.failMemoryPressure = FAIL_MEMORY_PRESSURE.get(conf); + this.emergencyMemoryPressure = EMERGENCY_MEMORY_PRESSURE.get(conf); + this.highMemoryPressure = HIGH_MEMORY_PRESSURE.get(conf); + this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf); + this.lowMemoryPressure = LOW_MEMORY_PRESSURE.get(conf); + this.checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf); + this.lastGCCallInterval = LAST_GC_CALL_INTERVAL.get(conf); + this.maxRequestsCredit = (short) + CreditBasedFlowControl.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf); + NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.setIfUnset(conf, true); + boolean useCredit = NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf); + checkState(useCredit, "ThresholdBasedOracle: credit-based flow control " + + "must be enabled. 
Use giraph.waitForPerWorkerRequests=true"); + this.shouldTerminate = new CountDownLatch(1); + this.oocEngine = oocEngine; + this.lastMajorGCTime = 0; + + CallableFactory<Void> callableFactory = new CallableFactory<Void>() { + @Override + public Callable<Void> newCallable(int callableId) { + return new Callable<Void>() { + @Override + public Void call() throws Exception { + while (true) { + boolean done = shouldTerminate.await(checkMemoryInterval, + TimeUnit.MILLISECONDS); + if (done) { + break; + } + double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction(); + long time = System.currentTimeMillis(); + if ((usedMemoryFraction > highMemoryPressure && + time - lastMajorGCTime >= lastGCCallInterval) || + (usedMemoryFraction > optimalMemoryPressure && + time - lastMajorGCTime >= lastGCCallInterval && + time - lastMinorGCTime >= lastGCCallInterval)) { + if (LOG.isInfoEnabled()) { + LOG.info("call: last GC happened a while ago and the " + + "amount of used memory is high (used memory " + + "fraction is " + + String.format("%.2f", usedMemoryFraction) + "). " + + "Calling GC manually"); + } + System.gc(); + time = System.currentTimeMillis() - time; + usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction(); + if (LOG.isInfoEnabled()) { + LOG.info("call: manual GC is done. It took " + + String.format("%.2f", (double) time / 1000) + + " seconds. Used memory fraction is " + + String.format("%.2f", usedMemoryFraction)); + } + } + updateRates(usedMemoryFraction); + } + return null; + } + }; + } + }; + ExecutorService executor = Executors.newSingleThreadExecutor( + ThreadUtils.createThreadFactory("check-memory")); + this.checkMemoryThreadResult = executor.submit(new LogStacktraceCallable<>( + callableFactory.newCallable(0))); + executor.shutdown(); + } + + /** + * upon major/full GC calls. + */ + /** + * Update statistics and rate regarding communication credits and number of + * active threads. 
+ * + * @param usedMemoryFraction the fraction of used memory over max memory + */ + public void updateRates(double usedMemoryFraction) { + // Update the fraction of processing threads that should remain active + if (usedMemoryFraction >= failMemoryPressure) { + oocEngine.updateActiveThreadsFraction(0); + } else if (usedMemoryFraction < emergencyMemoryPressure) { + oocEngine.updateActiveThreadsFraction(1); + } else { + oocEngine.updateActiveThreadsFraction(1 - + (usedMemoryFraction - emergencyMemoryPressure) / + (failMemoryPressure - emergencyMemoryPressure)); + } + + // Update the fraction of credit that should be used in credit-based flow- + // control + if (usedMemoryFraction >= emergencyMemoryPressure) { + updateRequestsCredit((short) 0); + } else if (usedMemoryFraction < optimalMemoryPressure) { + updateRequestsCredit(maxRequestsCredit); + } else { + updateRequestsCredit((short) (maxRequestsCredit * + (1 - (usedMemoryFraction - optimalMemoryPressure) / + (emergencyMemoryPressure - optimalMemoryPressure)))); + } + } + + @Override + public IOAction[] getNextIOActions() { + double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction(); + if (LOG.isInfoEnabled()) { + LOG.info(String.format("getNextIOActions: usedMemoryFraction = %.2f", + usedMemoryFraction)); + } + if (usedMemoryFraction > highMemoryPressure) { + return new IOAction[]{ + IOAction.STORE_MESSAGES_AND_BUFFERS, + IOAction.STORE_PARTITION}; + } else if (usedMemoryFraction > optimalMemoryPressure) { + return new IOAction[]{ + IOAction.LOAD_UNPROCESSED_PARTITION, + IOAction.STORE_MESSAGES_AND_BUFFERS, + IOAction.STORE_PROCESSED_PARTITION}; + } else if (usedMemoryFraction > lowMemoryPressure) { + return new IOAction[]{ + IOAction.LOAD_UNPROCESSED_PARTITION, + IOAction.STORE_MESSAGES_AND_BUFFERS, + IOAction.LOAD_PARTITION}; + } else { + return new IOAction[]{IOAction.LOAD_PARTITION}; + } + } + + @Override + public boolean approve(IOCommand command) { + return true; + } + + @Override + public void 
commandCompleted(IOCommand command) { + // Do nothing + } + + @Override + public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) { + String gcAction = gcInfo.getGcAction().toLowerCase(); + if (gcAction.contains("full") || gcAction.contains("major")) { + if (!gcInfo.getGcCause().contains("No GC")) { + lastMajorGCTime = System.currentTimeMillis(); + } + } else { + lastMinorGCTime = System.currentTimeMillis(); + } + } + + @Override + public void shutdown() { + shouldTerminate.countDown(); + try { + checkMemoryThreadResult.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("shutdown: caught exception while waiting on check-memory " + + "thread to terminate!"); + throw new IllegalStateException(e); + } + if (LOG.isInfoEnabled()) { + LOG.info("shutdown: ThresholdBasedOracle shutdown complete!"); + } + } + + /** + * Update the credit announced for this worker in Netty. The lower the credit + * is, the lower the rate at which incoming messages arrive at this worker. + * Thus, credit is an indirect way of controlling the amount of memory + * incoming messages would take.
+ * + * @param newCredit the new credit to announce to other workers + */ + private void updateRequestsCredit(short newCredit) { + if (LOG.isInfoEnabled()) { + LOG.info("updateRequestsCredit: updating the credit to " + newCredit); + } + FlowControl flowControl = oocEngine.getFlowControl(); + if (flowControl != null) { + ((CreditBasedFlowControl) flowControl).updateCredit(newCredit); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/policy/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/package-info.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/package-info.java new file mode 100644 index 0000000..c58289f --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +/** + * Package of classes related to out-of-core policy + */ +package org.apache.giraph.ooc.policy; http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java index d3ace99..c54e7b2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java @@ -123,7 +123,7 @@ public class ZooKeeperManager { this.context = context; this.conf = configuration; taskPartition = conf.getTaskPartition(); - jobId = conf.get("mapred.job.id", "Unknown Job"); + jobId = conf.getJobId(); baseDirectory = new Path(ZOOKEEPER_MANAGER_DIRECTORY.getWithDefault(conf, getFinalZooKeeperPath()));
