Repository: crunch Updated Branches: refs/heads/master c9be5e87d -> 9f4193163
CRUNCH-517: Make FileSourceImpl implement ReadableSource. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/9f419316 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/9f419316 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/9f419316 Branch: refs/heads/master Commit: 9f4193163d739cf1da4049f5793455d41e32b888 Parents: c9be5e8 Author: Josh Wills <[email protected]> Authored: Tue May 5 09:45:08 2015 +0200 Committer: Josh Wills <[email protected]> Committed: Fri May 8 22:38:43 2015 +0100 ---------------------------------------------------------------------- .../org/apache/crunch/io/FormattedFileIT.java | 66 ++++++++++ .../io/impl/DefaultFileReaderFactory.java | 131 +++++++++++++++++++ .../apache/crunch/io/impl/FileReadableData.java | 42 ++++++ .../apache/crunch/io/impl/FileSourceImpl.java | 14 +- 4 files changed, 252 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/9f419316/crunch-core/src/it/java/org/apache/crunch/io/FormattedFileIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/FormattedFileIT.java b/crunch-core/src/it/java/org/apache/crunch/io/FormattedFileIT.java new file mode 100644 index 0000000..2b15eac --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/io/FormattedFileIT.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.junit.Rule; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class FormattedFileIT { + + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + @Test + public void testReadFormattedFile() throws Exception { + String urlsFile = tmpDir.copyResourceFileName("urls.txt"); + Pipeline p = new MRPipeline(FormattedFileIT.class, tmpDir.getDefaultConfiguration()); + PTable<LongWritable, Text> urls = p.read(From.formattedFile(urlsFile, + TextInputFormat.class, LongWritable.class, Text.class)); + List<String> expect = ImmutableList.of("A", "A", "A", "B", "B", "C", "D", "E", "F", "F", ""); + List<String> actual = Lists.newArrayList(Iterables.transform(urls.materialize(), + new Function<Pair<LongWritable, Text>, String>() { + @Override + public String apply(Pair<LongWritable, Text> pair) { + String str = pair.second().toString(); + if (str.isEmpty()) { + return str; + } + return str.substring(4, 5); + } + })); + assertEquals(expect, actual); + p.done(); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/9f419316/crunch-core/src/main/java/org/apache/crunch/io/impl/DefaultFileReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/DefaultFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/DefaultFileReaderFactory.java new file mode 100644 index 0000000..90c15fa --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/DefaultFileReaderFactory.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.impl; + +import com.google.common.base.Function; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.UnmodifiableIterator; +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory; +import org.apache.crunch.io.FileReaderFactory; +import org.apache.crunch.io.FormatBundle; +import org.apache.crunch.types.PType; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.util.ReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Iterator; + +class DefaultFileReaderFactory<T> implements FileReaderFactory<T> { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultFileReaderFactory.class); + + private final FormatBundle<? extends InputFormat> bundle; + private final PType<T> ptype; + + public DefaultFileReaderFactory(FormatBundle<? extends InputFormat> bundle, PType<T> ptype) { + this.bundle = bundle; + this.ptype = ptype; + } + + @Override + public Iterator<T> read(FileSystem fs, Path path) { + final Configuration conf = new Configuration(fs.getConf()); + bundle.configure(conf); + ptype.initialize(conf); + + final InputFormat fmt = ReflectionUtils.newInstance(bundle.getFormatClass(), conf); + final TaskAttemptContext ctxt = TaskAttemptContextFactory.create(conf, new TaskAttemptID()); + try { + Job job = new Job(conf); + FileInputFormat.addInputPath(job, path); + return Iterators.concat(Lists.transform(fmt.getSplits(job), new Function<InputSplit, Iterator<T>>() { + @Override + public Iterator<T> apply(InputSplit split) { + try { + RecordReader reader = fmt.createRecordReader(split, ctxt); + reader.initialize(split, ctxt); + return new RecordReaderIterator<T>(reader, ptype); + } catch (Exception e) { + LOG.error("Error reading split: " + split, e); + throw new CrunchRuntimeException(e); + } + } + }).iterator()); + } catch (Exception e) { + LOG.error("Error reading path: " + path, e); + throw new CrunchRuntimeException(e); + } + } + + private static class RecordReaderIterator<T> extends UnmodifiableIterator<T> { + + private final RecordReader reader; + private final PType<T> ptype; + private T cur; + private boolean hasNext; + + public RecordReaderIterator(RecordReader reader, PType<T> ptype) { + this.reader = reader; + this.ptype = ptype; + try { + this.hasNext = reader.nextKeyValue(); + if (hasNext) { + Object converted = ptype.getConverter().convertInput( + reader.getCurrentKey(), reader.getCurrentValue()); + this.cur = ptype.getInputMapFn().map(converted); + } + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } + + @Override + public boolean hasNext() { + return hasNext; + } + + @Override + public T next() { + T ret = cur; + try { + hasNext = reader.nextKeyValue(); + if (hasNext) { + Object converted = ptype.getConverter().convertInput( + reader.getCurrentKey(), reader.getCurrentValue()); + this.cur = ptype.getInputMapFn().map(converted); + } + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + return ret; + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/9f419316/crunch-core/src/main/java/org/apache/crunch/io/impl/FileReadableData.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileReadableData.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileReadableData.java new file mode 100644 index 0000000..9d9bb93 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileReadableData.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.impl; + +import org.apache.crunch.io.FileReaderFactory; +import org.apache.crunch.io.FormatBundle; +import org.apache.crunch.types.PType; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputFormat; + +import java.util.List; + +class FileReadableData<T> extends ReadableDataImpl<T> { + + private final FormatBundle<? extends InputFormat> bundle; + private final PType<T> ptype; + + public FileReadableData(List<Path> paths, FormatBundle<? extends InputFormat> bundle, PType<T> ptype) { + super(paths); + this.bundle = bundle; + this.ptype = ptype; + } + @Override + protected FileReaderFactory<T> getFileReaderFactory() { + return new DefaultFileReaderFactory<T>(bundle, ptype); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/9f419316/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java index b42d815..27a1167 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java @@ -24,12 +24,14 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.crunch.ReadableData; import org.apache.crunch.Source; import org.apache.crunch.impl.mr.run.CrunchInputFormat; import org.apache.crunch.io.CompositePathIterable; import org.apache.crunch.io.CrunchInputs; import org.apache.crunch.io.FileReaderFactory; import org.apache.crunch.io.FormatBundle; +import org.apache.crunch.io.ReadableSource; import org.apache.crunch.io.SourceTargetHelper; import org.apache.crunch.types.Converter; import org.apache.crunch.types.PType; @@ -41,7 +43,7 @@ import org.apache.hadoop.mapreduce.Job; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class FileSourceImpl<T> implements Source<T> { +public class FileSourceImpl<T> implements ReadableSource<T> { private static final Logger LOG = LoggerFactory.getLogger(FileSourceImpl.class); @@ -177,4 +179,14 @@ public class FileSourceImpl<T> implements Source<T> { public String toString() { return new StringBuilder().append(inputBundle.getName()).append("(").append(pathsAsString()).append(")").toString(); } + + @Override + public Iterable<T> read(Configuration conf) throws IOException { + return read(conf, new DefaultFileReaderFactory<T>(inputBundle, ptype)); + } + + @Override + public ReadableData<T> asReadable() { + return new FileReadableData<T>(paths, inputBundle, ptype); + } }
