Updated Branches: refs/heads/trunk 71eab655e -> 507959dcb
GIRAPH-535: Range-partitioning and edge locality benchmark (apresta) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/507959dc Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/507959dc Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/507959dc Branch: refs/heads/trunk Commit: 507959dcb4af9b5c5f26a97c237cdb7b6235e7a1 Parents: 71eab65 Author: Alessandro Presta <[email protected]> Authored: Mon Feb 25 11:57:41 2013 -0800 Committer: Alessandro Presta <[email protected]> Committed: Mon Feb 25 12:06:27 2013 -0800 ---------------------------------------------------------------------- CHANGELOG | 4 + .../giraph/benchmark/AggregatorsBenchmark.java | 5 +- .../apache/giraph/benchmark/PageRankBenchmark.java | 52 ++++++--- .../giraph/benchmark/RandomMessageBenchmark.java | 5 +- .../giraph/benchmark/ShortestPathsBenchmark.java | 5 +- .../org/apache/giraph/conf/GiraphConstants.java | 17 +++ .../io/formats/PseudoRandomEdgeInputFormat.java | 29 +++--- .../io/formats/PseudoRandomVertexInputFormat.java | 34 +++--- .../giraph/partition/HashMasterPartitioner.java | 80 ++------------ .../giraph/partition/HashWorkerPartitioner.java | 44 +------- .../apache/giraph/partition/PartitionBalancer.java | 66 +++++++++++- .../apache/giraph/partition/PartitionUtils.java | 86 +++++++++++++- .../org/apache/giraph/io/TestJsonBase64Format.java | 9 +- .../org/apache/giraph/TestGraphPartitioner.java | 39 ++++++- .../org/apache/giraph/TestPartitionContext.java | 10 +- .../org/apache/giraph/examples/TestPageRank.java | 4 +- 16 files changed, 292 insertions(+), 197 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 3323438..d1a3d1e 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,8 +1,12 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-535: Range-partitioning and edge locality benchmark (apresta) + GIRAPH-539: When having open requests log which workers are they sent to (majakabiljo) + GIRAPH-530: GiraphInputFormat#getSplits() should be aware of multithreaded input (apresta) + GIRAPH-532: Give an explanation when trying to use unregistered aggregators (majakabiljo) GIRAPH-453: Pure Hive I/O (nitay) http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java index a82a9f8..4e47042 100644 --- a/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java +++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java @@ -24,6 +24,7 @@ import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.giraph.aggregators.LongSumAggregator; +import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants; import org.apache.giraph.master.DefaultMasterCompute; import org.apache.giraph.worker.DefaultWorkerContext; import org.apache.giraph.vertex.EdgeListVertex; @@ -257,10 +258,10 @@ public class AggregatorsBenchmark implements Tool { AggregatorsBenchmarkWorkerContext.class); job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f); job.getConfiguration().setLong( - PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, + PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, Long.parseLong(cmd.getOptionValue('V'))); job.getConfiguration().setLong( - PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, + PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 1); job.getConfiguration().setInt(AGGREGATORS_NUM, Integer.parseInt(cmd.getOptionValue('a'))); http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java index 8341dce..06ee80c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java +++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java @@ -25,10 +25,12 @@ import org.apache.commons.cli.PosixParser; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.combiner.DoubleSumCombiner; +import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants; import org.apache.giraph.job.GiraphJob; import org.apache.giraph.io.formats.JsonBase64VertexOutputFormat; import org.apache.giraph.io.formats.PseudoRandomEdgeInputFormat; import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat; +import org.apache.giraph.partition.SimpleLongRangePartitionerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -90,6 +92,15 @@ public class PageRankBenchmark implements Tool { "EdgeInputFormat), " + "7 for MultiGraphByteArrayVertex with unsafe (using " + "EdgeInputFormat))"); + options.addOption("l", + "localEdgesMinRatio", + true, + "Minimum ratio of partition-local edges (default is 0)"); + options.addOption("p", + "partitioner", + true, + "Partitioning algorithm (0 for hash partitioning (default), " + + "1 for range partitioning)"); options.addOption("N", "name", true, @@ -140,7 +151,7 @@ public class PageRankBenchmark implements Tool { GiraphJob job = new GiraphJob(getConf(), name); GiraphConfiguration configuration = job.getConfiguration(); - setVertexAndInputFormatClasses(cmd, configuration); + setClassesAndParameters(cmd, configuration); configuration.setWorkerConfiguration(workers, workers, 100.0f); configuration.setInt( @@ -159,12 +170,13 @@ public class PageRankBenchmark implements Tool { } /** - * Set vertex class and input format class based on command-line arguments. + * Set vertex, input format, partitioner classes and related parameters + * based on command-line arguments. * * @param cmd Command line arguments * @param configuration Giraph job configuration */ - protected void setVertexAndInputFormatClasses( + protected void setClassesAndParameters( CommandLine cmd, GiraphConfiguration configuration) { int vertexClassOption = cmd.hasOption('c') ? Integer.parseInt( cmd.getOptionValue('c')) : 1; @@ -201,24 +213,24 @@ public class PageRankBenchmark implements Tool { configuration.setVertexCombinerClass( DoubleSumCombiner.class); } + if (vertexClassOption <= 3) { configuration.setVertexInputFormatClass( PseudoRandomVertexInputFormat.class); - configuration.setLong( - PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, - Long.parseLong(cmd.getOptionValue('V'))); - configuration.setLong( - PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, - Long.parseLong(cmd.getOptionValue('e'))); } else { - configuration.setEdgeInputFormatClass( - PseudoRandomEdgeInputFormat.class); - configuration.setLong( - PseudoRandomEdgeInputFormat.AGGREGATE_VERTICES, - Long.parseLong(cmd.getOptionValue('V'))); - configuration.setLong( - PseudoRandomEdgeInputFormat.EDGES_PER_VERTEX, - Long.parseLong(cmd.getOptionValue('e'))); + configuration.setEdgeInputFormatClass(PseudoRandomEdgeInputFormat.class); + } + configuration.setLong( + PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, + Long.parseLong(cmd.getOptionValue('V'))); + configuration.setLong( + PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, + Long.parseLong(cmd.getOptionValue('e'))); + if (cmd.hasOption('l')) { + float localEdgesMinRatio = Float.parseFloat(cmd.getOptionValue('l')); + configuration.setFloat( + PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO, + localEdgesMinRatio); } int vertexOutputClassOption = @@ -229,6 +241,12 @@ public class PageRankBenchmark implements Tool { configuration.setVertexOutputFormatClass( JsonBase64VertexOutputFormat.class); } + + if (cmd.hasOption('p') && + Integer.parseInt(cmd.getOptionValue('p')) == 1) { + configuration.setGraphPartitionerFactoryClass( + SimpleLongRangePartitionerFactory.class); + } } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java index d0d80af..c8e33dd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java +++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java @@ -25,6 +25,7 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.giraph.aggregators.LongSumAggregator; import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants; import org.apache.giraph.master.DefaultMasterCompute; import org.apache.giraph.vertex.EdgeListVertex; import org.apache.giraph.job.GiraphJob; @@ -362,10 +363,10 @@ public class RandomMessageBenchmark implements Tool { RandomMessageBenchmarkMasterCompute.class); job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f); job.getConfiguration().setLong( - PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, + PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, Long.parseLong(cmd.getOptionValue('V'))); job.getConfiguration().setLong( - PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, + PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, Long.parseLong(cmd.getOptionValue('e'))); job.getConfiguration().setInt( SUPERSTEP_COUNT, http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java index 52bbac4..1843da9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java +++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java @@ -25,6 +25,7 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.combiner.MinimumDoubleCombiner; +import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants; import org.apache.giraph.vertex.EdgeListVertex; import org.apache.giraph.job.GiraphJob; import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat; @@ -139,10 +140,10 @@ public class ShortestPathsBenchmark implements Tool { } job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f); job.getConfiguration().setLong( - PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, + PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, Long.parseLong(cmd.getOptionValue('V'))); job.getConfiguration().setLong( - PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, + PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, Long.parseLong(cmd.getOptionValue('e'))); boolean isVerbose = false; http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index e0aeba3..fcdd57b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -497,6 +497,23 @@ public interface GiraphConstants { */ boolean USE_INPUT_SPLIT_LOCALITY_DEFAULT = true; + /** Multiplier for the current workers squared */ + String PARTITION_COUNT_MULTIPLIER = + "partition.masterPartitionCountMultipler"; + /** Default mulitplier for current workers squared */ + float DEFAULT_PARTITION_COUNT_MULTIPLIER = 1.0f; + + /** Overrides default partition count calculation if not -1 */ + String USER_PARTITION_COUNT = + "partition.userPartitionCount"; + /** Default user partition count */ + int DEFAULT_USER_PARTITION_COUNT = -1; + + /** Vertex key space size for + * {@link org.apache.giraph.partition.SimpleRangeWorkerPartitioner} + */ + String PARTITION_VERTEX_KEY_SPACE_SIZE = "partition.vertexKeySpaceSize"; + /** Java opts passed to ZooKeeper startup */ String ZOOKEEPER_JAVA_OPTS = "giraph.zkJavaOpts"; /** Default java opts passed to ZooKeeper startup */ http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java index d197925..2024863 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java @@ -46,13 +46,6 @@ import java.util.Set; */ public class PseudoRandomEdgeInputFormat extends EdgeInputFormat<LongWritable, DoubleWritable> { - /** Set the number of aggregate vertices. */ - public static final String AGGREGATE_VERTICES = - "pseudoRandomEdgeInputFormat.aggregateVertices"; - /** Set the number of edges per vertex (pseudo-random destination). */ - public static final String EDGES_PER_VERTEX = - "pseudoRandomEdgeInputFormat.edgesPerVertex"; - @Override public final List<InputSplit> getSplits(final JobContext context, final int minSplitCountHint) @@ -103,6 +96,8 @@ public class PseudoRandomEdgeInputFormat private BspInputSplit bspInputSplit; /** Saved configuration */ private ImmutableClassesGiraphConfiguration configuration; + /** Helper for generating pseudo-random local edges. */ + private PseudoRandomLocalEdgesHelper localEdgesHelper; @Override public void initialize(InputSplit inputSplit, TaskAttemptContext context) @@ -111,10 +106,10 @@ public class PseudoRandomEdgeInputFormat context.getConfiguration()); aggregateVertices = configuration.getLong( - PseudoRandomEdgeInputFormat.AGGREGATE_VERTICES, 0); + PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 0); if (aggregateVertices <= 0) { throw new IllegalArgumentException( - PseudoRandomEdgeInputFormat.AGGREGATE_VERTICES + " <= 0"); + PseudoRandomInputFormatConstants.AGGREGATE_VERTICES + " <= 0"); } if (inputSplit instanceof BspInputSplit) { bspInputSplit = (BspInputSplit) inputSplit; @@ -135,11 +130,16 @@ public class PseudoRandomEdgeInputFormat " instead of " + BspInputSplit.class); } edgesPerVertex = configuration.getLong( - PseudoRandomEdgeInputFormat.EDGES_PER_VERTEX, 0); + PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 0); if (edgesPerVertex <= 0) { throw new IllegalArgumentException( - PseudoRandomEdgeInputFormat.EDGES_PER_VERTEX + " <= 0"); + PseudoRandomInputFormatConstants.EDGES_PER_VERTEX + " <= 0"); } + float minLocalEdgesRatio = configuration.getFloat( + PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO, + PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO_DEFAULT); + localEdgesHelper = new PseudoRandomLocalEdgesHelper(aggregateVertices, + minLocalEdgesRatio, configuration); } @Override @@ -172,11 +172,10 @@ public class PseudoRandomEdgeInputFormat @Override public Edge<LongWritable, DoubleWritable> getCurrentEdge() throws IOException, InterruptedException { - LongWritable destVertexId; + LongWritable destVertexId = new LongWritable(); do { - destVertexId = - new LongWritable(Math.abs(random.nextLong()) % - aggregateVertices); + destVertexId.set(localEdgesHelper.generateDestVertex( + currentVertexId.get(), random)); } while (currentVertexDestVertices.contains(destVertexId)); ++currentVertexEdgesRead; currentVertexDestVertices.add(destVertexId); http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java index 19bc3b8..4da8f9d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java @@ -18,6 +18,8 @@ package org.apache.giraph.io.formats; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.apache.giraph.bsp.BspInputSplit; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.Edge; @@ -33,9 +35,6 @@ import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.log4j.Logger; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -52,13 +51,6 @@ import java.util.Set; */ public class PseudoRandomVertexInputFormat<M extends Writable> extends VertexInputFormat<LongWritable, DoubleWritable, DoubleWritable, M> { - /** Set the number of aggregate vertices. */ - public static final String AGGREGATE_VERTICES = - "pseudoRandomVertexInputFormat.aggregateVertices"; - /** Set the number of edges per vertex (pseudo-random destination). */ - public static final String EDGES_PER_VERTEX = - "pseudoRandomVertexInputFormat.edgesPerVertex"; - @Override public final List<InputSplit> getSplits(final JobContext context, final int minSplitCountHint) throws IOException, InterruptedException { @@ -101,6 +93,8 @@ public class PseudoRandomVertexInputFormat<M extends Writable> extends private BspInputSplit bspInputSplit; /** Saved configuration */ private ImmutableClassesGiraphConfiguration configuration; + /** Helper for generating pseudo-random local edges. */ + private PseudoRandomLocalEdgesHelper localEdgesHelper; /** * Default constructor for reflection. @@ -115,10 +109,10 @@ public class PseudoRandomVertexInputFormat<M extends Writable> extends context.getConfiguration()); aggregateVertices = configuration.getLong( - PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, 0); + PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 0); if (aggregateVertices <= 0) { throw new IllegalArgumentException( - PseudoRandomVertexInputFormat.AGGREGATE_VERTICES + " <= 0"); + PseudoRandomInputFormatConstants.AGGREGATE_VERTICES + " <= 0"); } if (inputSplit instanceof BspInputSplit) { bspInputSplit = (BspInputSplit) inputSplit; @@ -139,11 +133,16 @@ public class PseudoRandomVertexInputFormat<M extends Writable> extends " instead of " + BspInputSplit.class); } edgesPerVertex = configuration.getLong( - PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, 0); + PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 0); if (edgesPerVertex <= 0) { throw new IllegalArgumentException( - PseudoRandomVertexInputFormat.EDGES_PER_VERTEX + " <= 0"); + PseudoRandomInputFormatConstants.EDGES_PER_VERTEX + " <= 0"); } + float minLocalEdgesRatio = configuration.getFloat( + PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO, + PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO_DEFAULT); + localEdgesHelper = new PseudoRandomLocalEdgesHelper(aggregateVertices, + minLocalEdgesRatio, configuration); } @Override @@ -166,11 +165,10 @@ public class PseudoRandomVertexInputFormat<M extends Writable> extends Lists.newArrayListWithCapacity((int) edgesPerVertex); Set<LongWritable> destVertices = Sets.newHashSet(); for (long i = 0; i < edgesPerVertex; ++i) { - LongWritable destVertexId = null; + LongWritable destVertexId = new LongWritable(); do { - destVertexId = - new LongWritable(Math.abs(rand.nextLong()) % - aggregateVertices); + destVertexId.set( + localEdgesHelper.generateDestVertex(vertexId, rand)); } while (destVertices.contains(destVertexId)); edges.add(EdgeFactory.create(destVertexId, new DoubleWritable(rand.nextDouble()))); http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java index a9611d9..5faf367 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java @@ -18,17 +18,17 @@ package org.apache.giraph.partition; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; - import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.worker.WorkerInfo; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.log4j.Logger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + /** * Master will execute a hash based partitioning. * @@ -41,24 +41,10 @@ import org.apache.log4j.Logger; public class HashMasterPartitioner<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> implements MasterGraphPartitioner<I, V, E, M> { - /** Multiplier for the current workers squared */ - public static final String PARTITION_COUNT_MULTIPLIER = - "hash.masterPartitionCountMultipler"; - /** Default mulitplier for current workers squared */ - public static final float DEFAULT_PARTITION_COUNT_MULTIPLIER = 1.0f; - /** Overrides default partition count calculation if not -1 */ - public static final String USER_PARTITION_COUNT = - "hash.userPartitionCount"; - /** Default user partition count */ - public static final int DEFAULT_USER_PARTITION_COUNT = -1; /** Class logger */ private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class); /** Provided configuration */ private ImmutableClassesGiraphConfiguration conf; - /** Specified partition count (overrides calculation) */ - private final int userPartitionCount; - /** Partition count (calculated in createInitialPartitionOwners) */ - private int partitionCount = -1; /** Save the last generated partition owner list */ private List<PartitionOwner> partitionOwnerList; @@ -69,44 +55,15 @@ public class HashMasterPartitioner<I extends WritableComparable, */ public HashMasterPartitioner(ImmutableClassesGiraphConfiguration conf) { this.conf = conf; - userPartitionCount = conf.getInt(USER_PARTITION_COUNT, - DEFAULT_USER_PARTITION_COUNT); } @Override public Collection<PartitionOwner> createInitialPartitionOwners( Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) { - if (availableWorkerInfos.isEmpty()) { - throw new IllegalArgumentException( - "createInitialPartitionOwners: No available workers"); - } + int partitionCount = PartitionUtils.computePartitionCount( + availableWorkerInfos, maxWorkers, conf); List<PartitionOwner> ownerList = new ArrayList<PartitionOwner>(); Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator(); - if (userPartitionCount == DEFAULT_USER_PARTITION_COUNT) { - float multiplier = conf.getFloat( - PARTITION_COUNT_MULTIPLIER, - DEFAULT_PARTITION_COUNT_MULTIPLIER); - partitionCount = - Math.max((int) (multiplier * availableWorkerInfos.size() * - availableWorkerInfos.size()), - 1); - } else { - partitionCount = userPartitionCount; - } - if (LOG.isInfoEnabled()) { - LOG.info("createInitialPartitionOwners: Creating " + - partitionCount + ", default would have been " + - (availableWorkerInfos.size() * - availableWorkerInfos.size()) + " partitions."); - } - int maxPartitions = getMaxPartitions(); - if (partitionCount > maxPartitions) { - LOG.warn("createInitialPartitionOwners: " + - "Reducing the partitionCount to " + maxPartitions + - " from " + partitionCount); - partitionCount = maxPartitions; - } - for (int i = 0; i < partitionCount; ++i) { PartitionOwner owner = new BasicPartitionOwner(i, workerIt.next()); if (!workerIt.hasNext()) { @@ -151,26 +108,5 @@ public class HashMasterPartitioner<I extends WritableComparable, return new PartitionStats(); } - /** - * Get the maximum number of partitions supported by Giraph. - * - * ZooKeeper has a limit of the data in a single znode of 1 MB, - * and we write all partition descriptions to the same znode. - * - * If we are not using checkpointing, each partition owner is serialized - * as 4 ints (16B), and we need some space to write the list of workers - * there. 50k partitions is conservative enough. - * - * When checkpointing is used, we need enough space to write all the - * checkpoint file paths. - * - * @return Maximum number of partitions allowed - */ - private int getMaxPartitions() { - if (conf.useCheckpointing()) { - return 5000; - } else { - return 50000; - } - } + } http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java index bb6e38d..599ea0c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java @@ -23,13 +23,8 @@ import org.apache.giraph.worker.WorkerInfo; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; /** * Implements hash-based partitioning from the id hash code. @@ -73,43 +68,8 @@ public class HashWorkerPartitioner<I extends WritableComparable, WorkerInfo myWorkerInfo, Collection<? extends PartitionOwner> masterSetPartitionOwners, PartitionStore<I, V, E, M> partitionStore) { - partitionOwnerList.clear(); - partitionOwnerList.addAll(masterSetPartitionOwners); - - Set<WorkerInfo> dependentWorkerSet = new HashSet<WorkerInfo>(); - Map<WorkerInfo, List<Integer>> workerPartitionOwnerMap = - new HashMap<WorkerInfo, List<Integer>>(); - for (PartitionOwner partitionOwner : masterSetPartitionOwners) { - if (partitionOwner.getPreviousWorkerInfo() == null) { - continue; - } else if (partitionOwner.getWorkerInfo().equals( - myWorkerInfo) && - partitionOwner.getPreviousWorkerInfo().equals( - myWorkerInfo)) { - throw new IllegalStateException( - "updatePartitionOwners: Impossible to have the same " + - "previous and current worker info " + partitionOwner + - " as me " + myWorkerInfo); - } else if (partitionOwner.getWorkerInfo().equals(myWorkerInfo)) { - dependentWorkerSet.add(partitionOwner.getPreviousWorkerInfo()); - } else if (partitionOwner.getPreviousWorkerInfo().equals( - myWorkerInfo)) { - if (workerPartitionOwnerMap.containsKey( - partitionOwner.getWorkerInfo())) { - workerPartitionOwnerMap.get( - partitionOwner.getWorkerInfo()).add( - partitionOwner.getPartitionId()); - } else { - List<Integer> tmpPartitionOwnerList = new ArrayList<Integer>(); - tmpPartitionOwnerList.add(partitionOwner.getPartitionId()); - workerPartitionOwnerMap.put(partitionOwner.getWorkerInfo(), - tmpPartitionOwnerList); - } - } - } - - return new PartitionExchange(dependentWorkerSet, - workerPartitionOwnerMap); + return PartitionBalancer.updatePartitionOwners(partitionOwnerList, + myWorkerInfo, masterSetPartitionOwners, partitionStore); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java index bdbd467..2befa9c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java @@ -18,18 +18,20 @@ package org.apache.giraph.partition; +import org.apache.giraph.worker.WorkerInfo; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.PriorityQueue; - -import org.apache.giraph.worker.WorkerInfo; -import org.apache.hadoop.conf.Configuration; -import org.apache.log4j.Logger; +import java.util.Set; /** * Helper class for balancing partitions across a set of workers. @@ -284,5 +286,61 @@ public class PartitionBalancer { return partitionOwnerList; } + + /** + * Helper function to update partition owners and determine which + * partitions need to be sent from a specific worker. + * + * @param partitionOwnerList Local {@link PartitionOwner} list for the + * given worker + * @param myWorkerInfo Worker info + * @param masterSetPartitionOwners Master set partition owners, received + * prior to beginning the superstep + * @param partitionStore Partition store for the given worker + * @return Information for the partition exchange. + */ + public static PartitionExchange updatePartitionOwners( + List<PartitionOwner> partitionOwnerList, + WorkerInfo myWorkerInfo, + Collection<? extends PartitionOwner> masterSetPartitionOwners, + PartitionStore partitionStore) { + partitionOwnerList.clear(); + partitionOwnerList.addAll(masterSetPartitionOwners); + + Set<WorkerInfo> dependentWorkerSet = new HashSet<WorkerInfo>(); + Map<WorkerInfo, List<Integer>> workerPartitionOwnerMap = + new HashMap<WorkerInfo, List<Integer>>(); + for (PartitionOwner partitionOwner : masterSetPartitionOwners) { + if (partitionOwner.getPreviousWorkerInfo() == null) { + continue; + } else if (partitionOwner.getWorkerInfo().equals( + myWorkerInfo) && + partitionOwner.getPreviousWorkerInfo().equals( + myWorkerInfo)) { + throw new IllegalStateException( + "updatePartitionOwners: Impossible to have the same " + + "previous and current worker info " + partitionOwner + + " as me " + myWorkerInfo); + } else if (partitionOwner.getWorkerInfo().equals(myWorkerInfo)) { + dependentWorkerSet.add(partitionOwner.getPreviousWorkerInfo()); + } else if (partitionOwner.getPreviousWorkerInfo().equals( + myWorkerInfo)) { + if (workerPartitionOwnerMap.containsKey( + partitionOwner.getWorkerInfo())) { + workerPartitionOwnerMap.get( + partitionOwner.getWorkerInfo()).add( + partitionOwner.getPartitionId()); + } else { + List<Integer> tmpPartitionOwnerList = new ArrayList<Integer>(); + tmpPartitionOwnerList.add(partitionOwner.getPartitionId()); + workerPartitionOwnerMap.put(partitionOwner.getWorkerInfo(), + tmpPartitionOwnerList); + } + } + } + + return new PartitionExchange(dependentWorkerSet, + workerPartitionOwnerMap); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java index e472ac6..c83ca45 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java @@ -18,6 +18,14 @@ package org.apache.giraph.partition; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.graph.VertexEdgeCount; +import org.apache.giraph.worker.WorkerInfo; +import org.apache.log4j.Logger; + import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -26,13 +34,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import org.apache.giraph.graph.VertexEdgeCount; -import org.apache.giraph.worker.WorkerInfo; -import org.apache.log4j.Logger; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - /** * Helper class for {@link Partition} related operations. */ @@ -148,4 +149,75 @@ public class PartitionUtils { getValue().getEdgeCount()); } } + + /** + * Compute the number of partitions, based on the configuration. + * + * @param availableWorkerInfos Available workers. + * @param maxWorkers Maximum number of workers. + * @param conf Configuration. + * @return Number of partitions for the job. + */ + public static int computePartitionCount( + Collection<WorkerInfo> availableWorkerInfos, int maxWorkers, + ImmutableClassesGiraphConfiguration conf) { + if (availableWorkerInfos.isEmpty()) { + throw new IllegalArgumentException( + "computePartitionCount: No available workers"); + } + + int userPartitionCount = conf.getInt(GiraphConstants.USER_PARTITION_COUNT, + GiraphConstants.DEFAULT_USER_PARTITION_COUNT); + int partitionCount; + if (userPartitionCount == GiraphConstants.DEFAULT_USER_PARTITION_COUNT) { + float multiplier = conf.getFloat( + GiraphConstants.PARTITION_COUNT_MULTIPLIER, + GiraphConstants.DEFAULT_PARTITION_COUNT_MULTIPLIER); + partitionCount = + Math.max((int) (multiplier * availableWorkerInfos.size() * + availableWorkerInfos.size()), + 1); + } else { + partitionCount = userPartitionCount; + } + if (LOG.isInfoEnabled()) { + LOG.info("computePartitionCount: Creating " + + partitionCount + ", default would have been " + + (availableWorkerInfos.size() * + availableWorkerInfos.size()) + " partitions."); + } + int maxPartitions = getMaxPartitions(conf); + if (partitionCount > maxPartitions) { + LOG.warn("computePartitionCount: " + + "Reducing the partitionCount to " + maxPartitions + + " from " + partitionCount); + partitionCount = maxPartitions; + } + return partitionCount; + } + + /** + * Get the maximum number of partitions supported by Giraph. + * + * ZooKeeper has a limit of the data in a single znode of 1 MB, + * and we write all partition descriptions to the same znode. + * + * If we are not using checkpointing, each partition owner is serialized + * as 4 ints (16B), and we need some space to write the list of workers + * there. 50k partitions is conservative enough. + * + * When checkpointing is used, we need enough space to write all the + * checkpoint file paths. + * + * @param conf Configuration. + * @return Maximum number of partitions allowed + */ + private static int getMaxPartitions( + ImmutableClassesGiraphConfiguration conf) { + if (conf.useCheckpointing()) { + return 5000; + } else { + return 50000; + } + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java b/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java index f4d97a4..b3c63f6 100644 --- a/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java +++ b/giraph-core/src/test/java/org/apache/giraph/io/TestJsonBase64Format.java @@ -21,6 +21,7 @@ import org.apache.giraph.BspCase; import org.apache.giraph.benchmark.EdgeListVertexPageRankBenchmark; import org.apache.giraph.benchmark.PageRankComputation; import org.apache.giraph.conf.GiraphClasses; +import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants; import org.apache.giraph.job.GiraphJob; import org.apache.giraph.io.formats.GiraphFileInputFormat; import org.apache.giraph.io.formats.JsonBase64VertexInputFormat; @@ -66,9 +67,9 @@ public class TestJsonBase64Format extends BspCase { classes.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class); GiraphJob job = prepareJob(getCallingMethodName(), classes, outputPath); job.getConfiguration().setLong( - PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, 101); + PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 101); job.getConfiguration().setLong( - PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, 2); + PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 2); job.getConfiguration().setInt(PageRankComputation.SUPERSTEP_COUNT, 2); assertTrue(job.run(true)); @@ -91,9 +92,9 @@ public class TestJsonBase64Format extends BspCase { classes.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class); job = prepareJob(getCallingMethodName(), classes, outputPath3); job.getConfiguration().setLong( - PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, 101); + PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 101); job.getConfiguration().setLong( - PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, 2); + PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 2); job.getConfiguration().setInt(PageRankComputation.SUPERSTEP_COUNT, 5); assertTrue(job.run(true)); http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java b/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java index 2e12bdc..f7fa3f2 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java +++ b/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java @@ -19,14 +19,16 @@ package org.apache.giraph; import org.apache.giraph.conf.GiraphClasses; +import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.examples.GeneratedVertexReader; import org.apache.giraph.examples.SimpleCheckpointVertex; import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat; import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat; +import org.apache.giraph.integration.SuperstepHashPartitionerFactory; import org.apache.giraph.job.GiraphJob; import org.apache.giraph.partition.HashRangePartitionerFactory; import org.apache.giraph.partition.PartitionBalancer; -import org.apache.giraph.integration.SuperstepHashPartitionerFactory; +import org.apache.giraph.partition.SimpleLongRangePartitionerFactory; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -42,7 +44,7 @@ import static org.junit.Assert.assertTrue; */ public class TestGraphPartitioner extends BspCase { public TestGraphPartitioner() { - super(TestGraphPartitioner.class.getName()); + super(TestGraphPartitioner.class.getName()); } private void verifyOutput(FileSystem fs, Path outputPath) @@ -50,7 +52,7 @@ public class TestGraphPartitioner extends BspCase { // TODO: this is fragile (breaks with legit serialization changes) final int correctLen = 120; if (runningInDistributedMode()) { - FileStatus [] fileStatusArr = fs.listStatus(outputPath); + FileStatus[] fileStatusArr = fs.listStatus(outputPath); int totalLen = 0; for (FileStatus fileStatus : fileStatusArr) { if (fileStatus.getPath().toString().contains("/part-m-")) { @@ -131,8 +133,10 @@ public class TestGraphPartitioner extends BspCase { SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class); job.getConfiguration().setMasterComputeClass( SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class); - job.getConfiguration().setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class); - job.getConfiguration().setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class); + job.getConfiguration().setVertexInputFormatClass( + SimpleSuperstepVertexInputFormat.class); + job.getConfiguration().setVertexOutputFormatClass( + SimpleSuperstepVertexOutputFormat.class); job.getConfiguration().setGraphPartitionerFactoryClass( HashRangePartitionerFactory.class); outputPath = getTempPath("testHashRangePartitioner"); @@ -158,5 +162,30 @@ public class TestGraphPartitioner extends BspCase { GeneratedVertexReader.REVERSE_ID_ORDER, true); assertTrue(job.run(true)); verifyOutput(hdfs, outputPath); + + job = new GiraphJob("testSimpleRangePartitioner"); + setupConfiguration(job); + job.getConfiguration().setVertexClass( + SimpleCheckpointVertex.SimpleCheckpointComputation.class); + job.getConfiguration().setWorkerContextClass( + SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class); + job.getConfiguration().setMasterComputeClass( + SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class); + job.getConfiguration().setVertexInputFormatClass( + SimpleSuperstepVertexInputFormat.class); + job.getConfiguration().setVertexOutputFormatClass( + SimpleSuperstepVertexOutputFormat.class); + + job.getConfiguration().setGraphPartitionerFactoryClass( + SimpleLongRangePartitionerFactory.class); + long readerVertices = job.getConfiguration().getLong( + GeneratedVertexReader.READER_VERTICES, -1); + job.getConfiguration().setLong( + GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE, readerVertices); + + outputPath = getTempPath("testSimpleRangePartitioner"); + removeAndSetOutput(job, outputPath); + assertTrue(job.run(true)); + verifyOutput(hdfs, outputPath); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java b/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java index cdf1f65..41f5e3c 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java +++ b/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java @@ -19,20 +19,20 @@ package org.apache.giraph; import org.apache.giraph.conf.GiraphClasses; -import org.apache.giraph.examples.PartitionContextTestVertex; +import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.examples.GeneratedVertexReader; +import org.apache.giraph.examples.PartitionContextTestVertex; import org.apache.giraph.examples.SimplePageRankVertex; import org.apache.giraph.job.GiraphJob; -import org.apache.giraph.partition.HashMasterPartitioner; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.LongWritable; import org.junit.Test; -import static org.junit.Assert.assertTrue; - import java.io.IOException; +import static org.junit.Assert.assertTrue; + public class TestPartitionContext extends BspCase { public TestPartitionContext() { super(TestPartitionContext.class.getName()); @@ -65,7 +65,7 @@ public class TestPartitionContext extends BspCase { PartitionContextTestVertex.NUM_VERTICES); // Increase the number of partitions job.getConfiguration().setInt( - HashMasterPartitioner.USER_PARTITION_COUNT, + GiraphConstants.USER_PARTITION_COUNT, PartitionContextTestVertex.NUM_PARTITIONS); assertTrue(job.run(true)); } http://git-wip-us.apache.org/repos/asf/giraph/blob/507959dc/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java b/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java index 5e61596..f56d7e5 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java +++ b/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java @@ -20,8 +20,8 @@ package org.apache.giraph.examples; import org.apache.giraph.BspCase; import org.apache.giraph.conf.GiraphClasses; import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.job.GiraphJob; -import org.apache.giraph.partition.HashMasterPartitioner; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.LongWritable; @@ -81,7 +81,7 @@ public class TestPageRank extends BspCase { conf.setNumComputeThreads(numComputeThreads); // Set enough partitions to generate randomness on the compute side if (numComputeThreads != 1) { - conf.setInt(HashMasterPartitioner.USER_PARTITION_COUNT, + conf.setInt(GiraphConstants.USER_PARTITION_COUNT, numComputeThreads * 5); } assertTrue(job.run(true));
