Updated Branches: refs/heads/master c51bcd63a -> 38a97e54c
CRUNCH-219: Allow FileSourceImpl to take in multiple paths Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/38a97e54 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/38a97e54 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/38a97e54 Branch: refs/heads/master Commit: 38a97e54cef99749731a4b88d59b3100cc3b87e9 Parents: c51bcd6 Author: Josh Wills <[email protected]> Authored: Mon Jun 24 21:47:09 2013 -0700 Committer: Josh Wills <[email protected]> Committed: Mon Jun 24 21:47:09 2013 -0700 ---------------------------------------------------------------------- .../crunch/io/avro/AvroMemPipelineIT.java | 35 ++++++++ .../apache/crunch/io/avro/AvroFileSource.java | 12 +-- .../crunch/io/avro/trevni/TrevniKeySource.java | 12 +-- .../apache/crunch/io/impl/FileSourceImpl.java | 84 ++++++++++++++++---- .../crunch/io/impl/FileTableSourceImpl.java | 9 +++ .../org/apache/crunch/io/seq/SeqFileSource.java | 12 +-- .../crunch/io/seq/SeqFileTableSource.java | 13 +-- .../apache/crunch/io/text/NLineFileSource.java | 18 ++++- .../crunch/io/text/TextFileReaderFactory.java | 10 +++ .../apache/crunch/io/text/TextFileSource.java | 11 ++- .../crunch/io/text/TextFileTableSource.java | 18 ++++- 11 files changed, 186 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/38a97e54/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java index 9cafa3f..cfb669e 100644 --- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java @@ -18,12 +18,14 @@ package org.apache.crunch.io.avro; import static org.junit.Assert.assertEquals; +import com.google.common.collect.Sets; import java.io.File; import java.io.IOException; import java.io.Serializable; import java.util.Collections; import java.util.List; +import java.util.Set; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.Record; import org.apache.avro.generic.GenericRecord; @@ -35,6 +37,7 @@ import org.apache.crunch.test.Person; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; import org.apache.crunch.types.avro.Avros; +import org.apache.hadoop.fs.Path; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -118,4 +121,36 @@ public class AvroMemPipelineIT implements Serializable { assertEquals(writeRecord, readRecord.toString()); } + @Test + public void testMemPipelineWithMultiplePaths() { + + GenericRecord writeRecord1 = createGenericRecord("John Doe"); + final PCollection<GenericRecord> writeCollection1 = MemPipeline.collectionOf(Collections.singleton(writeRecord1)); + writeCollection1.write(To.avroFile(avroFile.getAbsolutePath())); + + File avroFile2 = tmpDir.getFile("test2.avro"); + GenericRecord writeRecord2 = createGenericRecord("Jane Doe"); + final PCollection<GenericRecord> writeCollection2 = MemPipeline.collectionOf(Collections.singleton(writeRecord2)); + writeCollection2.write(To.avroFile(avroFile2.getAbsolutePath())); + + List<Path> paths = Lists.newArrayList(new Path(avroFile.getAbsolutePath()), + new Path(avroFile2.getAbsolutePath())); + PCollection<Record> readCollection = MemPipeline.getInstance().read( + new AvroFileSource<Record>(paths, Avros.generics(writeRecord1.getSchema()))); + + Set<Record> readSet = Sets.newHashSet(readCollection.materialize()); + + assertEquals(Sets.newHashSet(writeRecord1, writeRecord2), readSet); + } + + private GenericRecord createGenericRecord(String name) { + + GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$); + savedRecord.put("name", name); + savedRecord.put("age", 42); + savedRecord.put("siblingnames", Lists.newArrayList("Jimmy")); + + return savedRecord; + } + } http://git-wip-us.apache.org/repos/asf/crunch/blob/38a97e54/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java index 15792bf..3e1e933 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java @@ -19,8 +19,8 @@ package org.apache.crunch.io.avro; import java.io.IOException; +import java.util.List; import org.apache.avro.mapred.AvroJob; -import org.apache.crunch.io.CompositePathIterable; import org.apache.crunch.io.FormatBundle; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.io.impl.FileSourceImpl; @@ -28,7 +28,6 @@ import org.apache.crunch.types.avro.AvroInputFormat; import org.apache.crunch.types.avro.AvroType; import org.apache.crunch.types.avro.Avros; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> { @@ -45,14 +44,17 @@ public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSour super(path, ptype, getBundle(ptype)); } + public AvroFileSource(List<Path> paths, AvroType<T> ptype) { + super(paths, ptype, getBundle(ptype)); + } + @Override public String toString() { - return "Avro(" + path.toString() + ")"; + return "Avro(" + pathsAsString() + ")"; } @Override public Iterable<T> read(Configuration conf) throws IOException { - FileSystem fs = path.getFileSystem(conf); - return CompositePathIterable.create(fs, path, new AvroFileReaderFactory<T>((AvroType<T>) ptype)); + return read(conf, new AvroFileReaderFactory<T>((AvroType<T>) ptype)); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/38a97e54/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java index 193ac1b..bee7ec1 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java @@ -17,15 +17,14 @@ */ package org.apache.crunch.io.avro.trevni; +import java.util.List; import org.apache.avro.mapred.AvroJob; -import org.apache.crunch.io.CompositePathIterable; import org.apache.crunch.io.FormatBundle; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.io.impl.FileSourceImpl; import org.apache.crunch.types.avro.AvroType; import org.apache.crunch.types.avro.Avros; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.trevni.avro.mapreduce.AvroTrevniKeyInputFormat; @@ -45,14 +44,17 @@ public class TrevniKeySource<T> extends FileSourceImpl<T> implements ReadableSou super(path, ptype, getBundle(ptype)); } + public TrevniKeySource(List<Path> paths, AvroType<T> ptype) { + super(paths, ptype, getBundle(ptype)); + } + @Override public String toString() { - return "TrevniKey(" + path.toString() + ")"; + return "TrevniKey(" + pathsAsString() + ")"; } @Override public Iterable<T> read(Configuration conf) throws IOException { - FileSystem fs = path.getFileSystem(conf); - return CompositePathIterable.create(fs, path, new TrevniFileReaderFactory<T>((AvroType<T>) ptype)); + return read(conf, new TrevniFileReaderFactory<T>((AvroType<T>) ptype)); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/38a97e54/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java index 688c801..44139b0 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java @@ -17,17 +17,25 @@ */ package org.apache.crunch.io.impl; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import java.io.IOException; +import java.util.List; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.crunch.Source; +import org.apache.crunch.io.CompositePathIterable; import org.apache.crunch.io.CrunchInputs; +import org.apache.crunch.io.FileReaderFactory; import org.apache.crunch.io.FormatBundle; import org.apache.crunch.io.SourceTargetHelper; +import org.apache.crunch.io.avro.AvroFileReaderFactory; import org.apache.crunch.types.PType; +import org.apache.crunch.types.avro.AvroType; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; @@ -37,34 +45,58 @@ public class FileSourceImpl<T> implements Source<T> { private static final Log LOG = LogFactory.getLog(FileSourceImpl.class); + @Deprecated protected final Path path; + protected final List<Path> paths; protected final PType<T> ptype; protected final FormatBundle<? extends InputFormat> inputBundle; public FileSourceImpl(Path path, PType<T> ptype, Class<? extends InputFormat> inputFormatClass) { - this.path = path; - this.ptype = ptype; - this.inputBundle = FormatBundle.forInput(inputFormatClass); + this(path, ptype, FormatBundle.forInput(inputFormatClass)); } public FileSourceImpl(Path path, PType<T> ptype, FormatBundle<? extends InputFormat> inputBundle) { - this.path = path; + this(Lists.newArrayList(path), ptype, inputBundle); + } + + public FileSourceImpl(List<Path> paths, PType<T> ptype, Class<? extends InputFormat> inputFormatClass) { + this(paths, ptype, FormatBundle.forInput(inputFormatClass)); + } + + public FileSourceImpl(List<Path> paths, PType<T> ptype, FormatBundle<? extends InputFormat> inputBundle) { + this.path = paths.isEmpty() ? null : paths.get(0); + this.paths = paths; this.ptype = ptype; this.inputBundle = inputBundle; } + @Deprecated public Path getPath() { - return path; + if (paths.isEmpty()) { + return null; + } else if (paths.size() > 1) { + LOG.warn("getPath() called for source with multiple paths, only " + + "returning first. Source: " + this); + } + return paths.get(0); + } + + public List<Path> getPaths() { + return paths; } @Override public void configureSource(Job job, int inputId) throws IOException { if (inputId == -1) { - FileInputFormat.addInputPath(job, path); + for (Path path : paths) { + FileInputFormat.addInputPath(job, path); + } job.setInputFormatClass(inputBundle.getFormatClass()); inputBundle.configure(job.getConfiguration()); } else { - CrunchInputs.addInputPath(job, path, inputBundle, inputId); + for (Path path : paths) { + CrunchInputs.addInputPath(job, path, inputBundle, inputId); + } } } @@ -75,12 +107,34 @@ public class FileSourceImpl<T> implements Source<T> { @Override public long getSize(Configuration configuration) { - try { - return SourceTargetHelper.getPathSize(configuration, path); - } catch (IOException e) { - LOG.warn(String.format("Exception thrown looking up size of: %s", path), e); - throw new IllegalStateException("Failed to get the file size of:" + path, e); + long size = 0; + for (Path path : paths) { + try { + size += SourceTargetHelper.getPathSize(configuration, path); + } catch (IOException e) { + LOG.warn(String.format("Exception thrown looking up size of: %s", path), e); + throw new IllegalStateException("Failed to get the file size of:" + path, e); + } + } + return size; + } + + protected Iterable<T> read(Configuration conf, FileReaderFactory<T> readerFactory) + throws IOException { + List<Iterable<T>> iterables = Lists.newArrayList(); + for (Path path : paths) { + FileSystem fs = path.getFileSystem(conf); + iterables.add(CompositePathIterable.create(fs, path, readerFactory)); + } + return Iterables.concat(iterables); + } + + /* Retain string format for single-path sources */ + protected String pathsAsString() { + if (paths.size() == 1) { + return paths.get(0).toString(); } + return paths.toString(); } @Override @@ -89,16 +143,16 @@ public class FileSourceImpl<T> implements Source<T> { return false; } FileSourceImpl o = (FileSourceImpl) other; - return ptype.equals(o.ptype) && path.equals(o.path) && inputBundle.equals(o.inputBundle); + return ptype.equals(o.ptype) && paths.equals(o.paths) && inputBundle.equals(o.inputBundle); } @Override public int hashCode() { - return new HashCodeBuilder().append(ptype).append(path).append(inputBundle).toHashCode(); + return new HashCodeBuilder().append(ptype).append(paths).append(inputBundle).toHashCode(); } @Override public String toString() { - return new StringBuilder().append(inputBundle.getName()).append("(").append(path).append(")").toString(); + return new StringBuilder().append(inputBundle.getName()).append("(").append(pathsAsString()).append(")").toString(); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/38a97e54/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java index 295edb5..ba313a4 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java @@ -17,6 +17,7 @@ */ package org.apache.crunch.io.impl; +import java.util.List; import org.apache.crunch.Pair; import org.apache.crunch.TableSource; import org.apache.crunch.io.FormatBundle; @@ -30,9 +31,17 @@ public class FileTableSourceImpl<K, V> extends FileSourceImpl<Pair<K, V>> implem super(path, tableType, formatClass); } + public FileTableSourceImpl(List<Path> paths, PTableType<K, V> tableType, Class<? extends FileInputFormat> formatClass) { + super(paths, tableType, formatClass); + } + public FileTableSourceImpl(Path path, PTableType<K, V> tableType, FormatBundle bundle) { super(path, tableType, bundle); } + + public FileTableSourceImpl(List<Path> paths, PTableType<K, V> tableType, FormatBundle bundle) { + super(paths, tableType, bundle); + } @Override public PTableType<K, V> getTableType() { http://git-wip-us.apache.org/repos/asf/crunch/blob/38a97e54/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java index 8fac4ae..9e6edc8 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java @@ -19,12 +19,11 @@ package org.apache.crunch.io.seq; import java.io.IOException; -import org.apache.crunch.io.CompositePathIterable; +import java.util.List; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.io.impl.FileSourceImpl; import org.apache.crunch.types.PType; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; @@ -34,14 +33,17 @@ public class SeqFileSource<T> extends FileSourceImpl<T> implements ReadableSourc super(path, ptype, SequenceFileInputFormat.class); } + public SeqFileSource(List<Path> paths, PType<T> ptype) { + super(paths, ptype, SequenceFileInputFormat.class); + } + @Override public Iterable<T> read(Configuration conf) throws IOException { - FileSystem fs = path.getFileSystem(conf); - return CompositePathIterable.create(fs, path, new SeqFileReaderFactory<T>(ptype)); + return read(conf, new SeqFileReaderFactory<T>(ptype)); } @Override public String toString() { - return "SeqFile(" + path.toString() + ")"; + return "SeqFile(" + pathsAsString() + ")"; } } http://git-wip-us.apache.org/repos/asf/crunch/blob/38a97e54/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java index 7a63272..cecafeb 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java @@ -19,13 +19,12 @@ package org.apache.crunch.io.seq; import java.io.IOException; +import java.util.List; import org.apache.crunch.Pair; -import org.apache.crunch.io.CompositePathIterable; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.io.impl.FileTableSourceImpl; import org.apache.crunch.types.PTableType; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; @@ -43,15 +42,17 @@ public class SeqFileTableSource<K, V> extends FileTableSourceImpl<K, V> implemen super(path, ptype, SequenceFileInputFormat.class); } + public SeqFileTableSource(List<Path> paths, PTableType<K, V> ptype) { + super(paths, ptype, SequenceFileInputFormat.class); + } + @Override public Iterable<Pair<K, V>> read(Configuration conf) throws IOException { - FileSystem fs = path.getFileSystem(conf); - return CompositePathIterable.create(fs, path, - new SeqFileReaderFactory<Pair<K, V>>(getTableType())); + return read(conf, new SeqFileReaderFactory<Pair<K, V>>(getTableType())); } @Override public String toString() { - return "SeqFile(" + path.toString() + ")"; + return "SeqFile(" + pathsAsString() + ")"; } } http://git-wip-us.apache.org/repos/asf/crunch/blob/38a97e54/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java index 40e2dbd..abef771 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java @@ -19,7 +19,7 @@ package org.apache.crunch.io.text; import java.io.IOException; -import org.apache.crunch.io.CompositePathIterable; +import java.util.List; import org.apache.crunch.io.FormatBundle; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.io.impl.FileSourceImpl; @@ -64,14 +64,24 @@ public class NLineFileSource<T> extends FileSourceImpl<T> implements ReadableSou super(path, ptype, getBundle(linesPerTask)); } + /** + * Create a new {@code NLineFileSource} instance. + * + * @param paths The {@code Path}s to the input data + * @param ptype The PType to use for processing the data + * @param linesPerTask The number of lines from the input each map task will process + */ + public NLineFileSource(List<Path> paths, PType<T> ptype, int linesPerTask) { + super(paths, ptype, getBundle(linesPerTask)); + } + @Override public String toString() { - return "NLine(" + path + ")"; + return "NLine(" + pathsAsString() + ")"; } @Override public Iterable<T> read(Configuration conf) throws IOException { - return CompositePathIterable.create(path.getFileSystem(conf), path, - new TextFileReaderFactory<T>(LineParser.forType(ptype))); + return read(conf, new TextFileReaderFactory<T>(LineParser.forType(ptype))); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/38a97e54/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java index e1fea6e..851d199 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java @@ -62,11 +62,17 @@ public class TextFileReaderFactory<T> implements FileReaderFactory<T> { final BufferedReader reader = new BufferedReader(new InputStreamReader(is)); return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() { + boolean nextChecked = false; private String nextLine; @Override public boolean hasNext() { + if (nextChecked) { + return nextLine != null; + } + try { + nextChecked = true; return (nextLine = reader.readLine()) != null; } catch (IOException e) { LOG.info("Exception reading text file stream", e); @@ -76,6 +82,10 @@ public class TextFileReaderFactory<T> implements FileReaderFactory<T> { @Override public T next() { + if (!nextChecked && !hasNext()) { + return null; + } + nextChecked = false; return parser.parse(nextLine); } }); http://git-wip-us.apache.org/repos/asf/crunch/blob/38a97e54/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSource.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSource.java index 026fca9..ca8cbaf 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSource.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSource.java @@ -19,7 +19,7 @@ package org.apache.crunch.io.text; import java.io.IOException; -import org.apache.crunch.io.CompositePathIterable; +import java.util.List; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.io.impl.FileSourceImpl; import org.apache.crunch.types.PType; @@ -51,6 +51,10 @@ public class TextFileSource<T> extends FileSourceImpl<T> implements ReadableSour super(path, ptype, getInputFormat(path, ptype)); } + public TextFileSource(List<Path> paths, PType<T> ptype) { + super(paths, ptype, getInputFormat(paths.get(0), ptype)); + } + @Override public long getSize(Configuration conf) { long sz = super.getSize(conf); @@ -62,12 +66,11 @@ public class TextFileSource<T> extends FileSourceImpl<T> implements ReadableSour @Override public String toString() { - return "Text(" + path + ")"; + return "Text(" + pathsAsString() + ")"; } @Override public Iterable<T> read(Configuration conf) throws IOException { - return CompositePathIterable.create(path.getFileSystem(conf), path, - new TextFileReaderFactory<T>(LineParser.forType(ptype))); + return read(conf, new TextFileReaderFactory<T>(LineParser.forType(ptype))); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/38a97e54/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java index 94fc5fd..66b2e67 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java @@ -19,8 +19,8 @@ package org.apache.crunch.io.text; import java.io.IOException; +import java.util.List; import org.apache.crunch.Pair; -import org.apache.crunch.io.CompositePathIterable; import org.apache.crunch.io.FormatBundle; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.io.impl.FileTableSourceImpl; @@ -58,6 +58,10 @@ public class TextFileTableSource<K, V> extends FileTableSourceImpl<K, V> public TextFileTableSource(Path path, PTableType<K, V> tableType) { this(path, tableType, "\t"); } + + public TextFileTableSource(List<Path> paths, PTableType<K, V> tableType) { + this(paths, tableType, "\t"); + } public TextFileTableSource(String path, PTableType<K, V> tableType, String separator) { this(new Path(path), tableType, separator); @@ -68,14 +72,20 @@ public class TextFileTableSource<K, V> extends FileTableSourceImpl<K, V> this.separator = separator; } + public TextFileTableSource(List<Path> paths, PTableType<K, V> tableType, String separator) { + super(paths, tableType, getBundle(separator)); + this.separator = separator; + } + @Override public String toString() { - return "KeyValueText(" + path + ")"; + return "KeyValueText(" + pathsAsString() + ")"; } @Override public Iterable<Pair<K, V>> read(Configuration conf) throws IOException { - return CompositePathIterable.create(path.getFileSystem(conf), path, - new TextFileReaderFactory<Pair<K, V>>(LineParser.forTableType(getTableType(), separator))); + return read(conf, + new TextFileReaderFactory<Pair<K, V>>(LineParser.forTableType(getTableType(), + separator))); } }
