Repository: crunch Updated Branches: refs/heads/apache-crunch-0.8 dd78f270e -> 15cbf4617
CRUNCH-227: Added tests to prove that ToolRunner does not affect writing output to Text or Sequence File Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/15cbf461 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/15cbf461 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/15cbf461 Branch: refs/heads/apache-crunch-0.8 Commit: 15cbf46177ce1484b5bc9f4d9d0157965918f45f Parents: dd78f27 Author: Micah Whitacre <[email protected]> Authored: Tue Dec 30 15:54:25 2014 -0600 Committer: Micah Whitacre <[email protected]> Committed: Fri Jan 2 10:25:28 2015 -0600 ---------------------------------------------------------------------- crunch-core/pom.xml | 5 + .../java/org/apache/crunch/io/ToolRunnerIT.java | 212 +++++++++++++++++++ 2 files changed, 217 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/15cbf461/crunch-core/pom.xml ---------------------------------------------------------------------- diff --git a/crunch-core/pom.xml b/crunch-core/pom.xml index f321c1c..f97b105 100644 --- a/crunch-core/pom.xml +++ b/crunch-core/pom.xml @@ -172,6 +172,11 @@ under the License. <artifactId>hamcrest-all</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + <scope>test</scope> + </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/crunch/blob/15cbf461/crunch-core/src/it/java/org/apache/crunch/io/ToolRunnerIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/ToolRunnerIT.java b/crunch-core/src/it/java/org/apache/crunch/io/ToolRunnerIT.java new file mode 100644 index 0000000..287ba93 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/io/ToolRunnerIT.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io; + +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.seq.SeqFileTarget; +import org.apache.crunch.io.text.TextFileTableSource; +import org.apache.crunch.io.text.TextFileTarget; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + + +import static org.apache.crunch.types.writable.Writables.strings; +import static org.apache.crunch.types.writable.Writables.tableOf; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +public class ToolRunnerIT { + + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + @Rule + public TestName testName = new TestName(); + + @Test + public void textRunWithToolRunner() throws Exception { + Configuration config = tmpDir.getDefaultConfiguration(); + + String output = tmpDir.getFileName(testName.getMethodName()); + + assertThat(ToolRunner.run(config, new FakeTextTool(), new String[]{tmpDir.copyResourceFileName("urls.txt"), output}), is(0)); + + FileSystem fs = FileSystem.get(config); + + FileStatus fileStatus = fs.getFileStatus(new Path(output)); + + assertTrue(fileStatus.isDir()); + + } + + @Test + public void textRunWithoutToolRunner() throws Exception { + Configuration config = tmpDir.getDefaultConfiguration(); + + String output = tmpDir.getFileName(testName.getMethodName()); + + FakeTextTool tool = new FakeTextTool(); + tool.setConf(config); + + assertThat(tool.run(new String[]{tmpDir.copyResourceFileName("urls.txt"), output}), is(0)); + + FileSystem fs = FileSystem.get(config); + + FileStatus fileStatus = fs.getFileStatus(new Path(output)); + + assertTrue(fileStatus.isDir()); + } + + @Test + public void sequenceRunWithToolRunner() throws Exception { + Configuration config = tmpDir.getDefaultConfiguration(); + + String output = tmpDir.getFileName(testName.getMethodName()); + + assertThat(ToolRunner.run(config, new FakeSequenceTool(), new String[]{tmpDir.copyResourceFileName("urls.txt"), output}), is(0)); + + FileSystem fs = FileSystem.get(config); + + FileStatus fileStatus = fs.getFileStatus(new Path(output)); + + assertTrue(fileStatus.isDir()); + + } + + + @Test + public void sequenceRunWithoutToolRunner() throws Exception { + Configuration config = tmpDir.getDefaultConfiguration(); + + String output = tmpDir.getFileName(testName.getMethodName()); + + FakeSequenceTool tool = new FakeSequenceTool(); + tool.setConf(config); + + assertThat(tool.run(new String[]{tmpDir.copyResourceFileName("urls.txt"), output}), is(0)); + + FileSystem fs = FileSystem.get(config); + + FileStatus fileStatus = fs.getFileStatus(new Path(output)); + + assertTrue(fileStatus.isDir()); + } + + + private static class FakeTextTool implements Tool, Configurable { + + private Configuration config; + + @Override + public int run(String[] strings) throws Exception { + String urlsFile = strings[0]; + String outFile = strings[1]; + Pipeline pipeline = new MRPipeline(ToolRunnerIT.class, getConf()); + + PCollection<String> urls = pipeline.read( + new TextFileTableSource<String, String>(urlsFile, tableOf(strings(), strings()))).values(); + + pipeline.write(urls, new TextFileTarget(outFile)); + + pipeline.done(); + + PCollection<String> stringPCollection = pipeline.readTextFile(outFile); + + assertThat(stringPCollection.length().getValue(), is(greaterThan(0L))); + + return 0; + } + + @Override + public void setConf(Configuration entries) { + config = entries; + } + + @Override + public Configuration getConf() { + return config; + } + } + + private static class FakeSequenceTool implements Tool, Configurable { + + private Configuration config; + + @Override + public int run(String[] strings) throws Exception { + String urlsFile = strings[0]; + String outFile = strings[1]; + Pipeline pipeline = new MRPipeline(ToolRunnerIT.class, getConf()); + + PCollection<String> urls = pipeline.read( + new TextFileTableSource<String, String>(urlsFile, tableOf(strings(), strings()))).values(); + + PType<BytesWritable> bwType = Writables.writables(BytesWritable.class); + urls.parallelDo(new ByteConvertFn(), Writables.pairs(bwType, bwType)); + + pipeline.write(urls, new SeqFileTarget(outFile)); + + pipeline.done(); + + PCollection<Pair<BytesWritable, BytesWritable>> stringPCollection = + pipeline.read(From.sequenceFile(outFile, BytesWritable.class, BytesWritable.class)); + + assertThat(stringPCollection.length().getValue(), is(greaterThan(0L))); + + return 0; + } + + @Override + public void setConf(Configuration entries) { + config = entries; + } + + @Override + public Configuration getConf() { + return config; + } + } + + public static class ByteConvertFn extends MapFn<String, Pair<BytesWritable, BytesWritable>> { + + @Override + public Pair<BytesWritable, BytesWritable> map(String input) { + BytesWritable bw = new BytesWritable(input.getBytes()); + return Pair.of(bw, bw); + } + } +}
