Repository: giraph
Updated Branches:
  refs/heads/trunk 8a952155d -> 43909035c


Added support for multi-mapping input formats

Summary:
https://issues.apache.org/jira/browse/GIRAPH-999

Similarly to multi-vertex/edge input formats, I added multi-mapping input
formats. The main changes are the addition of a MultiMappingInputFormat and a
MappingInputFormatDescription. The rest are some set/get methods in the
configuration-related classes.

Test Plan:
Run:
- mvn clean verify
- Run job with 2 mapping input formats (replicated the same mapping input)

Reviewers: sergey.edunov, ikabiljo, maja.kabiljo

Reviewed By: maja.kabiljo

Subscribers: dionysis.logothetis

Differential Revision: https://reviews.facebook.net/D35733


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/43909035
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/43909035
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/43909035

Branch: refs/heads/trunk
Commit: 43909035cc6cf860958d2ea6527433025eda0f17
Parents: 8a95215
Author: Dionysios Logothetis <[email protected]>
Authored: Tue Mar 24 15:28:46 2015 -0700
Committer: Maja Kabiljo <[email protected]>
Committed: Tue Mar 24 15:33:08 2015 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../org/apache/giraph/conf/GiraphClasses.java   |  13 ++
 .../apache/giraph/conf/GiraphConfiguration.java |  11 ++
 .../ImmutableClassesGiraphConfiguration.java    |  12 ++
 .../multi/MappingInputFormatDescription.java    | 163 +++++++++++++++++++
 .../formats/multi/MultiMappingInputFormat.java  | 136 ++++++++++++++++
 6 files changed, 337 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/43909035/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 9c8481b..d2e74e2 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.2.0 - unreleased
