Updated Branches: refs/heads/master ffd8375a2 -> 715128b93
CRUNCH-337: Add From methods to support multiple input paths Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/715128b9 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/715128b9 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/715128b9 Branch: refs/heads/master Commit: 715128b93044eb7e3c375e2d680be9da4ee3a2c1 Parents: ffd8375 Author: Josh Wills <[email protected]> Authored: Tue Feb 4 07:27:19 2014 -0800 Committer: Josh Wills <[email protected]> Committed: Tue Feb 4 07:27:19 2014 -0800 ---------------------------------------------------------------------- .../main/java/org/apache/crunch/io/From.java | 163 ++++++++++++++++++- 1 file changed, 160 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/715128b9/crunch-core/src/main/java/org/apache/crunch/io/From.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/From.java b/crunch-core/src/main/java/org/apache/crunch/io/From.java index 0f5d6e0..14793a6 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/From.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/From.java @@ -46,6 +46,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import java.io.IOException; +import java.util.List; /** * <p>Static factory methods for creating common {@link Source} types.</p> @@ -114,6 +115,24 @@ public class From { /** * Creates a {@code TableSource<K, V>} for reading data from files that have custom + * {@code FileInputFormat<K, V>} implementations not covered by the provided {@code TableSource} + * and {@code Source} factory methods. 
+ * + * @param paths A list of {@code Path}s to the data + * @param formatClass The {@code FileInputFormat} implementation + * @param keyClass The {@code Writable} to use for the key + * @param valueClass The {@code Writable} to use for the value + * @return A new {@code TableSource<K, V>} instance + */ + public static <K extends Writable, V extends Writable> TableSource<K, V> formattedFile( + List<Path> paths, Class<? extends FileInputFormat<K, V>> formatClass, + Class<K> keyClass, Class<V> valueClass) { + return formattedFile(paths, formatClass, Writables.writables(keyClass), + Writables.writables(valueClass)); + } + + /** + * Creates a {@code TableSource<K, V>} for reading data from files that have custom * {@code FileInputFormat} implementations not covered by the provided {@code TableSource} * and {@code Source} factory methods. * @@ -148,6 +167,24 @@ public class From { } /** + * Creates a {@code TableSource<K, V>} for reading data from files that have custom + * {@code FileInputFormat} implementations not covered by the provided {@code TableSource} + * and {@code Source} factory methods. + * + * @param paths A list of {@code Path}s to the data + * @param formatClass The {@code FileInputFormat} implementation + * @param keyType The {@code PType} to use for the key + * @param valueType The {@code PType} to use for the value + * @return A new {@code TableSource<K, V>} instance + */ + public static <K, V> TableSource<K, V> formattedFile(List<Path> paths, + Class<? extends FileInputFormat<?, ?>> formatClass, + PType<K> keyType, PType<V> valueType) { + PTableType<K, V> tableType = keyType.getFamily().tableOf(keyType, valueType); + return new FileTableSourceImpl<K, V>(paths, tableType, formatClass); + } + + /** * Creates a {@code Source<T>} instance from the Avro file(s) at the given path name. 
* * @param pathName The name of the path to the data on the filesystem @@ -168,7 +205,18 @@ public class From { public static <T extends SpecificRecord> Source<T> avroFile(Path path, Class<T> avroClass) { return avroFile(path, Avros.specifics(avroClass)); } - + + /** + * Creates a {@code Source<T>} instance from the Avro file(s) at the given {@code Path}s. + * + * @param paths A list of {@code Path}s to the data + * @param avroClass The subclass of {@code SpecificRecord} to use for the Avro file + * @return A new {@code Source<T>} instance + */ + public static <T extends SpecificRecord> Source<T> avroFile(List<Path> paths, Class<T> avroClass) { + return avroFile(paths, Avros.specifics(avroClass)); + } + /** * Creates a {@code Source<T>} instance from the Avro file(s) at the given path name. * @@ -192,6 +240,17 @@ public class From { } /** + * Creates a {@code Source<T>} instance from the Avro file(s) at the given {@code Path}s. + * + * @param paths A list of {@code Path}s to the data + * @param avroType The {@code AvroType} for the Avro records + * @return A new {@code Source<T>} instance + */ + public static <T> Source<T> avroFile(List<Path> paths, AvroType<T> avroType) { + return new AvroFileSource<T>(paths, avroType); + } + + /** * Creates a {@code Source<GenericData.Record>} by reading the schema of the Avro file * at the given path. If the path is a directory, the schema of a file in the directory * will be used to determine the schema to use. @@ -217,6 +276,18 @@ public class From { /** * Creates a {@code Source<GenericData.Record>} by reading the schema of the Avro file + * at the given paths. If the path is a directory, the schema of a file in the directory + * will be used to determine the schema to use. 
+ * + * @param paths A list of paths to the data on the filesystem + * @return A new {@code Source<GenericData.Record>} instance + */ + public static Source<GenericData.Record> avroFile(List<Path> paths) { + return avroFile(paths, new Configuration()); + } + + /** + * Creates a {@code Source<GenericData.Record>} by reading the schema of the Avro file * at the given path using the {@code FileSystem} information contained in the given * {@code Configuration} instance. If the path is a directory, the schema of a file in * the directory will be used to determine the schema to use. @@ -229,6 +300,20 @@ public class From { return avroFile(path, Avros.generics(getSchemaFromPath(path, conf))); } + /** + * Creates a {@code Source<GenericData.Record>} by reading the schema of the Avro file + * at the given paths using the {@code FileSystem} information contained in the given + * {@code Configuration} instance. If the first path is a directory, the schema of a file in + * the directory will be used to determine the schema to use. + * + * @param paths A list of paths to the data on the filesystem + * @param conf The configuration information + * @return A new {@code Source<GenericData.Record>} instance + */ + public static Source<GenericData.Record> avroFile(List<Path> paths, Configuration conf) { + return avroFile(paths, Avros.generics(getSchemaFromPath(paths.get(0), conf))); + } + static Schema getSchemaFromPath(Path path, Configuration conf) { DataFileReader reader = null; try { @@ -284,7 +369,19 @@ public class From { public static <T extends Writable> Source<T> sequenceFile(Path path, Class<T> valueClass) { return sequenceFile(path, Writables.writables(valueClass)); } - + + /** + * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given {@code Path}s + * from the value field of each key-value pair in the SequenceFile(s). 
+ * + * @param paths A list of {@code Path}s to the data + * @param valueClass The {@code Writable} type for the value of the SequenceFile entry + * @return A new {@code Source<T>} instance + */ + public static <T extends Writable> Source<T> sequenceFile(List<Path> paths, Class<T> valueClass) { + return sequenceFile(paths, Writables.writables(valueClass)); + } + /** * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given path name * from the value field of each key-value pair in the SequenceFile(s). @@ -310,6 +407,18 @@ public class From { } /** + * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given {@code Path}s + * from the value field of each key-value pair in the SequenceFile(s). + * + * @param paths A list of {@code Path}s to the data + * @param ptype The {@code PType} for the value of the SequenceFile entry + * @return A new {@code Source<T>} instance + */ + public static <T> Source<T> sequenceFile(List<Path> paths, PType<T> ptype) { + return new SeqFileSource<T>(paths, ptype); + } + + /** * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given path name. * * @param pathName The name of the path to the data on the filesystem @@ -334,7 +443,20 @@ public class From { Path path, Class<K> keyClass, Class<V> valueClass) { return sequenceFile(path, Writables.writables(keyClass), Writables.writables(valueClass)); } - + + /** + * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given {@code Path}s. 
+ * + * @param paths A list of {@code Path}s to the data + * @param keyClass The {@code Writable} subclass for the key of the SequenceFile entry + * @param valueClass The {@code Writable} subclass for the value of the SequenceFile entry + * @return A new {@code TableSource<K, V>} instance + */ + public static <K extends Writable, V extends Writable> TableSource<K, V> sequenceFile( + List<Path> paths, Class<K> keyClass, Class<V> valueClass) { + return sequenceFile(paths, Writables.writables(keyClass), Writables.writables(valueClass)); + } + /** * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given path name. * @@ -361,6 +483,19 @@ public class From { } /** + * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given {@code Path}s. + * + * @param paths A list of {@code Path}s to the data + * @param keyType The {@code PType} for the key of the SequenceFile entry + * @param valueType The {@code PType} for the value of the SequenceFile entry + * @return A new {@code TableSource<K, V>} instance + */ + public static <K, V> TableSource<K, V> sequenceFile(List<Path> paths, PType<K> keyType, PType<V> valueType) { + PTypeFamily ptf = keyType.getFamily(); + return new SeqFileTableSource<K, V>(paths, ptf.tableOf(keyType, valueType)); + } + + /** * Creates a {@code Source<String>} instance for the text file(s) at the given path name. * * @param pathName The name of the path to the data on the filesystem @@ -381,6 +516,16 @@ public class From { } /** + * Creates a {@code Source<String>} instance for the text file(s) at the given {@code Path}s. + * + * @param paths A list of {@code Path}s to the data + * @return A new {@code Source<String>} instance + */ + public static Source<String> textFile(List<Path> paths) { + return textFile(paths, Writables.strings()); + } + + /** * Creates a {@code Source<T>} instance for the text file(s) at the given path name using * the provided {@code PType<T>} to convert the input text. 
* @@ -403,4 +548,16 @@ public class From { public static <T> Source<T> textFile(Path path, PType<T> ptype) { return new TextFileSource<T>(path, ptype); } + + /** + * Creates a {@code Source<T>} instance for the text file(s) at the given {@code Path}s using + * the provided {@code PType<T>} to convert the input text. + * + * @param paths A list of {@code Path}s to the data + * @param ptype The {@code PType<T>} to use to process the input text + * @return A new {@code Source<T>} instance + */ + public static <T> Source<T> textFile(List<Path> paths, PType<T> ptype) { + return new TextFileSource<T>(paths, ptype); + } }
