Repository: giraph Updated Branches: refs/heads/trunk 5b0cd0e0a -> 843d1863d
GIRAPH-1035: Make sure we are able to use all compute threads Summary: The default logic of choosing the number of partitions when we use few workers and a lot of compute threads ends up choosing fewer partitions than there are threads. Add an additional setting to prevent that. Test Plan: Ran a job with a few workers and a lot of threads and verified that the number of partitions is set properly. mvn verify passed. Differential Revision: https://reviews.facebook.net/D48993 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/843d1863 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/843d1863 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/843d1863 Branch: refs/heads/trunk Commit: 843d1863d3624eeda4f094beefdd1eaf6eb23daf Parents: 5b0cd0e Author: Maja Kabiljo <[email protected]> Authored: Mon Oct 19 10:35:54 2015 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Mon Oct 19 10:58:36 2015 -0700 ---------------------------------------------------------------------- .../org/apache/giraph/conf/GiraphConstants.java | 5 +++ .../PseudoRandomIntNullLocalEdgesHelper.java | 4 +- .../formats/PseudoRandomLocalEdgesHelper.java | 4 +- .../giraph/partition/HashMasterPartitioner.java | 2 +- .../apache/giraph/partition/PartitionUtils.java | 46 ++++++++++++-------- .../partition/SimpleMasterPartitioner.java | 2 +- 6 files changed, 39 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/843d1863/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 2804192..5a0328b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++
b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -862,6 +862,11 @@ public interface GiraphConstants { new FloatConfOption("giraph.masterPartitionCountMultiplier", 1.0f, "Multiplier for the current workers squared"); + /** Minimum number of partitions to have per compute thread */ + IntConfOption MIN_PARTITIONS_PER_COMPUTE_THREAD = + new IntConfOption("giraph.minPartitionsPerComputeThread", 1, + "Minimum number of partitions to have per compute thread"); + /** Overrides default partition count calculation if not -1 */ IntConfOption USER_PARTITION_COUNT = new IntConfOption("giraph.userPartitionCount", -1, http://git-wip-us.apache.org/repos/asf/giraph/blob/843d1863/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullLocalEdgesHelper.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullLocalEdgesHelper.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullLocalEdgesHelper.java index 46997a8..ab5bfb3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullLocalEdgesHelper.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullLocalEdgesHelper.java @@ -56,8 +56,8 @@ public class PseudoRandomIntNullLocalEdgesHelper { int numWorkers = conf.getMaxWorkers(); List<WorkerInfo> workerInfos = Collections.nCopies(numWorkers, new WorkerInfo()); - numPartitions = PartitionUtils.computePartitionCount(workerInfos, - numWorkers, conf); + numPartitions = PartitionUtils.computePartitionCount( + workerInfos.size(), conf); partitionSize = numVertices / numPartitions; } http://git-wip-us.apache.org/repos/asf/giraph/blob/843d1863/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomLocalEdgesHelper.java ---------------------------------------------------------------------- diff --git 
a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomLocalEdgesHelper.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomLocalEdgesHelper.java index 84502e1..1b421b7 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomLocalEdgesHelper.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomLocalEdgesHelper.java @@ -61,8 +61,8 @@ public class PseudoRandomLocalEdgesHelper { int numWorkers = conf.getMaxWorkers(); List<WorkerInfo> workerInfos = Collections.nCopies(numWorkers, new WorkerInfo()); - numPartitions = PartitionUtils.computePartitionCount(workerInfos, - numWorkers, conf); + numPartitions = + PartitionUtils.computePartitionCount(workerInfos.size(), conf); partitionSize = numVertices / numPartitions; } http://git-wip-us.apache.org/repos/asf/giraph/blob/843d1863/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java index caede8c..607347d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java @@ -61,7 +61,7 @@ public class HashMasterPartitioner<I extends WritableComparable, public Collection<PartitionOwner> createInitialPartitionOwners( Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) { int partitionCount = PartitionUtils.computePartitionCount( - availableWorkerInfos, maxWorkers, conf); + availableWorkerInfos.size(), conf); List<PartitionOwner> ownerList = new ArrayList<PartitionOwner>(); Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator(); for (int i = 0; i < partitionCount; ++i) { 
http://git-wip-us.apache.org/repos/asf/giraph/blob/843d1863/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java index 6914c3b..e4305ff 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java @@ -36,6 +36,9 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import static org.apache.giraph.conf.GiraphConstants + .MIN_PARTITIONS_PER_COMPUTE_THREAD; +import static org.apache.giraph.conf.GiraphConstants.NUM_COMPUTE_THREADS; import static org.apache.giraph.conf.GiraphConstants.USER_PARTITION_COUNT; /** @@ -171,15 +174,18 @@ public class PartitionUtils { /** * Compute the number of partitions, based on the configuration. * - * @param availableWorkerInfos Available workers. - * @param maxWorkers Maximum number of workers. + * If USER_PARTITION_COUNT is set, it will follow that, otherwise it will + * choose the max of what MIN_PARTITIONS_PER_COMPUTE_THREAD and + * PARTITION_COUNT_MULTIPLIER settings would choose, capped by the maximum + * number of partitions allowed by ZooKeeper. + * + * @param availableWorkerCount Number of available workers * @param conf Configuration. * @return Number of partitions for the job.
*/ - public static int computePartitionCount( - Collection<WorkerInfo> availableWorkerInfos, int maxWorkers, + public static int computePartitionCount(int availableWorkerCount, ImmutableClassesGiraphConfiguration conf) { - if (availableWorkerInfos.isEmpty()) { + if (availableWorkerCount == 0) { throw new IllegalArgumentException( "computePartitionCount: No available workers"); } @@ -188,32 +194,36 @@ public class PartitionUtils { int partitionCount; if (userPartitionCount == USER_PARTITION_COUNT.getDefaultValue()) { float multiplier = GiraphConstants.PARTITION_COUNT_MULTIPLIER.get(conf); - partitionCount = - Math.max((int) (multiplier * availableWorkerInfos.size() * - availableWorkerInfos.size()), - 1); + partitionCount = Math.max( + (int) (multiplier * availableWorkerCount * availableWorkerCount), 1); + int minPartitionsPerComputeThread = + MIN_PARTITIONS_PER_COMPUTE_THREAD.get(conf); + int totalComputeThreads = + NUM_COMPUTE_THREADS.get(conf) * availableWorkerCount; + partitionCount = Math.max(partitionCount, + minPartitionsPerComputeThread * totalComputeThreads); } else { partitionCount = userPartitionCount; } if (LOG.isInfoEnabled()) { LOG.info("computePartitionCount: Creating " + - partitionCount + ", default would have been " + - (availableWorkerInfos.size() * - availableWorkerInfos.size()) + " partitions."); + partitionCount + " partitions."); } int maxPartitions = getMaxPartitions(conf); if (partitionCount > maxPartitions) { // try to keep partitionCount divisible by number of workers // in order to keep the balance - int reducedPartitions = (maxPartitions / availableWorkerInfos.size()) * - availableWorkerInfos.size(); + int reducedPartitions = (maxPartitions / availableWorkerCount) * + availableWorkerCount; if (reducedPartitions == 0) { reducedPartitions = maxPartitions; } - LOG.warn("computePartitionCount: " + - "Reducing the partitionCount to " + reducedPartitions + - " from " + partitionCount + " because of " + maxPartitions + - " limit"); + if 
(LOG.isInfoEnabled()) { + LOG.info("computePartitionCount: " + + "Reducing the partitionCount to " + reducedPartitions + + " from " + partitionCount + " because of " + maxPartitions + + " limit"); + } partitionCount = reducedPartitions; } http://git-wip-us.apache.org/repos/asf/giraph/blob/843d1863/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java index 7d4c1cb..638dacf 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java @@ -61,7 +61,7 @@ public abstract class SimpleMasterPartitioner<I extends WritableComparable, public Collection<PartitionOwner> createInitialPartitionOwners( Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) { int partitionCount = PartitionUtils.computePartitionCount( - availableWorkerInfos, maxWorkers, conf); + availableWorkerInfos.size(), conf); ArrayList<WorkerInfo> workerList = new ArrayList<WorkerInfo>(availableWorkerInfos);
