Updated Branches: refs/heads/master 687894564 -> c51bcd63a
CRUNCH-224 Support SequenceFiles in MemPipeline Contributed by Dominique Dierickx Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/c51bcd63 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/c51bcd63 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/c51bcd63 Branch: refs/heads/master Commit: c51bcd63afffc6e5a9c8a171503bc487440b2d85 Parents: 6878945 Author: Gabriel Reid <[email protected]> Authored: Thu Jun 20 09:03:08 2013 +0200 Committer: Gabriel Reid <[email protected]> Committed: Thu Jun 20 09:03:08 2013 +0200 ---------------------------------------------------------------------- .../mem/MemPipelineFileReadingWritingIT.java | 170 +++++++++++++++++++ .../impl/mem/MemPipelineFileWritingIT.java | 58 ------- .../org/apache/crunch/impl/mem/MemPipeline.java | 102 ++++++++--- 3 files changed, 248 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/c51bcd63/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java new file mode 100644 index 0000000..2d66672 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileReadingWritingIT.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mem; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +import org.apache.crunch.PCollection; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.Target; +import org.apache.crunch.impl.mem.collect.MemCollection; +import org.apache.crunch.impl.mem.collect.MemTable; +import org.apache.crunch.io.From; +import org.apache.crunch.io.To; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.Reader; +import org.apache.hadoop.io.SequenceFile.Writer; +import org.apache.hadoop.io.Text; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.io.Files; + +public class MemPipelineFileReadingWritingIT { + @Rule + public TemporaryPath baseTmpDir = TemporaryPaths.create(); + + private File inputFile; + private File outputFile; + + + private static final Collection<String> EXPECTED_COLLECTION = Lists.newArrayList("hello", "world"); + @SuppressWarnings("unchecked") + private static final Collection<Pair<Integer, String>> EXPECTED_TABLE = Lists.newArrayList( + Pair.of(1, "hello"), + Pair.of(2, "world")); + + + @Before + public void setUp() throws IOException { + inputFile = baseTmpDir.getFile("test-read.seq"); + outputFile = baseTmpDir.getFile("test-write.seq"); + } + + @Test + public void testMemPipelineFileWriter() throws Exception { + File tmpDir = baseTmpDir.getFile("mempipe"); + Pipeline p = MemPipeline.getInstance(); + PCollection<String> lines = MemPipeline.collectionOf("hello", "world"); + p.writeTextFile(lines, tmpDir.toString()); + p.done(); + assertTrue(tmpDir.exists()); + File[] files = tmpDir.listFiles(); + assertTrue(files != null && files.length > 0); + for (File f : files) { + if (!f.getName().startsWith(".")) { + List<String> txt = Files.readLines(f, Charsets.UTF_8); + assertEquals(ImmutableList.of("hello", "world"), txt); + } + } + } + + private void createTestSequenceFile(final File seqFile) throws IOException { + SequenceFile.Writer writer = null; + writer = new Writer(FileSystem.getLocal(baseTmpDir.getDefaultConfiguration()), + baseTmpDir.getDefaultConfiguration(), + new Path(seqFile.toString()), + IntWritable.class, Text.class); + writer.append(new IntWritable(1), new Text("hello")); + writer.append(new IntWritable(2), new Text("world")); + writer.close(); + } + + @Test + public void testMemPipelineReadSequenceFile() throws IOException { + // set up input + createTestSequenceFile(inputFile); + + // read from sequence file + final PCollection<Pair<Integer, String>> readCollection = MemPipeline.getInstance().read( + From.sequenceFile(inputFile.toString(), + Writables.tableOf( + Writables.ints(), + Writables.strings()))); + + // assert read same as written. + assertEquals(EXPECTED_TABLE, Lists.newArrayList(readCollection.materialize())); + } + + @Test + public void testMemPipelineWriteSequenceFile_PCollection() throws IOException { + // write + PCollection<String> collection = MemPipeline.typedCollectionOf(Writables.strings(), EXPECTED_COLLECTION); + final Target target = To.sequenceFile(outputFile.toString()); + MemPipeline.getInstance().write(collection, target); + + // read + final SequenceFile.Reader reader = new Reader(FileSystem.getLocal( + baseTmpDir.getDefaultConfiguration()), new Path(outputFile.toString()), + baseTmpDir.getDefaultConfiguration()); + final List<String> actual = Lists.newArrayList(); + final NullWritable key = NullWritable.get(); + final Text value = new Text(); + while (reader.next(key, value)) { + actual.add(value.toString()); + } + reader.close(); + + // assert read same as written + assertEquals(EXPECTED_COLLECTION, actual); + } + + @Test + public void testMemPipelineWriteSequenceFile_PTable() throws IOException { + // write + final MemTable<Integer, String> collection = new MemTable<Integer, String>(EXPECTED_TABLE, // + Writables.tableOf( + Writables.ints(), + Writables.strings()), "test input"); + final Target target = To.sequenceFile(outputFile.toString()); + MemPipeline.getInstance().write(collection, target); + + // read + final SequenceFile.Reader reader = new Reader(FileSystem.getLocal(baseTmpDir + .getDefaultConfiguration()), new Path(outputFile.toString()), + baseTmpDir.getDefaultConfiguration()); + final List<Pair<Integer, String>> actual = Lists.newArrayList(); + final IntWritable key = new IntWritable(); + final Text value = new Text(); + while (reader.next(key, value)) { + actual.add(Pair.of(key.get(), value.toString())); + } + reader.close(); + + // assert read same as written + assertEquals(EXPECTED_TABLE, actual); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/c51bcd63/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java deleted file mode 100644 index 976a43e..0000000 --- a/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mem; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.util.List; - -import org.apache.crunch.PCollection; -import org.apache.crunch.Pipeline; -import org.apache.crunch.test.TemporaryPath; -import org.apache.crunch.test.TemporaryPaths; -import org.junit.Rule; -import org.junit.Test; - -import com.google.common.base.Charsets; -import com.google.common.collect.ImmutableList; -import com.google.common.io.Files; - -public class MemPipelineFileWritingIT { - @Rule - public TemporaryPath baseTmpDir = TemporaryPaths.create(); - - @Test - public void testMemPipelineFileWriter() throws Exception { - File tmpDir = baseTmpDir.getFile("mempipe"); - Pipeline p = MemPipeline.getInstance(); - PCollection<String> lines = MemPipeline.collectionOf("hello", "world"); - p.writeTextFile(lines, tmpDir.toString()); - p.done(); - assertTrue(tmpDir.exists()); - File[] files = tmpDir.listFiles(); - assertTrue(files != null && files.length > 0); - for (File f : files) { - if (!f.getName().startsWith(".")) { - List<String> txt = Files.readLines(f, Charsets.UTF_8); - assertEquals(ImmutableList.of("hello", "world"), txt); - } - } - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/c51bcd63/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java index e2a2529..9001e51 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java @@ -17,9 +17,11 @@ */ package org.apache.crunch.impl.mem; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + import org.apache.avro.Schema; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericContainer; @@ -42,6 +44,8 @@ import org.apache.crunch.io.At; import org.apache.crunch.io.PathTarget; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.io.avro.AvroFileTarget; +import org.apache.crunch.io.seq.SeqFileTarget; +import org.apache.crunch.types.Converter; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; import org.apache.crunch.types.avro.ReflectDataFactory; @@ -49,12 +53,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.mapreduce.Counters; -import java.io.IOException; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; public class MemPipeline implements Pipeline { @@ -170,36 +175,48 @@ public class MemPipeline implements Pipeline { } @Override - public void write(PCollection<?> collection, Target target, - Target.WriteMode writeMode) { + public void write(PCollection<?> collection, Target target, Target.WriteMode writeMode) { target.handleExisting(writeMode, getConfiguration()); if (writeMode != Target.WriteMode.APPEND && activeTargets.contains(target)) { - throw new CrunchRuntimeException("Target " + target + " is already written in the current run." + - " Use WriteMode.APPEND in order to write additional data to it."); + throw new CrunchRuntimeException("Target " + target + + " is already written in the current run." + + " Use WriteMode.APPEND in order to write additional data to it."); } activeTargets.add(target); if (target instanceof PathTarget) { Path path = ((PathTarget) target).getPath(); try { FileSystem fs = path.getFileSystem(conf); - FSDataOutputStream os = fs.create(new Path(path, "out" + outputIndex)); outputIndex++; - if (target instanceof AvroFileTarget) { - writeAvroFile(os, collection.materialize()); - } else if (collection instanceof PTable) { - for (Object o : collection.materialize()) { - Pair p = (Pair) o; - os.writeBytes(p.first().toString()); - os.writeBytes("\t"); - os.writeBytes(p.second().toString()); - os.writeBytes("\r\n"); + if (target instanceof SeqFileTarget) { + if (collection instanceof PTable) { + writeSequenceFileFromPTable(fs, path, (PTable<?, ?>) collection); + } else { + writeSequenceFileFromPCollection(fs, path, collection); } } else { - for (Object o : collection.materialize()) { - os.writeBytes(o.toString() + "\r\n"); + FSDataOutputStream os = fs.create(new Path(path, "out" + outputIndex)); + if (target instanceof AvroFileTarget && !(collection instanceof PTable)) { + + writeAvroFile(os, collection.materialize()); + } else { + LOG.warn("Defaulting to write to a text file from MemPipeline"); + if (collection instanceof PTable) { + for (Object o : collection.materialize()) { + Pair p = (Pair) o; + os.writeBytes(p.first().toString()); + os.writeBytes("\t"); + os.writeBytes(p.second().toString()); + os.writeBytes("\r\n"); + } + } else { + for (Object o : collection.materialize()) { + os.writeBytes(o.toString() + "\r\n"); + } + } } + os.close(); } - os.close(); } catch (IOException e) { LOG.error("Exception writing target: " + target, e); } @@ -233,7 +250,44 @@ public class MemPipeline implements Pipeline { dataFileWriter.close(); outputStream.close(); } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private void writeSequenceFileFromPTable(final FileSystem fs, final Path path, final PTable table) + throws IOException { + final PTableType pType = table.getPTableType(); + final Class<?> keyClass = pType.getConverter().getKeyClass(); + final Class<?> valueClass = pType.getConverter().getValueClass(); + + final SequenceFile.Writer writer = new SequenceFile.Writer(fs, fs.getConf(), path, keyClass, + valueClass); + + for (final Object o : table.materialize()) { + final Pair<?,?> p = (Pair) o; + final Object key = pType.getKeyType().getOutputMapFn().map(p.first()); + final Object value = pType.getValueType().getOutputMapFn().map(p.second()); + writer.append(key, value); + } + writer.close(); + } + + private void writeSequenceFileFromPCollection(final FileSystem fs, final Path path, + final PCollection collection) throws IOException { + final PType pType = collection.getPType(); + final Converter converter = pType.getConverter(); + final Class valueClass = converter.getValueClass(); + + final SequenceFile.Writer writer = new SequenceFile.Writer(fs, fs.getConf(), path, + NullWritable.class, valueClass); + + for (final Object o : collection.materialize()) { + final Object value = pType.getOutputMapFn().map(o); + writer.append(NullWritable.get(), value); + } + + writer.close(); + } + @Override public PCollection<String> readTextFile(String pathName) { return read(At.textFile(pathName));
