Updated Branches: refs/heads/trunk 39c9add15 -> 2347627d5
GIRAPH-530: GiraphInputFormat#getSplits() should be aware of multithreaded input (apresta) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/2347627d Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/2347627d Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/2347627d Branch: refs/heads/trunk Commit: 2347627d5993f9c3ed9e26f244fac6e48f18a2a0 Parents: 39c9add Author: Alessandro Presta <[email protected]> Authored: Fri Feb 22 16:07:55 2013 -0800 Committer: Alessandro Presta <[email protected]> Committed: Mon Feb 25 09:51:47 2013 -0800 ---------------------------------------------------------------------- .../io/accumulo/AccumuloVertexInputFormat.java | 11 +------- .../java/org/apache/giraph/io/EdgeInputFormat.java | 20 +------------- .../org/apache/giraph/io/GiraphInputFormat.java | 4 +- .../org/apache/giraph/io/VertexInputFormat.java | 20 +------------- .../io/formats/GeneratedVertexInputFormat.java | 6 ++-- .../io/formats/PseudoRandomEdgeInputFormat.java | 6 ++-- .../io/formats/PseudoRandomVertexInputFormat.java | 6 ++-- .../io/formats/SequenceFileVertexInputFormat.java | 2 +- .../giraph/io/formats/TextEdgeInputFormat.java | 2 +- .../giraph/io/formats/TextVertexInputFormat.java | 3 +- .../io/formats/TextVertexValueInputFormat.java | 2 +- .../org/apache/giraph/master/BspServiceMaster.java | 22 +++++++++------ .../giraph/io/hbase/HBaseVertexInputFormat.java | 12 +------- .../io/hcatalog/HCatalogEdgeInputFormat.java | 3 +- .../io/hcatalog/HCatalogVertexInputFormat.java | 2 +- .../hcatalog/HCatalogVertexValueInputFormat.java | 2 +- .../hive/input/edge/HiveEdgeInputFormat.java | 2 +- .../hive/input/vertex/HiveVertexInputFormat.java | 2 +- 18 files changed, 39 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/2347627d/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java b/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java index c1b632e..9445fd4 100644 --- a/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java +++ b/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java @@ -150,18 +150,9 @@ public abstract class AccumuloVertexInputFormat< } - /** - * getSplits - * - * @param context Context of the job - * @param numWorkers Number of workers used for this job - * @return tablet splits - * @throws IOException - * @throws InterruptedException - */ @Override public List<InputSplit> getSplits( - JobContext context, int numWorkers) + JobContext context, int minSplitCountHint) throws IOException, InterruptedException { List<InputSplit> splits = null; try { http://git-wip-us.apache.org/repos/asf/giraph/blob/2347627d/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java index 0d5c43f..87ea5a0 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeInputFormat.java @@ -35,27 +35,9 @@ import java.util.List; */ public abstract class EdgeInputFormat<I extends WritableComparable, E extends Writable> implements GiraphInputFormat { - /** - * Logically split the vertices for a graph processing application. - * - * Each {@link InputSplit} is then assigned to a worker for processing. - * - * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the - * input files are not physically split into chunks. For e.g. a split could - * be <i><input-file-path, start, offset></i> tuple. The InputFormat - * also creates the {@link VertexReader} to read the {@link InputSplit}. - * - * Also, the number of workers is a hint given to the developer to try to - * intelligently determine how many splits to create (if this is - * adjustable) at runtime. - * - * @param context Context of the job - * @param numWorkers Number of workers used for this job - * @return an array of {@link InputSplit}s for the job. - */ @Override public abstract List<InputSplit> getSplits( - JobContext context, int numWorkers) throws IOException, + JobContext context, int minSplitCountHint) throws IOException, InterruptedException; /** http://git-wip-us.apache.org/repos/asf/giraph/blob/2347627d/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java index ca725ca..6b175a2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java @@ -32,11 +32,11 @@ public interface GiraphInputFormat { * Get the list of input splits for the format. * * @param context The job context - * @param numWorkers Number of workers + * @param minSplitCountHint Minimum number of splits to create (hint) * @return The list of input splits * @throws IOException * @throws InterruptedException */ - List<InputSplit> getSplits(JobContext context, int numWorkers) + List<InputSplit> getSplits(JobContext context, int minSplitCountHint) throws IOException, InterruptedException; } http://git-wip-us.apache.org/repos/asf/giraph/blob/2347627d/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java index 0f73b8d..ded8b92 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java @@ -42,27 +42,9 @@ import java.util.List; public abstract class VertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> implements GiraphInputFormat { - /** - * Logically split the vertices for a graph processing application. - * - * Each {@link InputSplit} is then assigned to a worker for processing. - * - * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the - * input files are not physically split into chunks. For e.g. a split could - * be <i><input-file-path, start, offset></i> tuple. The InputFormat - * also creates the {@link VertexReader} to read the {@link InputSplit}. - * - * Also, the number of workers is a hint given to the developer to try to - * intelligently determine how many splits to create (if this is - * adjustable) at runtime. - * - * @param context Context of the job - * @param numWorkers Number of workers used for this job - * @return an array of {@link InputSplit}s for the job. - */ @Override public abstract List<InputSplit> getSplits( - JobContext context, int numWorkers) + JobContext context, int minSplitCountHint) throws IOException, InterruptedException; /** http://git-wip-us.apache.org/repos/asf/giraph/blob/2347627d/giraph-core/src/main/java/org/apache/giraph/io/formats/GeneratedVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/GeneratedVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/GeneratedVertexInputFormat.java index f308169..1ae125f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/GeneratedVertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/GeneratedVertexInputFormat.java @@ -43,13 +43,13 @@ public abstract class GeneratedVertexInputFormat< I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> extends VertexInputFormat<I, V, E, M> { @Override - public List<InputSplit> getSplits(JobContext context, int numWorkers) + public List<InputSplit> getSplits(JobContext context, int minSplitCountHint) throws IOException, InterruptedException { // This is meaningless, the VertexReader will generate all the test // data. List<InputSplit> inputSplitList = new ArrayList<InputSplit>(); - for (int i = 0; i < numWorkers; ++i) { - inputSplitList.add(new BspInputSplit(i, numWorkers)); + for (int i = 0; i < minSplitCountHint; ++i) { + inputSplitList.add(new BspInputSplit(i, minSplitCountHint)); } return inputSplitList; } http://git-wip-us.apache.org/repos/asf/giraph/blob/2347627d/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java index 90f814c..d197925 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java @@ -55,13 +55,13 @@ public class PseudoRandomEdgeInputFormat @Override public final List<InputSplit> getSplits(final JobContext context, - final int numWorkers) + final int minSplitCountHint) throws IOException, InterruptedException { // This is meaningless, the PseudoRandomEdgeReader will generate // all the test data List<InputSplit> inputSplitList = new ArrayList<InputSplit>(); - for (int i = 0; i < numWorkers; ++i) { - inputSplitList.add(new BspInputSplit(i, numWorkers)); + for (int i = 0; i < minSplitCountHint; ++i) { + inputSplitList.add(new BspInputSplit(i, minSplitCountHint)); } return inputSplitList; } http://git-wip-us.apache.org/repos/asf/giraph/blob/2347627d/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java index f2a2c93..19bc3b8 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java @@ -61,12 +61,12 @@ public class PseudoRandomVertexInputFormat<M extends Writable> extends @Override public final List<InputSplit> getSplits(final JobContext context, - final int numWorkers) throws IOException, InterruptedException { + final int minSplitCountHint) throws IOException, InterruptedException { // This is meaningless, the PseudoRandomVertexReader will generate // all the test data List<InputSplit> inputSplitList = new ArrayList<InputSplit>(); - for (int i = 0; i < numWorkers; ++i) { - inputSplitList.add(new BspInputSplit(i, numWorkers)); + for (int i = 0; i < minSplitCountHint; ++i) { + inputSplitList.add(new BspInputSplit(i, minSplitCountHint)); } return inputSplitList; } http://git-wip-us.apache.org/repos/asf/giraph/blob/2347627d/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java index 9428b87..6a5813b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java @@ -50,7 +50,7 @@ public class SequenceFileVertexInputFormat<I extends WritableComparable, new SequenceFileInputFormat<I, X>(); @Override - public List<InputSplit> getSplits(JobContext context, int numWorkers) + public List<InputSplit> getSplits(JobContext context, int minSplitCountHint) throws IOException, InterruptedException { return sequenceFileInputFormat.getSplits(context); } http://git-wip-us.apache.org/repos/asf/giraph/blob/2347627d/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java index 6e59e7f..c9f5df1 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java @@ -51,7 +51,7 @@ public abstract class TextEdgeInputFormat<I extends WritableComparable, @Override public List<InputSplit> getSplits( - JobContext context, int numWorkers) throws IOException, + JobContext context, int minSplitCountHint) throws IOException, InterruptedException { // Ignore the hint of numWorkers here since we are using // GiraphTextInputFormat to do this for us http://git-wip-us.apache.org/repos/asf/giraph/blob/2347627d/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java index af68300..e359f66 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java @@ -48,12 +48,11 @@ import java.util.List; public abstract class TextVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> extends VertexInputFormat<I, V, E, M> { - /** Uses the GiraphTextInputFormat to do everything */ protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat(); @Override - public List<InputSplit> getSplits(JobContext context, int numWorkers) + public List<InputSplit> getSplits(JobContext context, int minSplitCountHint) throws IOException, InterruptedException { // Ignore the hint of numWorkers here since we are using // GiraphTextInputFormat to do this for us http://git-wip-us.apache.org/repos/asf/giraph/blob/2347627d/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java index 4e607c2..e09f6a3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java @@ -49,7 +49,7 @@ public abstract class TextVertexValueInputFormat<I extends WritableComparable, protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat(); @Override - public List<InputSplit> getSplits(JobContext context, int numWorkers) + public List<InputSplit> getSplits(JobContext context, int minSplitCountHint) throws IOException, InterruptedException { // Ignore the hint of numWorkers here since we are using // GiraphTextInputFormat to do this for us http://git-wip-us.apache.org/repos/asf/giraph/blob/2347627d/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index 10a0afd..b84e21b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -268,17 +268,17 @@ public class BspServiceMaster<I extends WritableComparable, * Common method for generating vertex/edge input splits. * * @param inputFormat The vertex/edge input format - * @param numWorkers Number of available workers + * @param minSplitCountHint Minimum number of splits to create (hint) * @param inputSplitType Type of input splits (for logging purposes) * @return List of input splits for the given format */ private List<InputSplit> generateInputSplits(GiraphInputFormat inputFormat, - int numWorkers, + int minSplitCountHint, String inputSplitType) { String logPrefix = "generate" + inputSplitType + "InputSplits"; List<InputSplit> splits; try { - splits = inputFormat.getSplits(getContext(), numWorkers); + splits = inputFormat.getSplits(getContext(), minSplitCountHint); } catch (IOException e) { throw new IllegalStateException(logPrefix + ": Got IOException", e); } catch (InterruptedException e) { @@ -300,7 +300,7 @@ public class BspServiceMaster<I extends WritableComparable, } else { if (LOG.isInfoEnabled()) { LOG.info(logPrefix + ": Got " + splits.size() + - " input splits for " + numWorkers + " workers"); + " input splits for " + minSplitCountHint + " workers"); } return splits; } @@ -576,10 +576,14 @@ public class BspServiceMaster<I extends WritableComparable, return -1; } + // Create at least as many splits as the total number of input threads. + int minSplitCountHint = healthyWorkerInfoList.size() * + getConfiguration().getNumInputSplitsThreads(); + // Note that the input splits may only be a sample if // INPUT_SPLIT_SAMPLE_PERCENT is set to something other than 100 List<InputSplit> splitList = generateInputSplits(inputFormat, - healthyWorkerInfoList.size(), inputSplitType); + minSplitCountHint, inputSplitType); if (splitList.isEmpty()) { LOG.fatal(logPrefix + ": Failing job due to 0 input splits, " + @@ -588,12 +592,12 @@ public class BspServiceMaster<I extends WritableComparable, "check input of " + inputFormat.getClass().getName() + "!"); failJob(); } - if (healthyWorkerInfoList.size() > splitList.size()) { + if (minSplitCountHint > splitList.size()) { LOG.warn(logPrefix + ": Number of inputSplits=" + splitList.size() + " < " + - healthyWorkerInfoList.size() + - "=number of healthy processes, " + - "some workers will be not used"); + minSplitCountHint + + "=total number of input threads, " + + "some threads will be not used"); } // Write input splits to zookeeper in parallel http://git-wip-us.apache.org/repos/asf/giraph/blob/2347627d/giraph-hbase/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-hbase/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java b/giraph-hbase/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java index c16489a..bbcbc1b 100644 --- a/giraph-hbase/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java +++ b/giraph-hbase/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java @@ -175,17 +175,9 @@ public abstract class HBaseVertexInputFormat< } - /** - * getSplits - * - * @param context Context of the job - * @param numWorkers Number of workers used for this job - * @return HBase region splits - * @throws IOException - * @throws InterruptedException - */ + @Override public List<InputSplit> getSplits( - JobContext context, int numWorkers) + JobContext context, int minSplitCountHint) throws IOException, InterruptedException { BASE_FORMAT.setConf(context.getConfiguration()); return BASE_FORMAT.getSplits(context); http://git-wip-us.apache.org/repos/asf/giraph/blob/2347627d/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java index 8d85056..d8987da 100644 --- a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java +++ b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java @@ -50,7 +50,8 @@ public abstract class HCatalogEdgeInputFormat< private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat(); @Override - public final List<InputSplit> getSplits(JobContext context, int numWorkers) + public final List<InputSplit> getSplits(JobContext context, + int minSplitCountHint) throws IOException, InterruptedException { return hCatInputFormat.getEdgeSplits(context); } http://git-wip-us.apache.org/repos/asf/giraph/blob/2347627d/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java index e1d9791..9d52b64 100644 --- a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java +++ b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java @@ -73,7 +73,7 @@ public abstract class HCatalogVertexInputFormat< @Override public final List<InputSplit> getSplits( - final JobContext context, final int numWorkers) + final JobContext context, final int minSplitCountHint) throws IOException, InterruptedException { return hCatInputFormat.getVertexSplits(context); } http://git-wip-us.apache.org/repos/asf/giraph/blob/2347627d/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexValueInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexValueInputFormat.java b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexValueInputFormat.java index b3934e4..f365578 100644 --- a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexValueInputFormat.java +++ b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexValueInputFormat.java @@ -52,7 +52,7 @@ public abstract class HCatalogVertexValueInputFormat<I extends private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat(); @Override - public List<InputSplit> getSplits(JobContext context, int numWorkers) + public List<InputSplit> getSplits(JobContext context, int minSplitCountHint) throws IOException, InterruptedException { return hCatInputFormat.getVertexSplits(context); } http://git-wip-us.apache.org/repos/asf/giraph/blob/2347627d/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java index 17405c8..3f40763 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java @@ -55,7 +55,7 @@ public class HiveEdgeInputFormat<I extends WritableComparable, } @Override - public List<InputSplit> getSplits(JobContext context, int numWorkers) + public List<InputSplit> getSplits(JobContext context, int minSplitCountHint) throws IOException, InterruptedException { return hiveInputFormat.getSplits(context); } http://git-wip-us.apache.org/repos/asf/giraph/blob/2347627d/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java index 1d43055..fb3b123 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java @@ -58,7 +58,7 @@ public class HiveVertexInputFormat<I extends WritableComparable, } @Override - public List<InputSplit> getSplits(JobContext context, int numWorkers) + public List<InputSplit> getSplits(JobContext context, int minSplitCountHint) throws IOException, InterruptedException { return hiveInputFormat.getSplits(context); }
