Repository: giraph Updated Branches: refs/heads/trunk 8a952155d -> 43909035c
Added support for multi-mapping input formats Summary: https://issues.apache.org/jira/browse/GIRAPH-999 Similar to multi-vertex/edge input formats, I added multi-mapping input formats. The main changes are the addition of a MultiMappingInputFormat and a MappingInputFormatDescription. The rest are some set/get methods in the configuration-related classes. Test Plan: Run: - mvn clean verify - Run job with 2 mapping input formats (replicated the same mapping input) Reviewers: sergey.edunov, ikabiljo, maja.kabiljo Reviewed By: maja.kabiljo Subscribers: dionysis.logothetis Differential Revision: https://reviews.facebook.net/D35733 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/43909035 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/43909035 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/43909035 Branch: refs/heads/trunk Commit: 43909035cc6cf860958d2ea6527433025eda0f17 Parents: 8a95215 Author: Dionysios Logothetis <[email protected]> Authored: Tue Mar 24 15:28:46 2015 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Tue Mar 24 15:33:08 2015 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../org/apache/giraph/conf/GiraphClasses.java | 13 ++ .../apache/giraph/conf/GiraphConfiguration.java | 11 ++ .../ImmutableClassesGiraphConfiguration.java | 12 ++ .../multi/MappingInputFormatDescription.java | 163 +++++++++++++++++++ .../formats/multi/MultiMappingInputFormat.java | 136 ++++++++++++++++ 6 files changed, 337 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/43909035/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 9c8481b..d2e74e2 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.2.0 - unreleased + GIRAPH-999: Add support for Mapping 
multi-input formats (dlogothetis via majakabiljo) + GIRAPH-997: Upgrade findbugs to 3.0.0 (dlogothetis via majakabiljo) GIRAPH-996: Large requests degrade performance. Print out warnings. (dlogothetis via majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/43909035/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java index 7fe85f1..2f3c43a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java @@ -643,6 +643,19 @@ public class GiraphClasses<I extends WritableComparable, } /** + * Set the MappingInputFormat class held by this object + * + * @param mappingInputFormatClass MappingInputFormat to set + * @return this, so calls can be chained + */ + public GiraphClasses setMappingInputFormatClass( + Class<? 
extends MappingInputFormat<I, V, E, Writable>> + mappingInputFormatClass) { + this.mappingInputFormatClass = mappingInputFormatClass; + return this; + } + + /** * Set VertexInputFormat held * * @param vertexInputFormatClass VertexInputFormat to set http://git-wip-us.apache.org/repos/asf/giraph/blob/43909035/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index 8c64b5d..a315399 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -31,6 +31,7 @@ import org.apache.giraph.factories.VertexValueFactory; import org.apache.giraph.graph.Computation; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.EdgeOutputFormat; +import org.apache.giraph.io.MappingInputFormat; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexOutputFormat; import org.apache.giraph.io.filters.EdgeInputFilter; @@ -271,6 +272,16 @@ public class GiraphConfiguration extends Configuration } /** + * Set the mapping input format class (optional) on this configuration + * + * @param mappingInputFormatClass Determines how mappings are input + */ + public void setMappingInputFormatClass( + Class<? 
extends MappingInputFormat> mappingInputFormatClass) { + MAPPING_INPUT_FORMAT_CLASS.set(this, mappingInputFormatClass); + } + + /** * Set the master class (optional) * * @param masterComputeClass Runs master computation http://git-wip-us.apache.org/repos/asf/giraph/blob/43909035/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java index 8b67490..381495e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java @@ -323,6 +323,18 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable, } /** + * Set mapping input format class in both this conf and the cached classes + * + * @param mappingInputFormatClass Determines how mappings are input + */ + @Override + public void setMappingInputFormatClass( + Class<? 
extends MappingInputFormat> mappingInputFormatClass) { + super.setMappingInputFormatClass(mappingInputFormatClass); + classes.setMappingInputFormatClass(mappingInputFormatClass); + } + + /** * Check if mappingInputFormat is set * * @return true if mappingInputFormat is set http://git-wip-us.apache.org/repos/asf/giraph/blob/43909035/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MappingInputFormatDescription.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MappingInputFormatDescription.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MappingInputFormatDescription.java new file mode 100644 index 0000000..6a3ba91 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MappingInputFormatDescription.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.giraph.io.formats.multi; + + import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; + import org.apache.giraph.conf.StrConfOption; + import org.apache.giraph.io.MappingInputFormat; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.io.Writable; + import org.apache.hadoop.io.WritableComparable; + import org.json.JSONArray; + import org.json.JSONException; + + import com.google.common.collect.Lists; + + import java.util.List; + + /** + * Description of the mapping input format - holds mapping input format class + * and all parameters specifically set for that mapping input format. + * + * Used only with {@link MultiMappingInputFormat} + * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + * @param <B> Mapping target + */ + public class MappingInputFormatDescription<I extends WritableComparable, + V extends Writable, E extends Writable, B extends Writable> + extends InputFormatDescription<MappingInputFormat<I, V, E, B>> { + /** + * MappingInputFormats description - JSON array containing a JSON array for + * each mapping input. Mapping input JSON arrays contain one or two elements - + * first one is the name of mapping input class, and second one is JSON object + * with all specific parameters for this mapping input. For example: + * [["MIF1",{"p":"v1"}],["MIF2",{"p":"v2","q":"v"}]] + */ + public static final StrConfOption MAPPING_INPUT_FORMAT_DESCRIPTIONS = + new StrConfOption("giraph.multiMappingInput.descriptions", null, + "MappingInputFormats description - JSON array containing a JSON " + + "array for each mapping input. Mapping input JSON arrays contain " + + "one or two elements - first one is the name of mapping input " + + "class, and second one is JSON object with all specific parameters " + + "for this mapping input. 
For example: [[\"MIF1\",{\"p\":\"v1\"}]," + "[\"MIF2\",{\"p\":\"v2\",\"q\":\"v\"}]]"); + + /** + * Constructor with mapping input format class + * + * @param mappingInputFormatClass Mapping input format class + */ + public MappingInputFormatDescription( + Class<? extends MappingInputFormat<I, V, E, B>> mappingInputFormatClass + ) { + super(mappingInputFormatClass); + } + + /** + * Constructor with json string describing this input format + * + * @param description Json string describing this input format + */ + public MappingInputFormatDescription(String description) { + super(description); + } + + /** + * Create a copy of configuration which additionally has all parameters for + * this input format set + * + * @param conf Configuration which we want to create a copy from + * @return Copy of configuration + */ + private ImmutableClassesGiraphConfiguration<I, V, E> + createConfigurationCopy( + ImmutableClassesGiraphConfiguration<I, V, E> conf) { + ImmutableClassesGiraphConfiguration<I, V, E> confCopy = + new ImmutableClassesGiraphConfiguration<I, V, E>(conf); + confCopy.setMappingInputFormatClass(getInputFormatClass()); + putParametersToConfiguration(confCopy); + return confCopy; + } + + /** + * Get mapping input format descriptions from configuration (empty if unset). 
+ * + * @param conf Configuration + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + * @param <B> Mapping target + * @return List of mapping input format descriptions + */ + public static <I extends WritableComparable, V extends Writable, + E extends Writable, B extends Writable> + List<MappingInputFormatDescription<I, V, E, B>> + getMappingInputFormatDescriptions(Configuration conf) { + String mappingInputFormatDescriptions = + MAPPING_INPUT_FORMAT_DESCRIPTIONS.get(conf); + if (mappingInputFormatDescriptions == null) { + return Lists.newArrayList(); + } + try { + JSONArray inputFormatsJson = new JSONArray( + mappingInputFormatDescriptions); + List<MappingInputFormatDescription<I, V, E, B>> descriptions = + Lists.newArrayListWithCapacity(inputFormatsJson.length()); + for (int i = 0; i < inputFormatsJson.length(); i++) { + descriptions.add(new MappingInputFormatDescription<I, V, E, B>( + inputFormatsJson.getString(i))); + } + return descriptions; + } catch (JSONException e) { + throw new IllegalStateException("getMappingInputFormatDescriptions: " + + "JSONException occurred while trying to process " + + mappingInputFormatDescriptions, e); + } + } + + /** + * Create all mapping input formats + * + * @param conf Configuration + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + * @param <B> Mapping target data + * @return List with all mapping input formats + */ + public static <I extends WritableComparable, V extends Writable, + E extends Writable, B extends Writable> + List<MappingInputFormat<I, V, E, B>> createMappingInputFormats( + ImmutableClassesGiraphConfiguration<I, V, E> conf) { + List<MappingInputFormatDescription<I, V, E, B>> descriptions = + getMappingInputFormatDescriptions(conf); + List<MappingInputFormat<I, V, E, B>> mappingInputFormats = + Lists.newArrayListWithCapacity(descriptions.size()); + for (MappingInputFormatDescription<I, V, E, B> description : descriptions) { 
ImmutableClassesGiraphConfiguration<I, V, E> confCopy = + description.createConfigurationCopy(conf); + mappingInputFormats.add((MappingInputFormat<I, V, E, B>) + confCopy.createWrappedMappingInputFormat()); + } + return mappingInputFormats; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/43909035/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiMappingInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiMappingInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiMappingInputFormat.java new file mode 100644 index 0000000..5d943db --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiMappingInputFormat.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.giraph.io.formats.multi; + + import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; + import org.apache.giraph.io.MappingInputFormat; + import org.apache.giraph.io.MappingReader; + import org.apache.giraph.io.internal.WrappedMappingReader; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.io.Writable; + import org.apache.hadoop.io.WritableComparable; + import org.apache.hadoop.mapreduce.InputSplit; + import org.apache.hadoop.mapreduce.JobContext; + import org.apache.hadoop.mapreduce.TaskAttemptContext; + + import java.io.DataInput; + import java.io.DataOutput; + import java.io.IOException; + import java.util.List; + + /** + * Mapping input format which wraps several mapping input formats. + * Provides the way to read data from multiple sources, + * using several different input formats. + * + * @param <I> Vertex id + * @param <V> Vertex value + * @param <E> Edge data + * @param <B> Mapping target + */ + public class MultiMappingInputFormat<I extends WritableComparable, + V extends Writable, E extends Writable, B extends Writable> + extends MappingInputFormat<I, V, E, B> { + + /** Wrapped mapping input formats, indexed by input format index */ + private List<MappingInputFormat<I, V, E, B>> mappingInputFormats; + + /** + * Default constructor. 
+ */ + public MultiMappingInputFormat() { + } + + @Override + public void setConf( + ImmutableClassesGiraphConfiguration<I, V, E> conf) { + super.setConf(conf); + mappingInputFormats = + MappingInputFormatDescription.createMappingInputFormats(getConf()); + if (mappingInputFormats.isEmpty()) { + throw new IllegalStateException("setConf: Using " + + "MultiMappingInputFormat without specifying mapping inputs"); + } + } + + @Override + public MappingReader createMappingReader( + InputSplit inputSplit, TaskAttemptContext context + ) throws IOException { + if (inputSplit instanceof InputSplitWithInputFormatIndex) { + // When multithreaded input is used we need to make sure other threads + // don't change context's configuration while we use it + synchronized (context) { + InputSplitWithInputFormatIndex split = + (InputSplitWithInputFormatIndex) inputSplit; + MappingInputFormat<I, V, E, B> mappingInputFormat = + mappingInputFormats.get(split.getInputFormatIndex()); + MappingReader<I, V, E, B> mappingReader = + mappingInputFormat.createMappingReader(split.getSplit(), context); + return new WrappedMappingReader<I, V, E, B>( + mappingReader, mappingInputFormat.getConf()) { + @Override + public void initialize(InputSplit inputSplit, + TaskAttemptContext context) throws IOException, + InterruptedException { + // When multithreaded input is used we need to make sure other + // threads don't change context's configuration while we use it + synchronized (context) { + super.initialize(inputSplit, context); + } + } + }; + } + } else { + throw new IllegalStateException("createMappingReader: Got InputSplit " + + "which was not created by this class: " + + inputSplit.getClass().getName()); + } + } + + @Override + public void checkInputSpecs(Configuration conf) { + for (MappingInputFormat mappingInputFormat : mappingInputFormats) { + mappingInputFormat.checkInputSpecs(conf); + } + } + + @Override + public List<InputSplit> getSplits( + JobContext context, int minSplitCountHint + ) throws 
IOException, InterruptedException { + synchronized (context) { + return MultiInputUtils.getSplits( + context, minSplitCountHint, mappingInputFormats); + } + } + + @Override + public void writeInputSplit(InputSplit inputSplit, DataOutput dataOutput) + throws IOException { + MultiInputUtils.writeInputSplit( + inputSplit, dataOutput, mappingInputFormats); + } + + @Override + public InputSplit readInputSplit( + DataInput dataInput) throws IOException, ClassNotFoundException { + return MultiInputUtils.readInputSplit(dataInput, mappingInputFormats); + } +}
