Updated Branches: refs/heads/master f26a7c731 -> 63050d0d4
CRUNCH-119: Refactor the ReaderFactory classes for text and sequence files, and add in the NLine and TextFileTable sources Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/63050d0d Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/63050d0d Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/63050d0d Branch: refs/heads/master Commit: 63050d0d49afc9d53bc3948270c49acbf337f3d9 Parents: f26a7c7 Author: Josh Wills <[email protected]> Authored: Wed Nov 21 16:49:02 2012 -0800 Committer: Josh Wills <[email protected]> Committed: Mon Dec 3 11:14:35 2012 -0800 ---------------------------------------------------------------------- .../apache/crunch/io/CompositePathIterableIT.java | 6 +- .../it/java/org/apache/crunch/io/NLineInputIT.java | 69 ++++++++ .../java/org/apache/crunch/io/TextFileTableIT.java | 56 +++++++ .../java/org/apache/crunch/io/ReadableSource.java | 14 ++ .../crunch/io/avro/AvroFileReaderFactory.java | 5 +- .../org/apache/crunch/io/avro/AvroFileSource.java | 13 +- .../apache/crunch/io/impl/AutoClosingIterator.java | 2 +- .../apache/crunch/io/impl/FileTableSourceImpl.java | 4 + .../apache/crunch/io/seq/SeqFileReaderFactory.java | 25 ++- .../org/apache/crunch/io/seq/SeqFileSource.java | 2 +- .../crunch/io/seq/SeqFileTableReaderFactory.java | 99 ------------ .../apache/crunch/io/seq/SeqFileTableSource.java | 6 +- .../java/org/apache/crunch/io/text/LineParser.java | 125 +++++++++++++++ .../org/apache/crunch/io/text/NLineFileSource.java | 77 +++++++++ .../crunch/io/text/TextFileReaderFactory.java | 32 +--- .../org/apache/crunch/io/text/TextFileSource.java | 4 +- .../apache/crunch/io/text/TextFileTableSource.java | 76 +++++++++ .../crunch/io/text/TextFileTableSourceTarget.java | 63 ++++++++ .../org/apache/crunch/io/text/TextFileTarget.java | 2 +- .../crunch/io/avro/AvroFileReaderFactoryTest.java | 2 +- 20 files changed, 532 insertions(+), 150 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java b/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java index 796b821..08d226d 100644 --- a/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java +++ b/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java @@ -48,7 +48,7 @@ public class CompositePathIterableIT { LocalFileSystem local = FileSystem.getLocal(conf); Iterable<String> iterable = CompositePathIterable.create(local, new Path(inputFilePath), - new TextFileReaderFactory<String>(Writables.strings(), conf)); + new TextFileReaderFactory<String>(Writables.strings())); assertEquals(Lists.newArrayList("b", "c", "a", "e"), Lists.newArrayList(iterable)); @@ -62,7 +62,7 @@ public class CompositePathIterableIT { LocalFileSystem local = FileSystem.getLocal(conf); Iterable<String> iterable = CompositePathIterable.create(local, emptyInputDir, - new TextFileReaderFactory<String>(Writables.strings(), conf)); + new TextFileReaderFactory<String>(Writables.strings())); assertTrue(Lists.newArrayList(iterable).isEmpty()); } @@ -78,7 +78,7 @@ public class CompositePathIterableIT { LocalFileSystem local = FileSystem.getLocal(conf); CompositePathIterable.create(local, new Path(nonExistentDir.getAbsolutePath()), new TextFileReaderFactory<String>( - Writables.strings(), conf)); + Writables.strings())); } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/it/java/org/apache/crunch/io/NLineInputIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/io/NLineInputIT.java b/crunch/src/it/java/org/apache/crunch/io/NLineInputIT.java new file mode 100644 index 0000000..3b7abf6 --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/io/NLineInputIT.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io; + +import static org.junit.Assert.assertEquals; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.PCollection; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.text.NLineFileSource; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.writable.Writables; +import org.apache.crunch.types.avro.Avros; +import org.junit.Rule; +import org.junit.Test; + +public class NLineInputIT { + + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + @Test + public void testNLine() throws Exception { + String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt"); + Pipeline pipeline = new MRPipeline(NLineInputIT.class, tmpDir.getDefaultConfiguration()); + PCollection<String> shakespeare = pipeline.read(new NLineFileSource<String>(shakesInputPath, + Writables.strings(), 100)); + assertEquals(new Integer(100), + shakespeare.parallelDo(new LineCountFn(), Avros.ints()).max().getValue()); + } + + private static class LineCountFn extends DoFn<String, Integer> { + + private int lineCount = 0; + + @Override + public void initialize() { + this.lineCount = 0; + } + + @Override + public void process(String input, Emitter<Integer> emitter) { + lineCount++; + } + + @Override + public void cleanup(Emitter<Integer> emitter) { + emitter.emit(lineCount); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/it/java/org/apache/crunch/io/TextFileTableIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/io/TextFileTableIT.java b/crunch/src/it/java/org/apache/crunch/io/TextFileTableIT.java new file mode 100644 index 0000000..bddc0b5 --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/io/TextFileTableIT.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io; + +import static org.apache.crunch.types.writable.Writables.*; +import static org.junit.Assert.assertEquals; + +import java.util.Set; + +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.text.TextFileTableSource; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.junit.Rule; +import org.junit.Test; + +import com.google.common.collect.ImmutableSet; + +/** + * + */ +public class TextFileTableIT { + + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + @Test + public void testTextFileTable() throws Exception { + String urlsFile = tmpDir.copyResourceFileName("urls.txt"); + Pipeline pipeline = new MRPipeline(TextFileTableIT.class, tmpDir.getDefaultConfiguration()); + PTable<String, String> urls = pipeline.read( + new TextFileTableSource<String, String>(urlsFile, tableOf(strings(), strings()))); + Set<Pair<String, Long>> cnts = ImmutableSet.copyOf(urls.keys().count().materialize()); + assertEquals(ImmutableSet.of(Pair.of("www.A.com", 4L), Pair.of("www.B.com", 2L), + Pair.of("www.C.com", 1L), Pair.of("www.D.com", 1L), Pair.of("www.E.com", 1L), + Pair.of("www.F.com", 2L)), cnts); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java b/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java index 73a13a3..0407167 100644 --- a/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java +++ b/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java @@ -22,6 +22,20 @@ import java.io.IOException; import org.apache.crunch.Source; import org.apache.hadoop.conf.Configuration; +/** + * An extension of the {@code Source} interface that indicates that a + * {@code Source} instance may be read as a series of records by the client + * code. This is used to determine whether a {@code PCollection} instance can be + * materialized. + */ public interface ReadableSource<T> extends Source<T> { + + /** + * Returns an {@code Iterable} that contains the contents of this source. + * + * @param conf The current {@code Configuration} instance + * @return the contents of this {@code Source} as an {@code Iterable} instance + * @throws IOException + */ Iterable<T> read(Configuration conf) throws IOException; } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java index c8ab8b8..2f8c1e3 100644 --- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java +++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java @@ -33,7 +33,6 @@ import org.apache.crunch.io.FileReaderFactory; import org.apache.crunch.io.impl.AutoClosingIterator; import org.apache.crunch.types.avro.AvroType; import org.apache.crunch.types.avro.Avros; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -46,12 +45,10 @@ class AvroFileReaderFactory<T> implements FileReaderFactory<T> { private final DatumReader<T> recordReader; private final MapFn<T, T> mapFn; - private final Configuration conf; - public AvroFileReaderFactory(AvroType<T> atype, Configuration conf) { + public AvroFileReaderFactory(AvroType<T> atype) { this.recordReader = AvroFileReaderFactory.createDatumReader(atype); this.mapFn = (MapFn<T, T>) atype.getInputMapFn(); - this.conf = conf; } static <T> DatumReader<T> createDatumReader(AvroType<T> avroType) { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java index 2226556..32b8054 100644 --- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java +++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java @@ -33,11 +33,16 @@ import org.apache.hadoop.fs.Path; public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> { - public AvroFileSource(Path path, AvroType<T> ptype) { - super(path, ptype, new InputBundle(AvroInputFormat.class) + private static <S> InputBundle getBundle(AvroType<S> ptype) { + InputBundle bundle = new InputBundle(AvroInputFormat.class) .set(AvroJob.INPUT_IS_REFLECT, String.valueOf(ptype.hasReflect())) .set(AvroJob.INPUT_SCHEMA, ptype.getSchema().toString()) - .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName())); + .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName()); + return bundle; + } + + public AvroFileSource(Path path, AvroType<T> ptype) { + super(path, ptype, getBundle(ptype)); } @Override @@ -48,6 +53,6 @@ public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSour @Override public Iterable<T> read(Configuration conf) throws IOException { FileSystem fs = path.getFileSystem(conf); - return CompositePathIterable.create(fs, path, new AvroFileReaderFactory<T>((AvroType<T>) ptype, conf)); + return CompositePathIterable.create(fs, path, new AvroFileReaderFactory<T>((AvroType<T>) ptype)); } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java b/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java index d58f290..3bd802e 100644 --- a/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java +++ b/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java @@ -28,7 +28,7 @@ import com.google.common.io.Closeables; * Closes the wrapped {@code Closeable} when {@link #hasNext()} returns false. As long a client loops through to * completion (doesn't abort early due to an exception, short circuit, etc.) resources will be closed automatically. */ -public class AutoClosingIterator<T> extends UnmodifiableIterator<T> implements Iterator<T>, Closeable { +public class AutoClosingIterator<T> extends UnmodifiableIterator<T> implements Closeable { private final Iterator<T> iter; private Closeable closeable; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java index f6e8f1d..c7ea767 100644 --- a/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java +++ b/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java @@ -29,6 +29,10 @@ public class FileTableSourceImpl<K, V> extends FileSourceImpl<Pair<K, V>> implem super(path, tableType, formatClass); } + public FileTableSourceImpl(Path path, PTableType<K, V> tableType, InputBundle bundle) { + super(path, tableType, bundle); + } + @Override public PTableType<K, V> getTableType() { return (PTableType<K, V>) getType(); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java index aa5a00a..2f32746 100644 --- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java +++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java @@ -25,8 +25,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.crunch.MapFn; import org.apache.crunch.io.FileReaderFactory; import org.apache.crunch.io.impl.AutoClosingIterator; +import org.apache.crunch.types.Converter; +import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; @@ -40,23 +41,29 @@ class SeqFileReaderFactory<T> implements FileReaderFactory<T> { private static final Log LOG = LogFactory.getLog(SeqFileReaderFactory.class); + private final Converter converter; private final MapFn<Object, T> mapFn; private final Writable key; private final Writable value; - private final Configuration conf; - public SeqFileReaderFactory(PType<T> ptype, Configuration conf) { - this.mapFn = SeqFileHelper.getInputMapFn(ptype); - this.key = NullWritable.get(); - this.value = SeqFileHelper.newInstance(ptype, conf); - this.conf = conf; + public SeqFileReaderFactory(PType<T> ptype) { + this.converter = ptype.getConverter(); + this.mapFn = ptype.getInputMapFn(); + if (ptype instanceof PTableType) { + PTableType ptt = (PTableType) ptype; + this.key = SeqFileHelper.newInstance(ptt.getKeyType(), null); + this.value = SeqFileHelper.newInstance(ptt.getValueType(), null); + } else { + this.key = NullWritable.get(); + this.value = SeqFileHelper.newInstance(ptype, null); + } } @Override public Iterator<T> read(FileSystem fs, final Path path) { mapFn.initialize(); try { - final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf); + final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, fs.getConf()); return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() { boolean nextChecked = false; boolean hasNext = false; @@ -82,7 +89,7 @@ class SeqFileReaderFactory<T> implements FileReaderFactory<T> { return null; } nextChecked = false; - return mapFn.map(value); + return mapFn.map(converter.convertInput(key, value)); } }); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java index e8f3dcf..8fac4ae 100644 --- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java +++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java @@ -37,7 +37,7 @@ public class SeqFileSource<T> extends FileSourceImpl<T> implements ReadableSourc @Override public Iterable<T> read(Configuration conf) throws IOException { FileSystem fs = path.getFileSystem(conf); - return CompositePathIterable.create(fs, path, new SeqFileReaderFactory<T>(ptype, conf)); + return CompositePathIterable.create(fs, path, new SeqFileReaderFactory<T>(ptype)); } @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java deleted file mode 100644 index 67259fb..0000000 --- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.io.seq; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.crunch.MapFn; -import org.apache.crunch.Pair; -import org.apache.crunch.io.FileReaderFactory; -import org.apache.crunch.io.impl.AutoClosingIterator; -import org.apache.crunch.types.PTableType; -import org.apache.crunch.types.PType; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Writable; - -import com.google.common.collect.Iterators; -import com.google.common.collect.UnmodifiableIterator; - -class SeqFileTableReaderFactory<K, V> implements FileReaderFactory<Pair<K, V>> { - - private static final Log LOG = LogFactory.getLog(SeqFileTableReaderFactory.class); - - private final MapFn<Object, K> keyMapFn; - private final MapFn<Object, V> valueMapFn; - private final Writable key; - private final Writable value; - private final Configuration conf; - - public SeqFileTableReaderFactory(PTableType<K, V> tableType, Configuration conf) { - PType<K> keyType = tableType.getKeyType(); - PType<V> valueType = tableType.getValueType(); - this.keyMapFn = SeqFileHelper.getInputMapFn(keyType); - this.valueMapFn = SeqFileHelper.getInputMapFn(valueType); - this.key = SeqFileHelper.newInstance(keyType, conf); - this.value = SeqFileHelper.newInstance(valueType, conf); - this.conf = conf; - } - - @Override - public Iterator<Pair<K, V>> read(FileSystem fs, final Path path) { - keyMapFn.initialize(); - valueMapFn.initialize(); - try { - final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf); - return new AutoClosingIterator<Pair<K, V>>(reader, new UnmodifiableIterator<Pair<K, V>>() { - boolean nextChecked = false; - boolean hasNext = false; - - @Override - public boolean hasNext() { - if (nextChecked == true) { - return hasNext; - } - try { - hasNext = reader.next(key, value); - nextChecked = true; - return hasNext; - } catch (IOException e) { - LOG.info("Error reading from path: " + path, e); - return false; - } - } - - @Override - public Pair<K, V> next() { - if (!nextChecked && !hasNext()) { - return null; - } - nextChecked = false; - return Pair.of(keyMapFn.map(key), valueMapFn.map(value)); - } - }); - } catch (IOException e) { - LOG.info("Could not read seqfile at path: " + path, e); - return Iterators.emptyIterator(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java index 56ed985..7a63272 100644 --- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java +++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java @@ -30,7 +30,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; /** - * + * A {@code TableSource} that uses {@code SequenceFileInputFormat} to read the input + * file. */ public class SeqFileTableSource<K, V> extends FileTableSourceImpl<K, V> implements ReadableSource<Pair<K, V>> { @@ -45,7 +46,8 @@ public class SeqFileTableSource<K, V> extends FileTableSourceImpl<K, V> implemen @Override public Iterable<Pair<K, V>> read(Configuration conf) throws IOException { FileSystem fs = path.getFileSystem(conf); - return CompositePathIterable.create(fs, path, new SeqFileTableReaderFactory<K, V>((PTableType<K, V>) ptype, conf)); + return CompositePathIterable.create(fs, path, + new SeqFileReaderFactory<Pair<K, V>>(getTableType())); } @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/text/LineParser.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/text/LineParser.java b/crunch/src/main/java/org/apache/crunch/io/text/LineParser.java new file mode 100644 index 0000000..9438014 --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/io/text/LineParser.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.text; + +import java.util.Iterator; +import java.util.List; +import java.util.StringTokenizer; + +import org.apache.crunch.MapFn; +import org.apache.crunch.Pair; +import org.apache.crunch.fn.CompositeMapFn; +import org.apache.crunch.fn.IdentityFn; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; + +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; + +/** + * An abstraction for parsing the lines of a text file using a {@code PType<T>} to + * convert the lines of text into a given data type. + * + * @param <T> The type returned by the text parsing + */ +abstract class LineParser<T> { + + public static <S> LineParser<S> forType(PType<S> ptype) { + return new SimpleLineParser<S>(ptype); + } + + public static <K, V> LineParser<Pair<K, V>> forTableType(PTableType<K, V> ptt, String sep) { + return new KeyValueLineParser<K, V>(ptt, sep); + } + + private MapFn<String, T> mapFn; + + public void initialize() { + mapFn = getMapFn(); + mapFn.initialize(); + } + + public T parse(String line) { + return mapFn.map(line); + } + + protected abstract MapFn<String, T> getMapFn(); + + private static <T> MapFn<String, T> getMapFnForPType(PType<T> ptype) { + MapFn ret = null; + if (String.class.equals(ptype.getTypeClass())) { + ret = (MapFn) IdentityFn.getInstance(); + } else { + // Check for a composite MapFn for the PType. + // Note that this won't work for Avro-- need to solve that. + ret = ptype.getInputMapFn(); + if (ret instanceof CompositeMapFn) { + ret = ((CompositeMapFn) ret).getSecond(); + } + } + return ret; + } + + private static class SimpleLineParser<S> extends LineParser<S> { + + private final PType<S> ptype; + + public SimpleLineParser(PType<S> ptype) { + this.ptype = ptype; + } + + @Override + protected MapFn<String, S> getMapFn() { + return getMapFnForPType(ptype); + } + } + + private static class KeyValueLineParser<K, V> extends LineParser<Pair<K, V>> { + + private final PTableType<K, V> ptt; + private final String sep; + + public KeyValueLineParser(PTableType<K, V> ptt, String sep) { + this.ptt = ptt; + this.sep = sep; + } + + @Override + protected MapFn<String, Pair<K, V>> getMapFn() { + final MapFn<String, K> keyMapFn = getMapFnForPType(ptt.getKeyType()); + final MapFn<String, V> valueMapFn = getMapFnForPType(ptt.getValueType()); + + return new MapFn<String, Pair<K, V>>() { + @Override + public void initialize() { + keyMapFn.initialize(); + valueMapFn.initialize(); + } + + @Override + public Pair<K, V> map(String input) { + List<String> kv = ImmutableList.copyOf(Splitter.on(sep).limit(1).split(input)); + if (kv.size() != 2) { + throw new RuntimeException("Invalid input string: " + input); + } + return Pair.of(keyMapFn.map(kv.get(0)), valueMapFn.map(kv.get(1))); + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java b/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java new file mode 100644 index 0000000..d88ef4a --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.text; + +import java.io.IOException; + +import org.apache.crunch.io.CompositePathIterable; +import org.apache.crunch.io.ReadableSource; +import org.apache.crunch.io.impl.FileSourceImpl; +import org.apache.crunch.io.impl.InputBundle; +import org.apache.crunch.types.PType; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; + +/** + * A {@code Source} instance that uses the {@code NLineInputFormat}, which gives each map + * task a fraction of the lines in a text file as input. Most useful when running simulations + * on Hadoop, where each line represents configuration information about each simulation + * run. + */ +public class NLineFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> { + + private static InputBundle getBundle(int linesPerTask) { + InputBundle bundle = new InputBundle(NLineInputFormat.class); + bundle.set(NLineInputFormat.LINES_PER_MAP, String.valueOf(linesPerTask)); + return bundle; + } + + /** + * Create a new {@code NLineFileSource} instance. + * + * @param path The path to the input data, as a String + * @param ptype The PType to use for processing the data + * @param linesPerTask The number of lines from the input each map task will process + */ + public NLineFileSource(String path, PType<T> ptype, int linesPerTask) { + this(new Path(path), ptype, linesPerTask); + } + + /** + * Create a new {@code NLineFileSource} instance. + * + * @param path The {@code Path} to the input data + * @param ptype The PType to use for processing the data + * @param linesPerTask The number of lines from the input each map task will process + */ + public NLineFileSource(Path path, PType<T> ptype, int linesPerTask) { + super(path, ptype, getBundle(linesPerTask)); + } + + @Override + public String toString() { + return "NLine(" + path + ")"; + } + + @Override + public Iterable<T> read(Configuration conf) throws IOException { + return CompositePathIterable.create(path.getFileSystem(conf), path, + new TextFileReaderFactory<T>(LineParser.forType(ptype))); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java index a0c48e0..e1fea6e 100644 --- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java +++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java @@ -24,13 +24,9 @@ import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.crunch.MapFn; -import org.apache.crunch.fn.CompositeMapFn; -import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.io.FileReaderFactory; import org.apache.crunch.io.impl.AutoClosingIterator; import org.apache.crunch.types.PType; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -42,28 +38,19 @@ public class TextFileReaderFactory<T> implements FileReaderFactory<T> { private static final Log LOG = LogFactory.getLog(TextFileReaderFactory.class); - private final PType<T> ptype; - private final Configuration conf; + private final LineParser<T> parser; - public TextFileReaderFactory(PType<T> ptype, Configuration conf) { - this.ptype = ptype; - this.conf = conf; + public TextFileReaderFactory(PType<T> ptype) { + this(LineParser.forType(ptype)); + } + + public TextFileReaderFactory(LineParser<T> parser) { + this.parser = parser; } @Override public Iterator<T> read(FileSystem fs, Path path) { - MapFn mapFn = null; - if (String.class.equals(ptype.getTypeClass())) { - mapFn = IdentityFn.getInstance(); - } else { - // Check for a composite MapFn for the PType. - // Note that this won't work for Avro-- need to solve that. - MapFn input = ptype.getInputMapFn(); - if (input instanceof CompositeMapFn) { - mapFn = ((CompositeMapFn) input).getSecond(); - } - } - mapFn.initialize(); + parser.initialize(); FSDataInputStream is; try { @@ -74,7 +61,6 @@ public class TextFileReaderFactory<T> implements FileReaderFactory<T> { } final BufferedReader reader = new BufferedReader(new InputStreamReader(is)); - final MapFn<String, T> iterMapFn = mapFn; return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() { private String nextLine; @@ -90,7 +76,7 @@ public class TextFileReaderFactory<T> implements FileReaderFactory<T> { @Override public T next() { - return iterMapFn.map(nextLine); + return parser.parse(nextLine); } }); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java index ee51c04..026fca9 100644 --- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java +++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java @@ -67,7 +67,7 @@ public class TextFileSource<T> extends FileSourceImpl<T> implements ReadableSour @Override public Iterable<T> read(Configuration conf) throws IOException { - return CompositePathIterable.create(path.getFileSystem(conf), path, new TextFileReaderFactory<T>(ptype, - conf)); + return CompositePathIterable.create(path.getFileSystem(conf), path, + new TextFileReaderFactory<T>(LineParser.forType(ptype))); } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java new file mode 100644 index 0000000..c94676a --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.text; + +import java.io.IOException; + +import org.apache.crunch.Pair; +import org.apache.crunch.io.CompositePathIterable; +import org.apache.crunch.io.ReadableSource; +import org.apache.crunch.io.impl.FileTableSourceImpl; +import org.apache.crunch.io.impl.InputBundle; +import org.apache.crunch.types.PTableType; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader; +import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; + +/** + * A {@code Source} that uses the {@code KeyValueTextInputFormat} to process + * input text. If a separator for the keys and values in the text file is not specified, + * a tab character is used. + */ +public class TextFileTableSource<K, V> extends FileTableSourceImpl<K, V> + implements ReadableSource<Pair<K, V>> { + + private static InputBundle getBundle(String sep) { + InputBundle bundle = new InputBundle(KeyValueTextInputFormat.class); + bundle.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, sep); + return bundle; + } + + private final String separator; + + public TextFileTableSource(String path, PTableType<K, V> tableType) { + this(new Path(path), tableType); + } + + public TextFileTableSource(Path path, PTableType<K, V> tableType) { + this(path, tableType, "\t"); + } + + public TextFileTableSource(String path, PTableType<K, V> tableType, String separator) { + this(new Path(path), tableType, separator); + } + + public TextFileTableSource(Path path, PTableType<K, V> tableType, String separator) { + super(path, tableType, getBundle(separator)); + this.separator = separator; + } + + @Override + public String toString() { + return "KeyValueText(" + path + ")"; + } + + @Override + public Iterable<Pair<K, V>> read(Configuration conf) throws IOException { + return CompositePathIterable.create(path.getFileSystem(conf), path, + new TextFileReaderFactory<Pair<K, V>>(LineParser.forTableType(getTableType(), separator))); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSourceTarget.java new file mode 100644 index 0000000..bdc83a1 --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSourceTarget.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.text; + +import org.apache.crunch.Pair; +import org.apache.crunch.TableSource; +import org.apache.crunch.io.FileNamingScheme; +import org.apache.crunch.io.SequentialFileNamingScheme; +import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl; +import org.apache.crunch.types.PTableType; +import org.apache.hadoop.fs.Path; + +/** + * A {@code TableSource} and {@code SourceTarget} implementation that uses the + * {@code KeyValueTextInputFormat} and {@code TextOutputFormat} to support reading + * and writing text files as {@code PTable} instances using a tab separator for + * the keys and the values. + */ +public class TextFileTableSourceTarget<K, V> extends ReadableSourcePathTargetImpl<Pair<K, V>> implements + TableSource<K, V> { + + private final PTableType<K, V> tableType; + + public TextFileTableSourceTarget(String path, PTableType<K, V> tableType) { + this(new Path(path), tableType); + } + + public TextFileTableSourceTarget(Path path, PTableType<K, V> tableType) { + this(path, tableType, new SequentialFileNamingScheme()); + } + + public TextFileTableSourceTarget(Path path, PTableType<K, V> tableType, + FileNamingScheme fileNamingScheme) { + super(new TextFileTableSource<K, V>(path, tableType), new TextFileTarget(path), + fileNamingScheme); + this.tableType = tableType; + } + + @Override + public PTableType<K, V> getTableType() { + return tableType; + } + + @Override + public String toString() { + return target.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java index c7e06d3..ec7d521 100644 --- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java +++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java @@ -73,7 +73,7 @@ public class TextFileTarget extends FileTargetImpl { @Override public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) { if (ptype instanceof PTableType) { - return null; + return new TextFileTableSourceTarget(path, (PTableType) ptype); } return new TextFileSourceTarget<T>(path, ptype); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/63050d0d/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java b/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java index 66863ba..62085f8 100644 --- a/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java +++ b/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java @@ -83,7 +83,7 @@ public class AvroFileReaderFactoryTest { } private <T> AvroFileReaderFactory<T> createFileReaderFactory(AvroType<T> avroType) { - return new AvroFileReaderFactory<T>(avroType, new Configuration()); + return new AvroFileReaderFactory<T>(avroType); } @Test
