Updated Branches: refs/heads/master f1e877d4f -> 48cf308c8
CRUNCH-200: Refactor the logic for handling output file relocation out of CrunchJobHooks and into PathTarget implementations Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/48cf308c Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/48cf308c Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/48cf308c Branch: refs/heads/master Commit: 48cf308c82ad2e3adb95083a0d10c90e70503fd5 Parents: f1e877d Author: Josh Wills <[email protected]> Authored: Sun Apr 28 23:27:44 2013 -0700 Committer: Josh Wills <[email protected]> Committed: Tue Apr 30 18:40:03 2013 -0700 ---------------------------------------------------------------------- .../apache/crunch/impl/mr/exec/CrunchJobHooks.java | 63 +------------- .../main/java/org/apache/crunch/io/PathTarget.java | 15 +++ .../java/org/apache/crunch/io/PathTargetImpl.java | 64 -------------- .../org/apache/crunch/io/impl/FileTargetImpl.java | 67 ++++++++++++++- .../crunch/io/impl/SourcePathTargetImpl.java | 9 ++ .../crunch/impl/mr/exec/CrunchJobHooksTest.java | 42 --------- .../apache/crunch/io/impl/CrunchJobHooksTest.java | 43 +++++++++ 7 files changed, 134 insertions(+), 169 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/48cf308c/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java index 74bc9ac..b06847b 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java @@ -19,17 +19,12 @@ package org.apache.crunch.impl.mr.exec; import java.io.IOException; import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob; -import org.apache.crunch.impl.mr.plan.PlanningParameters; import org.apache.crunch.impl.mr.run.RuntimeParameters; -import org.apache.crunch.io.FileNamingScheme; import org.apache.crunch.io.PathTarget; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; @@ -89,65 +84,9 @@ public final class CrunchJobHooks { // job to the output locations specified in the paths. FileSystem srcFs = workingPath.getFileSystem(job.getConfiguration()); for (Map.Entry<Integer, PathTarget> entry : multiPaths.entrySet()) { - final int i = entry.getKey(); - final Path dst = entry.getValue().getPath(); - FileNamingScheme fileNamingScheme = entry.getValue().getFileNamingScheme(); - - Path src = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + i + "-*"); - Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src); - Configuration conf = job.getConfiguration(); - FileSystem dstFs = dst.getFileSystem(conf); - if (!dstFs.exists(dst)) { - dstFs.mkdirs(dst); - } - boolean sameFs = isCompatible(srcFs, dst); - for (Path s : srcs) { - Path d = getDestFile(conf, s, dst, fileNamingScheme); - if (sameFs) { - srcFs.rename(s, d); - } else { - FileUtil.copy(srcFs, s, dstFs, d, true, true, job.getConfiguration()); - } - } + entry.getValue().handleOutputs(job.getConfiguration(), workingPath, entry.getKey(), mapOnlyJob); } } } - - private boolean isCompatible(FileSystem fs, Path path) { - try { - fs.makeQualified(path); - return true; - } catch (IllegalArgumentException e) { - return false; - } - } - private Path getDestFile(Configuration conf, Path src, Path dir, FileNamingScheme fileNamingScheme) - throws IOException { - String outputFilename = null; - if (mapOnlyJob) { - outputFilename = fileNamingScheme.getMapOutputName(conf, dir); - } else { - outputFilename = fileNamingScheme.getReduceOutputName(conf, dir, extractPartitionNumber(src.getName())); - } - if (src.getName().endsWith(org.apache.avro.mapred.AvroOutputFormat.EXT)) { - outputFilename += org.apache.avro.mapred.AvroOutputFormat.EXT; - } - return new Path(dir, outputFilename); - } - } - - /** - * Extract the partition number from a raw reducer output filename. - * - * @param reduceOutputFileName The raw reducer output file name - * @return The partition number encoded in the filename - */ - static int extractPartitionNumber(String reduceOutputFileName) { - Matcher matcher = Pattern.compile(".*-r-(\\d{5})").matcher(reduceOutputFileName); - if (matcher.find()) { - return Integer.parseInt(matcher.group(1), 10); - } else { - throw new IllegalArgumentException("Reducer output name '" + reduceOutputFileName + "' cannot be parsed"); - } } } http://git-wip-us.apache.org/repos/asf/crunch/blob/48cf308c/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java index 7a35209..4f7949f 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java @@ -17,6 +17,9 @@ */ package org.apache.crunch.io; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; /** @@ -33,4 +36,16 @@ public interface PathTarget extends MapReduceTarget { * @return the naming scheme to be used */ FileNamingScheme getFileNamingScheme(); + + /** + * Handles moving the output data for this target from a temporary location on the + * filesystem to its target path at the end of a MapReduce job. + * + * @param conf The job {@code Configuration} + * @param workingPath The temp directory that contains the output of the job + * @param index The index of this target for jobs that write multiple output files to a single directory + * @param mapOnlyJob Whether or not this is a map-only job + * @throws IOException + */ + void handleOutputs(Configuration conf, Path workingPath, int index, boolean mapOnlyJob) throws IOException; } http://git-wip-us.apache.org/repos/asf/crunch/blob/48cf308c/crunch-core/src/main/java/org/apache/crunch/io/PathTargetImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/PathTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/PathTargetImpl.java deleted file mode 100644 index 0be3f9a..0000000 --- a/crunch-core/src/main/java/org/apache/crunch/io/PathTargetImpl.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.io; - -import org.apache.crunch.types.PType; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; - -public abstract class PathTargetImpl implements PathTarget { - - private final Path path; - private final Class<OutputFormat> outputFormatClass; - private final Class keyClass; - private final Class valueClass; - - public PathTargetImpl(String path, Class<OutputFormat> outputFormatClass, Class keyClass, Class valueClass) { - this(new Path(path), outputFormatClass, keyClass, valueClass); - } - - public PathTargetImpl(Path path, Class<OutputFormat> outputFormatClass, Class keyClass, Class valueClass) { - this.path = path; - this.outputFormatClass = outputFormatClass; - this.keyClass = keyClass; - this.valueClass = valueClass; - } - - @Override - public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) { - try { - FileOutputFormat.setOutputPath(job, path); - } catch (Exception e) { - throw new RuntimeException(e); - } - if (name == null) { - job.setOutputFormatClass(outputFormatClass); - job.setOutputKeyClass(keyClass); - job.setOutputValueClass(valueClass); - } else { - CrunchOutputs.addNamedOutput(job, name, outputFormatClass, keyClass, valueClass); - } - } - - @Override - public Path getPath() { - return path; - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/48cf308c/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java index c1c29e4..5ceb3be 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java @@ -18,12 +18,15 @@ package org.apache.crunch.io.impl; import java.io.IOException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.SourceTarget; +import org.apache.crunch.impl.mr.plan.PlanningParameters; import org.apache.crunch.io.CrunchOutputs; import org.apache.crunch.io.FileNamingScheme; import org.apache.crunch.io.OutputHandler; @@ -31,8 +34,8 @@ import org.apache.crunch.io.PathTarget; import org.apache.crunch.types.Converter; import org.apache.crunch.types.PType; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; @@ -82,11 +85,73 @@ public class FileTargetImpl implements PathTarget { return true; } + public void handleOutputs(Configuration conf, Path workingPath, int index, + boolean mapOnlyJob) throws IOException { + FileSystem srcFs = workingPath.getFileSystem(conf); + Path src = getSourcePattern(workingPath, index); + Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src); + FileSystem dstFs = FileSystem.get(conf); + if (!dstFs.exists(path)) { + dstFs.mkdirs(path); + } + boolean sameFs = isCompatible(srcFs, path); + for (Path s : srcs) { + Path d = getDestFile(conf, s, path, mapOnlyJob); + if (sameFs) { + srcFs.rename(s, d); + } else { + FileUtil.copy(srcFs, s, dstFs, d, true, true, conf); + } + } + } + + protected Path getSourcePattern(Path workingPath, int index) { + return new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index + "-*"); + } + @Override public Path getPath() { return path; } + + protected static boolean isCompatible(FileSystem fs, Path path) { + try { + fs.makeQualified(path); + return true; + } catch (IllegalArgumentException e) { + return false; + } + } + protected Path getDestFile(Configuration conf, Path src, Path dir, boolean mapOnlyJob) + throws IOException { + String outputFilename = null; + if (mapOnlyJob) { + outputFilename = getFileNamingScheme().getMapOutputName(conf, dir); + } else { + outputFilename = getFileNamingScheme().getReduceOutputName(conf, dir, extractPartitionNumber(src.getName())); + } + if (src.getName().endsWith(org.apache.avro.mapred.AvroOutputFormat.EXT)) { + outputFilename += org.apache.avro.mapred.AvroOutputFormat.EXT; + } + return new Path(dir, outputFilename); + } + + /** + * Extract the partition number from a raw reducer output filename. + * + * @param reduceOutputFileName The raw reducer output file name + * @return The partition number encoded in the filename + */ + public static int extractPartitionNumber(String reduceOutputFileName) { + Matcher matcher = Pattern.compile(".*-r-(\\d{5})").matcher(reduceOutputFileName); + if (matcher.find()) { + return Integer.parseInt(matcher.group(1), 10); + } else { + throw new IllegalArgumentException("Reducer output name '" + reduceOutputFileName + "' cannot be parsed"); + } + } + @Override public FileNamingScheme getFileNamingScheme() { return fileNamingScheme; http://git-wip-us.apache.org/repos/asf/crunch/blob/48cf308c/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java index c0d7ce0..a90bb7b 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java @@ -17,10 +17,13 @@ */ package org.apache.crunch.io.impl; +import java.io.IOException; + import org.apache.crunch.Source; import org.apache.crunch.io.FileNamingScheme; import org.apache.crunch.io.PathTarget; import org.apache.crunch.types.PType; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; @@ -47,4 +50,10 @@ public class SourcePathTargetImpl<T> extends SourceTargetImpl<T> implements Path public FileNamingScheme getFileNamingScheme() { return fileNamingScheme; } + + @Override + public void handleOutputs(Configuration conf, Path workingPath, int index, boolean mapOnlyJob) + throws IOException { + ((PathTarget) target).handleOutputs(conf, workingPath, index, mapOnlyJob); + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/48cf308c/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java deleted file mode 100644 index f03c3e2..0000000 --- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.exec; - -import static org.junit.Assert.assertEquals; - -import org.junit.Test; - -public class CrunchJobHooksTest { - - @Test - public void testExtractPartitionNumber() { - assertEquals(0, CrunchJobHooks.extractPartitionNumber("out1-r-00000")); - assertEquals(10, CrunchJobHooks.extractPartitionNumber("out2-r-00010")); - assertEquals(99999, CrunchJobHooks.extractPartitionNumber("out3-r-99999")); - } - - @Test - public void testExtractPartitionNumber_WithSuffix() { - assertEquals(10, CrunchJobHooks.extractPartitionNumber("out2-r-00010.avro")); - } - - @Test(expected = IllegalArgumentException.class) - public void testExtractPartitionNumber_MapOutputFile() { - CrunchJobHooks.extractPartitionNumber("out1-m-00000"); - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/48cf308c/crunch-core/src/test/java/org/apache/crunch/io/impl/CrunchJobHooksTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/io/impl/CrunchJobHooksTest.java b/crunch-core/src/test/java/org/apache/crunch/io/impl/CrunchJobHooksTest.java new file mode 100644 index 0000000..705ed10 --- /dev/null +++ b/crunch-core/src/test/java/org/apache/crunch/io/impl/CrunchJobHooksTest.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.impl; + +import static org.junit.Assert.assertEquals; + +import org.apache.crunch.io.impl.FileTargetImpl; +import org.junit.Test; + +public class CrunchJobHooksTest { + + @Test + public void testExtractPartitionNumber() { + assertEquals(0, FileTargetImpl.extractPartitionNumber("out1-r-00000")); + assertEquals(10, FileTargetImpl.extractPartitionNumber("out2-r-00010")); + assertEquals(99999, FileTargetImpl.extractPartitionNumber("out3-r-99999")); + } + + @Test + public void testExtractPartitionNumber_WithSuffix() { + assertEquals(10, FileTargetImpl.extractPartitionNumber("out2-r-00010.avro")); + } + + @Test(expected = IllegalArgumentException.class) + public void testExtractPartitionNumber_MapOutputFile() { + FileTargetImpl.extractPartitionNumber("out1-m-00000"); + } +}
