Repository: giraph Updated Branches: refs/heads/trunk 77a24a9fc -> 23184e150
GIRAPH-1081: Fix a bug in internal out-of-core infra: multithreaded accesses to buffers Summary: The multi-threaded accesses to raw data buffers in `DiskBackedDataStore` are overlooked, violating the assumption that data is properly partitioned among the different IO threads. Test Plan: mvn clean verify Reviewers: dionysis.logothetis, sergey.edunov, maja.kabiljo Reviewed By: maja.kabiljo Differential Revision: https://reviews.facebook.net/D60147 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/23184e15 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/23184e15 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/23184e15 Branch: refs/heads/trunk Commit: 23184e15023157312d7a8afe0adfe30b5d7864a8 Parents: 77a24a9 Author: Hassan Eslami <[email protected]> Authored: Tue Jun 28 18:43:18 2016 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Tue Jun 28 18:43:33 2016 -0700 ---------------------------------------------------------------------- .../org/apache/giraph/ooc/data/DiskBackedDataStore.java | 11 ++++++++--- .../org/apache/giraph/ooc/data/MetaPartitionManager.java | 6 +++--- 2 files changed, 11 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/23184e15/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java index 7265410..e9ab167 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java @@ -354,16 +354,21 @@ public abstract class DiskBackedDataStore<T> { * offloaded to disk), and sees if any of them has enough raw
data buffer in * memory. If so, puts that partition in a list to return. * + * @param ioThreadId Id of the IO thread who would offload the buffers * @return Set of partition ids of all partition raw buffers where the * aggregate size of buffers are large enough and it is worth flushing * those buffers to disk */ - public Set<Integer> getCandidateBuffersToOffload() { + public Set<Integer> getCandidateBuffersToOffload(int ioThreadId) { Set<Integer> result = new HashSet<>(); for (Map.Entry<Integer, Pair<Integer, List<T>>> entry : dataBuffers.entrySet()) { - if (entry.getValue().getLeft() > minBufferSizeToOffload) { - result.add(entry.getKey()); + int partitionId = entry.getKey(); + long aggregateBufferSize = entry.getValue().getLeft(); + if (aggregateBufferSize > minBufferSizeToOffload && + oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId) == + ioThreadId) { + result.add(partitionId); } } return result; http://git-wip-us.apache.org/repos/asf/giraph/blob/23184e15/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java index 64e3aed..3075829 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java @@ -332,12 +332,12 @@ public class MetaPartitionManager { (DiskBackedPartitionStore<?, ?, ?>) (oocEngine.getServerData() .getPartitionStore()); perThreadVertexEdgeBuffers.get(threadId) - .addAll(partitionStore.getCandidateBuffersToOffload()); + .addAll(partitionStore.getCandidateBuffersToOffload(threadId)); DiskBackedEdgeStore<?, ?, ?> edgeStore = (DiskBackedEdgeStore<?, ?, ?>) (oocEngine.getServerData()) .getEdgeStore(); perThreadVertexEdgeBuffers.get(threadId) - 
.addAll(edgeStore.getCandidateBuffersToOffload()); + .addAll(edgeStore.getCandidateBuffersToOffload(threadId)); partitionId = popFromSet(perThreadVertexEdgeBuffers.get(threadId)); } return partitionId; @@ -361,7 +361,7 @@ public class MetaPartitionManager { .getIncomingMessageStore()); if (messageStore != null) { perThreadMessageBuffers.get(threadId) - .addAll(messageStore.getCandidateBuffersToOffload()); + .addAll(messageStore.getCandidateBuffersToOffload(threadId)); partitionId = popFromSet(perThreadMessageBuffers.get(threadId)); } }