+  GIRAPH-999: Add support for Mapping multi-input formats (dlogothetis via majakabiljo)
+
   GIRAPH-997: Upgrade findbugs to 3.0.0 (dlogothetis via majakabiljo)
 
   GIRAPH-996: Large requests degrade performance. Print out warnings. (dlogothetis via majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/43909035/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index 7fe85f1..2f3c43a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -643,6 +643,19 @@ public class GiraphClasses<I extends WritableComparable,
   }
 
   /**
+   * Set the MappingInputFormat class held by this object
+   *
+   * @param mappingInputFormatClass MappingInputFormat class to set
+   * @return this
+   */
+  public GiraphClasses setMappingInputFormatClass(
+    Class<? extends MappingInputFormat<I, V, E, Writable>>
+      mappingInputFormatClass) {
+    this.mappingInputFormatClass = mappingInputFormatClass;
+    return this;
+  }
+
+  /**
    * Set VertexInputFormat held
    *
    * @param vertexInputFormatClass VertexInputFormat to set

http://git-wip-us.apache.org/repos/asf/giraph/blob/43909035/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 8c64b5d..a315399 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -31,6 +31,7 @@ import org.apache.giraph.factories.VertexValueFactory;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeOutputFormat;
+import org.apache.giraph.io.MappingInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.filters.EdgeInputFilter;
@@ -271,6 +272,16 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Set the mapping input format class (optional) in this configuration
+   *
+   * @param mappingInputFormatClass Determines how mappings are input
+   */
+  public void setMappingInputFormatClass(
+    Class<? extends MappingInputFormat> mappingInputFormatClass) {
+    MAPPING_INPUT_FORMAT_CLASS.set(this, mappingInputFormatClass);
+  }
+
+  /**
    * Set the master class (optional)
    *
    * @param masterComputeClass Runs master computation

http://git-wip-us.apache.org/repos/asf/giraph/blob/43909035/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 8b67490..381495e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -323,6 +323,18 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Set MappingInputFormatClass in both the configuration and cached classes
+   *
+   * @param mappingInputFormatClass Determines how mappings are input
+   */
+  @Override
+  public void setMappingInputFormatClass(
+    Class<? extends MappingInputFormat> mappingInputFormatClass) {
+    super.setMappingInputFormatClass(mappingInputFormatClass);
+    classes.setMappingInputFormatClass(mappingInputFormatClass);
+  }
+
+  /**
    * Check if mappingInputFormat is set
    *
    * @return true if mappingInputFormat is set

http://git-wip-us.apache.org/repos/asf/giraph/blob/43909035/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MappingInputFormatDescription.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MappingInputFormatDescription.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MappingInputFormatDescription.java
new file mode 100644
index 0000000..6a3ba91
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MappingInputFormatDescription.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats.multi;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.io.MappingInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.json.JSONArray;
+import org.json.JSONException;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Description of the mapping input format - holds mapping input format class
+ * and all parameters specifically set for that mapping input format.
+ *
+ * Used only with {@link MultiMappingInputFormat}
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <B> Mapping target
+ */
+public class MappingInputFormatDescription<I extends WritableComparable,
+    V extends Writable, E extends Writable, B extends Writable>
+    extends InputFormatDescription<MappingInputFormat<I, V, E, B>> {
+  /**
+   * MappingInputFormats description - JSON array containing a JSON array for
+   * each mapping input. Mapping input JSON arrays contain one or two elements -
+   * first one is the name of mapping input class, and second one is JSON object
+   * with all specific parameters for this mapping input. For example:
+   * [["MIF1",{"p":"v1"}],["MIF2",{"p":"v2","q":"v"}]]
+   */
+  public static final StrConfOption MAPPING_INPUT_FORMAT_DESCRIPTIONS =
+      new StrConfOption("giraph.multiMappingInput.descriptions", null,
+          "MappingInputFormats description - JSON array containing a JSON " +
+          "array for each mapping input. Mapping input JSON arrays contain " +
+          "one or two elements - first one is the name of mapping input " +
+          "class, and second one is JSON object with all specific parameters " +
+          "for this mapping input. For example: [[\"MIF1\",{\"p\":\"v1\"}]," +
+          "[\"MIF2\",{\"p\":\"v2\",\"q\":\"v\"}]]");
+
+  /**
+   * Constructor with mapping input format class
+   *
+   * @param mappingInputFormatClass Mapping input format class
+   */
+  public MappingInputFormatDescription(
+    Class<? extends MappingInputFormat<I, V, E, B>> mappingInputFormatClass
+  ) {
+    super(mappingInputFormatClass);
+  }
+
+  /**
+   * Constructor with json string describing this input format
+   *
+   * @param description Json string describing this input format
+   */
+  public MappingInputFormatDescription(String description) {
+    super(description);
+  }
+
+  /**
+   * Create a copy of configuration which additionally has all parameters for
+   * this input format set
+   *
+   * @param conf Configuration which we want to create a copy from
+   * @return Copy of configuration
+   */
+  private ImmutableClassesGiraphConfiguration<I, V, E>
+  createConfigurationCopy(
+      ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+    ImmutableClassesGiraphConfiguration<I, V, E> confCopy =
+        new ImmutableClassesGiraphConfiguration<I, V, E>(conf);
+    confCopy.setMappingInputFormatClass(getInputFormatClass());
+    putParametersToConfiguration(confCopy);
+    return confCopy;
+  }
+
+  /**
+   * Get descriptions of mapping input formats from configuration.
+   *
+   * @param conf Configuration
+   * @param <I>  Vertex id
+   * @param <V>  Vertex data
+   * @param <E>  Edge data
+   * @param <B>  Mapping target
+   * @return List of mapping input format descriptions, empty if none are set
+   */
+  public static <I extends WritableComparable, V extends Writable,
+      E extends Writable, B extends Writable>
+  List<MappingInputFormatDescription<I, V, E, B>>
+  getMappingInputFormatDescriptions(Configuration conf) {
+    String mappingInputFormatDescriptions =
+        MAPPING_INPUT_FORMAT_DESCRIPTIONS.get(conf);
+    if (mappingInputFormatDescriptions == null) {
+      return Lists.newArrayList();
+    }
+    try {
+      JSONArray inputFormatsJson = new JSONArray(
+        mappingInputFormatDescriptions);
+      List<MappingInputFormatDescription<I, V, E, B>> descriptions =
+          Lists.newArrayListWithCapacity(inputFormatsJson.length());
+      for (int i = 0; i < inputFormatsJson.length(); i++) {
+        descriptions.add(new MappingInputFormatDescription<I, V, E, B>(
+            inputFormatsJson.getString(i)));
+      }
+      return descriptions;
+    } catch (JSONException e) {
+      throw new IllegalStateException("getMappingInputFormatDescriptions: " +
+          "JSONException occurred while trying to process " +
+          mappingInputFormatDescriptions, e);
+    }
+  }
+
+  /**
+   * Create all mapping input formats
+   *
+   * @param conf Configuration
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <B> Mapping target data
+   * @return List with all mapping input formats
+   */
+  public static <I extends WritableComparable, V extends Writable,
+      E extends Writable, B extends Writable>
+  List<MappingInputFormat<I, V, E, B>> createMappingInputFormats(
+      ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+    List<MappingInputFormatDescription<I, V, E, B>> descriptions =
+        getMappingInputFormatDescriptions(conf);
+    List<MappingInputFormat<I, V, E, B>> mappingInputFormats =
+        Lists.newArrayListWithCapacity(descriptions.size());
+    for (MappingInputFormatDescription<I, V, E, B> description : descriptions) {
+      ImmutableClassesGiraphConfiguration<I, V, E> confCopy =
+          description.createConfigurationCopy(conf);
+      mappingInputFormats.add((MappingInputFormat<I, V, E, B>)
+                                confCopy.createWrappedMappingInputFormat());
+    }
+    return mappingInputFormats;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/43909035/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiMappingInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiMappingInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiMappingInputFormat.java
new file mode 100644
index 0000000..5d943db
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiMappingInputFormat.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats.multi;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.MappingInputFormat;
+import org.apache.giraph.io.MappingReader;
+import org.apache.giraph.io.internal.WrappedMappingReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Mapping input format which wraps several mapping input formats.
+ * Provides the way to read data from multiple sources,
+ * using several different input formats.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge data
+ * @param <B> Mapping target
+ */
+public class MultiMappingInputFormat<I extends WritableComparable,
+  V extends Writable, E extends Writable, B extends Writable>
+  extends MappingInputFormat<I, V, E, B> {
+
+  /** Mapping input formats */
+  private List<MappingInputFormat<I, V, E, B>> mappingInputFormats;
+
+  /**
+   * Default constructor.
+   */
+  public MultiMappingInputFormat() {
+  }
+
+  @Override
+  public void setConf(
+    ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+    super.setConf(conf);
+    mappingInputFormats =
+      MappingInputFormatDescription.createMappingInputFormats(getConf());
+    if (mappingInputFormats.isEmpty()) {
+      throw new IllegalStateException("setConf: Using " +
+        "MultiMappingInputFormat without specifying mapping inputs");
+    }
+  }
+
+  @Override
+  public MappingReader createMappingReader(
+    InputSplit inputSplit, TaskAttemptContext context
+  ) throws IOException {
+    if (inputSplit instanceof InputSplitWithInputFormatIndex) {
+      // When multithreaded input is used we need to make sure other threads
+      // don't change context's configuration while we use it
+      synchronized (context) {
+        InputSplitWithInputFormatIndex split =
+          (InputSplitWithInputFormatIndex) inputSplit;
+        MappingInputFormat<I, V, E, B> mappingInputFormat =
+          mappingInputFormats.get(split.getInputFormatIndex());
+        MappingReader<I, V, E, B> mappingReader =
+          mappingInputFormat.createMappingReader(split.getSplit(), context);
+        return new WrappedMappingReader<I, V, E, B>(
+          mappingReader, mappingInputFormat.getConf()) {
+          @Override
+          public void initialize(InputSplit inputSplit,
+                                 TaskAttemptContext context) throws IOException,
+            InterruptedException {
+            // When multithreaded input is used we need to make sure other
+            // threads don't change context's configuration while we use it
+            synchronized (context) {
+              super.initialize(inputSplit, context);
+            }
+          }
+        };
+      }
+    } else {
+      throw new IllegalStateException("createMappingReader: Got InputSplit " +
+        "which was not created by this class: " +
+        inputSplit.getClass().getName());
+    }
+  }
+
+  @Override
+  public void checkInputSpecs(Configuration conf) {
+    for (MappingInputFormat mappingInputFormat : mappingInputFormats) {
+      mappingInputFormat.checkInputSpecs(conf);
+    }
+  }
+
+  @Override
+  public List<InputSplit> getSplits(
+    JobContext context, int minSplitCountHint
+  ) throws IOException, InterruptedException {
+    synchronized (context) {
+      return MultiInputUtils.getSplits(
+        context, minSplitCountHint, mappingInputFormats);
+    }
+  }
+
+  @Override
+  public void writeInputSplit(InputSplit inputSplit, DataOutput dataOutput)
+    throws IOException {
+    MultiInputUtils.writeInputSplit(
+      inputSplit, dataOutput, mappingInputFormats);
+  }
+
+  @Override
+  public InputSplit readInputSplit(
+    DataInput dataInput) throws IOException, ClassNotFoundException {
+    return MultiInputUtils.readInputSplit(dataInput, mappingInputFormats);
+  }
+}

Reply via email to