Repository: giraph Updated Branches: refs/heads/trunk 92a8f1ca1 -> f93031877
GIRAPH-1004 Summary: Currently the Hadoop output format is hard-coded to BspOutputFormat, but some more advanced use cases need to be able to change it and use a different commit method. Test Plan: mvn clean verify, run a job that changes the output format Reviewers: ikabiljo Differential Revision: https://reviews.facebook.net/D37917 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f9303187 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f9303187 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f9303187 Branch: refs/heads/trunk Commit: f930318772845ea1f121ad3b3e593513239a3e4b Parents: 92a8f1c Author: Maja Kabiljo <[email protected]> Authored: Thu Apr 30 16:23:07 2015 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Thu Apr 30 16:32:29 2015 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 ++ .../main/java/org/apache/giraph/conf/GiraphConstants.java | 8 +++++++- .../src/main/java/org/apache/giraph/job/GiraphJob.java | 4 ++-- .../test/java/org/apache/giraph/comm/TestMessageStores.java | 4 ++-- 4 files changed, 13 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/f9303187/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 30704e8..442ade3 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.2.0 - unreleased + GIRAPH-1004: Allow changing hadoop output format (majakabiljo) + GIRAPH-1002: Improve message changing through iters (ikabiljo via edunov) GIRAPH-998: Close writers in parallel (majaakbiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/f9303187/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git 
a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 2805c26..2c938a9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -22,6 +22,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import org.apache.giraph.aggregators.AggregatorWriter; import org.apache.giraph.aggregators.TextAggregatorWriter; +import org.apache.giraph.bsp.BspOutputFormat; import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker; import org.apache.giraph.bsp.checkpoints.DefaultCheckpointSupportedChecker; import org.apache.giraph.combiner.MessageCombiner; @@ -79,6 +80,7 @@ import org.apache.giraph.worker.WorkerContext; import org.apache.giraph.worker.WorkerObserver; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.OutputFormat; /** * Constants used all over Giraph for configuration. 
@@ -1184,6 +1186,10 @@ public interface GiraphConstants { new IntConfOption("giraph.async.message.store.threads", 0, "Number of threads to be used in async message store."); - + /** Output format class for hadoop to use (for committing) */ + ClassConfOption<OutputFormat> HADOOP_OUTPUT_FORMAT_CLASS = + ClassConfOption.create("giraph.hadoopOutputFormatClass", + BspOutputFormat.class, OutputFormat.class, + "Output format class for hadoop to use (for committing)"); } // CHECKSTYLE: resume InterfaceIsTypeCheck http://git-wip-us.apache.org/repos/asf/giraph/blob/f9303187/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java index 79a145f..62894b6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java @@ -19,7 +19,6 @@ package org.apache.giraph.job; import org.apache.giraph.bsp.BspInputFormat; -import org.apache.giraph.bsp.BspOutputFormat; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; @@ -249,7 +248,8 @@ public class GiraphJob { submittedJob.setNumReduceTasks(0); submittedJob.setMapperClass(GraphMapper.class); submittedJob.setInputFormatClass(BspInputFormat.class); - submittedJob.setOutputFormatClass(BspOutputFormat.class); + submittedJob.setOutputFormatClass( + GiraphConstants.HADOOP_OUTPUT_FORMAT_CLASS.get(conf)); if (jobProgressTrackerService != null) { jobProgressTrackerService.setJob(submittedJob); } http://git-wip-us.apache.org/repos/asf/giraph/blob/f9303187/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java ---------------------------------------------------------------------- diff --git 
a/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java index bf20580..e9f5f92 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java @@ -224,7 +224,7 @@ public class TestMessageStores { } out.close(); - messageStore = messageStoreFactory.newStore( + messageStore = (S) messageStoreFactory.newStore( new DefaultMessageClasses( IntWritable.class, DefaultMessageValueFactory.class, @@ -249,7 +249,7 @@ public class TestMessageStores { TestData testData) throws IOException { SortedMap<IntWritable, Collection<IntWritable>> messages = new TreeMap<IntWritable, Collection<IntWritable>>(); - S messageStore = messageStoreFactory.newStore( + S messageStore = (S) messageStoreFactory.newStore( new DefaultMessageClasses( IntWritable.class, DefaultMessageValueFactory.class,
