Updated Branches:
  refs/heads/apache-crunch-0.8 1552bc67a -> dfb0402f7

CRUNCH-337: Add From methods to support multiple input paths


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/dfb0402f
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/dfb0402f
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/dfb0402f

Branch: refs/heads/apache-crunch-0.8
Commit: dfb0402f79499bec335c03c05fc4804ba9eba9df
Parents: 1552bc6
Author: Josh Wills <[email protected]>
Authored: Tue Feb 4 07:27:19 2014 -0800
Committer: Josh Wills <[email protected]>
Committed: Tue Feb 4 16:47:31 2014 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/crunch/io/From.java    | 163 ++++++++++++++++++-
 1 file changed, 160 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/dfb0402f/crunch-core/src/main/java/org/apache/crunch/io/From.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/From.java b/crunch-core/src/main/java/org/apache/crunch/io/From.java
index 0f5d6e0..14793a6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/From.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/From.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
 import java.io.IOException;
+import java.util.List;
 
 /**
  * <p>Static factory methods for creating common {@link Source} types.</p>
@@ -114,6 +115,24 @@ public class From {
 
   /**
    * Creates a {@code TableSource<K, V>} for reading data from files that have custom
+   * {@code FileInputFormat<K, V>} implementations not covered by the provided {@code TableSource}
+   * and {@code Source} factory methods.
+   *
+   * @param paths A list of {@code Path}s to the data, all read with {@code formatClass}
+   * @param formatClass The {@code FileInputFormat} implementation
+   * @param keyClass The {@code Writable} class to use for the key
+   * @param valueClass The {@code Writable} class to use for the value
+   * @return A new {@code TableSource<K, V>} instance
+   */
+  public static <K extends Writable, V extends Writable> TableSource<K, V> formattedFile(
+      List<Path> paths, Class<? extends FileInputFormat<K, V>> formatClass,
+      Class<K> keyClass, Class<V> valueClass) {
+    return formattedFile(paths, formatClass, Writables.writables(keyClass),
+        Writables.writables(valueClass));
+  }
+
+  /**
+   * Creates a {@code TableSource<K, V>} for reading data from files that have custom
    * {@code FileInputFormat} implementations not covered by the provided {@code TableSource}
    * and {@code Source} factory methods.
    * 
@@ -148,6 +167,24 @@ public class From {
   }
 
   /**
+   * Creates a {@code TableSource<K, V>} for reading data from files that have custom
+   * {@code FileInputFormat} implementations not covered by the provided {@code TableSource}
+   * and {@code Source} factory methods.
+   *
+   * @param paths A list of {@code Path}s to the data, all read with {@code formatClass}
+   * @param formatClass The {@code FileInputFormat} implementation
+   * @param keyType The {@code PType} to use for the key; its family builds the table type
+   * @param valueType The {@code PType} to use for the value
+   * @return A new {@code TableSource<K, V>} instance
+   */
+  public static <K, V> TableSource<K, V> formattedFile(List<Path> paths,
+                                                       Class<? extends FileInputFormat<?, ?>> formatClass,
+                                                       PType<K> keyType, PType<V> valueType) {
+    PTableType<K, V> tableType = keyType.getFamily().tableOf(keyType, valueType);
+    return new FileTableSourceImpl<K, V>(paths, tableType, formatClass);
+  }
+
+  /**
    * Creates a {@code Source<T>} instance from the Avro file(s) at the given path name.
    * 
    * @param pathName The name of the path to the data on the filesystem
@@ -168,7 +205,18 @@ public class From {
   public static <T extends SpecificRecord> Source<T> avroFile(Path path, Class<T> avroClass) {
     return avroFile(path, Avros.specifics(avroClass));  
   }
-  
+
+  /**
+   * Creates a {@code Source<T>} instance from the Avro file(s) at the given {@code Path}s.
+   *
+   * @param paths A list of {@code Path}s to the data
+   * @param avroClass The subclass of {@code SpecificRecord} to use for the Avro records
+   * @return A new {@code Source<T>} instance
+   */
+  public static <T extends SpecificRecord> Source<T> avroFile(List<Path> paths, Class<T> avroClass) {
+    return avroFile(paths, Avros.specifics(avroClass));
+  }
+
   /**
    * Creates a {@code Source<T>} instance from the Avro file(s) at the given path name.
    * 
@@ -192,6 +240,17 @@ public class From {
   }
 
   /**
+   * Creates a {@code Source<T>} instance from the Avro file(s) at the given {@code Path}s.
+   *
+   * @param paths A list of {@code Path}s to the data
+   * @param avroType The {@code AvroType} used to read the records from all of the paths
+   * @return A new {@code Source<T>} instance
+   */
+  public static <T> Source<T> avroFile(List<Path> paths, AvroType<T> avroType) {
+    return new AvroFileSource<T>(paths, avroType);
+  }
+
+  /**
    * Creates a {@code Source<GenericData.Record>} by reading the schema of the Avro file
    * at the given path. If the path is a directory, the schema of a file in the directory
    * will be used to determine the schema to use.
@@ -217,6 +276,18 @@ public class From {
 
   /**
    * Creates a {@code Source<GenericData.Record>} by reading the schema of the Avro file
+   * at the given paths, using a new default {@code Configuration}. The schema is read from the
+   * first path in the list; if that path is a directory, the schema of a file in it is used.
+   *
+   * @param paths A non-empty list of paths to the data on the filesystem
+   * @return A new {@code Source<GenericData.Record>} instance
+   */
+  public static Source<GenericData.Record> avroFile(List<Path> paths) {
+    return avroFile(paths, new Configuration());
+  }
+
+  /**
+   * Creates a {@code Source<GenericData.Record>} by reading the schema of the Avro file
    * at the given path using the {@code FileSystem} information contained in the given
    * {@code Configuration} instance. If the path is a directory, the schema of a file in
    * the directory will be used to determine the schema to use.
@@ -229,6 +300,25 @@ public class From {
     return avroFile(path, Avros.generics(getSchemaFromPath(path, conf)));
   }
 
+  /**
+   * Creates a {@code Source<GenericData.Record>} by reading the schema of the Avro file
+   * at the given paths using the {@code FileSystem} information contained in the given
+   * {@code Configuration} instance. The schema is read from the first path only and is then
+   * used to read every path; if the first path is a directory, the schema of a file in the
+   * directory will be used to determine the schema to use.
+   *
+   * @param paths A non-empty list of {@code Path}s to the data on the filesystem
+   * @param conf The configuration information
+   * @return A new {@code Source<GenericData.Record>} instance
+   * @throws IllegalArgumentException if {@code paths} is empty
+   */
+  public static Source<GenericData.Record> avroFile(List<Path> paths, Configuration conf) {
+    if (paths.isEmpty()) {
+      throw new IllegalArgumentException("At least one path is required to read the Avro schema");
+    }
+    return avroFile(paths, Avros.generics(getSchemaFromPath(paths.get(0), conf)));
+  }
+
   static Schema getSchemaFromPath(Path path, Configuration conf) {
     DataFileReader reader = null;
     try {
@@ -284,7 +369,19 @@ public class From {
   public static <T extends Writable> Source<T> sequenceFile(Path path, Class<T> valueClass) {
     return sequenceFile(path, Writables.writables(valueClass));
   }
-  
+
+  /**
+   * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given {@code Path}s
+   * from the value field of each key-value pair in the SequenceFile(s).
+   *
+   * @param paths A list of {@code Path}s to the data
+   * @param valueClass The {@code Writable} class for the value of each SequenceFile entry
+   * @return A new {@code Source<T>} instance
+   */
+  public static <T extends Writable> Source<T> sequenceFile(List<Path> paths, Class<T> valueClass) {
+    return sequenceFile(paths, Writables.writables(valueClass));
+  }
+
   /**
    * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given path name
    * from the value field of each key-value pair in the SequenceFile(s).
@@ -310,6 +407,18 @@ public class From {
   }
 
   /**
+   * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given {@code Path}s
+   * from the value field of each key-value pair in the SequenceFile(s).
+   *
+   * @param paths A list of {@code Path}s to the data
+   * @param ptype The {@code PType} for the value of each SequenceFile entry
+   * @return A new {@code Source<T>} instance
+   */
+  public static <T> Source<T> sequenceFile(List<Path> paths, PType<T> ptype) {
+    return new SeqFileSource<T>(paths, ptype);
+  }
+
+  /**
    * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given path name.
    * 
    * @param pathName The name of the path to the data on the filesystem
@@ -334,7 +443,20 @@ public class From {
       Path path, Class<K> keyClass, Class<V> valueClass) {
     return sequenceFile(path, Writables.writables(keyClass), Writables.writables(valueClass));
   }
-  
+
+  /**
+   * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given {@code Path}s.
+   *
+   * @param paths A list of {@code Path}s to the data
+   * @param keyClass The {@code Writable} subclass for the key of the SequenceFile entry
+   * @param valueClass The {@code Writable} subclass for the value of the SequenceFile entry
+   * @return A new {@code TableSource<K, V>} instance
+   */
+  public static <K extends Writable, V extends Writable> TableSource<K, V> sequenceFile(
+      List<Path> paths, Class<K> keyClass, Class<V> valueClass) {
+    return sequenceFile(paths, Writables.writables(keyClass), Writables.writables(valueClass));
+  }
+
   /**
    * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given path name.
    * 
@@ -361,6 +483,19 @@ public class From {
   }
 
   /**
+   * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given {@code Path}s.
+   *
+   * @param paths A list of {@code Path}s to the data
+   * @param keyType The {@code PType} for the key; its {@code PTypeFamily} builds the table type
+   * @param valueType The {@code PType} for the value of the SequenceFile entry
+   * @return A new {@code TableSource<K, V>} instance
+   */
+  public static <K, V> TableSource<K, V> sequenceFile(List<Path> paths, PType<K> keyType, PType<V> valueType) {
+    PTypeFamily ptf = keyType.getFamily();
+    return new SeqFileTableSource<K, V>(paths, ptf.tableOf(keyType, valueType));
+  }
+
+  /**
    * Creates a {@code Source<String>} instance for the text file(s) at the given path name.
    * 
    * @param pathName The name of the path to the data on the filesystem
@@ -381,6 +516,16 @@ public class From {
   }
 
   /**
+   * Creates a {@code Source<String>} instance for the text file(s) at the given {@code Path}s.
+   *
+   * @param paths A list of {@code Path}s to the data
+   * @return A new {@code Source<String>} instance using the {@code Writables.strings()} type
+   */
+  public static Source<String> textFile(List<Path> paths) {
+    return textFile(paths, Writables.strings());
+  }
+
+  /**
    * Creates a {@code Source<T>} instance for the text file(s) at the given path name using
    * the provided {@code PType<T>} to convert the input text.
    * 
@@ -403,4 +548,16 @@ public class From {
   public static <T> Source<T> textFile(Path path, PType<T> ptype) {
     return new TextFileSource<T>(path, ptype);
   }
+
+  /**
+   * Creates a {@code Source<T>} instance for the text file(s) at the given {@code Path}s using
+   * the provided {@code PType<T>} to convert the input text.
+   *
+   * @param paths A list of {@code Path}s to the data
+   * @param ptype The {@code PType<T>} used to convert the input text from all of the paths
+   * @return A new {@code Source<T>} instance
+   */
+  public static <T> Source<T> textFile(List<Path> paths, PType<T> ptype) {
+    return new TextFileSource<T>(paths, ptype);
+  }
 }

Reply via email to