Updated Branches: refs/heads/master 255800fb3 -> 3eb2d3f3b
CRUNCH-91 - Add FileNamingScheme for custom output file names Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/3eb2d3f3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/3eb2d3f3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/3eb2d3f3 Branch: refs/heads/master Commit: 3eb2d3f3bcf0d2952a8e9a13df8df2500d1e529b Parents: 255800f Author: Gabriel Reid <[email protected]> Authored: Sun Oct 7 22:58:54 2012 +0200 Committer: Gabriel Reid <[email protected]> Committed: Wed Oct 10 10:13:37 2012 +0200 ---------------------------------------------------------------------- .../it/java/org/apache/crunch/MRPipelineIT.java | 1 - .../mapreduce/lib/jobcontrol/CrunchJobControl.java | 5 + .../org/apache/crunch/impl/mr/exec/CrunchJob.java | 54 +++++++--- .../crunch/impl/mr/plan/MSCROutputHandler.java | 6 +- .../org/apache/crunch/io/FileNamingScheme.java | 58 ++++++++++ .../main/java/org/apache/crunch/io/PathTarget.java | 12 ++ .../crunch/io/SequentialFileNamingScheme.java | 51 +++++++++ crunch/src/main/java/org/apache/crunch/io/To.java | 2 +- .../crunch/io/avro/AvroFileSourceTarget.java | 8 ++- .../org/apache/crunch/io/avro/AvroFileTarget.java | 9 ++- .../org/apache/crunch/io/impl/FileTargetImpl.java | 11 ++- .../io/impl/ReadableSourcePathTargetImpl.java | 5 +- .../crunch/io/impl/SourcePathTargetImpl.java | 11 ++- .../crunch/io/impl/TableSourcePathTargetImpl.java | 8 ++- .../apache/crunch/io/seq/SeqFileSourceTarget.java | 9 ++- .../crunch/io/seq/SeqFileTableSourceTarget.java | 8 ++- .../org/apache/crunch/io/seq/SeqFileTarget.java | 8 ++- .../crunch/io/text/TextFileSourceTarget.java | 9 ++- .../org/apache/crunch/io/text/TextFileTarget.java | 8 ++- .../apache/crunch/impl/mr/exec/CrunchJobTest.java | 42 +++++++ .../crunch/io/SequentialFileNamingSchemeTest.java | 84 +++++++++++++++ 21 files changed, 376 insertions(+), 33 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java b/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java index 0865820..8664550 100644 --- a/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java +++ b/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; import java.io.Serializable; -import java.util.Arrays; import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.impl.mr.MRPipeline; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java index e2dd39f..80f701b 100644 --- a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java +++ b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java @@ -24,6 +24,8 @@ import java.util.Hashtable; import java.util.List; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.State; import org.apache.hadoop.conf.Configuration; @@ -57,6 +59,8 @@ public class CrunchJobControl implements Runnable { private Map<String, CrunchControlledJob> successfulJobs; private Map<String, CrunchControlledJob> failedJobs; + private Log log = 
LogFactory.getLog(CrunchJobControl.class); + private long nextJobID; private String groupName; private int jobPollInterval; @@ -274,6 +278,7 @@ public class CrunchJobControl implements Runnable { checkWaitingJobs(); startReadyJobs(); } catch (Exception e) { + log.error("Error in run loop", e); this.runnerState = ThreadState.STOPPED; } if (this.runnerState != ThreadState.RUNNING http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java index 74c6ff3..b4981db 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java @@ -19,16 +19,21 @@ package org.apache.crunch.impl.mr.exec; import java.io.IOException; import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob; import org.apache.crunch.impl.mr.plan.MSCROutputHandler; import org.apache.crunch.impl.mr.plan.PlanningParameters; +import org.apache.crunch.io.FileNamingScheme; +import org.apache.crunch.io.PathTarget; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; -import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob; import org.apache.hadoop.util.StringUtils; import com.google.common.collect.Lists; @@ -38,7 +43,7 @@ public class CrunchJob extends CrunchControlledJob { private final Log log = LogFactory.getLog(CrunchJob.class); private final Path workingPath; - private final 
Map<Integer, Path> multiPaths; + private final Map<Integer, PathTarget> multiPaths; private final boolean mapOnlyJob; public CrunchJob(Job job, Path workingPath, MSCROutputHandler handler) throws IOException { @@ -53,20 +58,21 @@ public class CrunchJob extends CrunchControlledJob { // Need to handle moving the data from the output directory of the // job to the output locations specified in the paths. FileSystem srcFs = workingPath.getFileSystem(job.getConfiguration()); - for (Map.Entry<Integer, Path> entry : multiPaths.entrySet()) { + for (Map.Entry<Integer, PathTarget> entry : multiPaths.entrySet()) { final int i = entry.getKey(); - final Path dst = entry.getValue(); + final Path dst = entry.getValue().getPath(); + FileNamingScheme fileNamingScheme = entry.getValue().getFileNamingScheme(); Path src = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + i + "-*"); Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src); - FileSystem dstFs = dst.getFileSystem(job.getConfiguration()); + Configuration conf = job.getConfiguration(); + FileSystem dstFs = dst.getFileSystem(conf); if (!dstFs.exists(dst)) { dstFs.mkdirs(dst); } boolean sameFs = isCompatible(srcFs, dst); - int minPartIndex = getMinPartIndex(dst, dstFs); for (Path s : srcs) { - Path d = getDestFile(s, dst, minPartIndex++); + Path d = getDestFile(conf, s, dst, fileNamingScheme); if (sameFs) { srcFs.rename(s, d); } else { @@ -86,17 +92,18 @@ public class CrunchJob extends CrunchControlledJob { } } - private Path getDestFile(Path src, Path dir, int index) { - String form = "part-%s-%05d"; + private Path getDestFile(Configuration conf, Path src, Path dir, FileNamingScheme fileNamingScheme) + throws IOException { + String outputFilename = null; + if (mapOnlyJob) { + outputFilename = fileNamingScheme.getMapOutputName(conf, dir); + } else { + outputFilename = fileNamingScheme.getReduceOutputName(conf, dir, CrunchJob.extractPartitionNumber(src.getName())); + } if 
(src.getName().endsWith(org.apache.avro.mapred.AvroOutputFormat.EXT)) { - form = form + org.apache.avro.mapred.AvroOutputFormat.EXT; + outputFilename += org.apache.avro.mapred.AvroOutputFormat.EXT; } - return new Path(dir, String.format(form, mapOnlyJob ? "m" : "r", index)); - } - - private int getMinPartIndex(Path path, FileSystem fs) throws IOException { - // Quick and dirty way to ensure unique naming in the directory - return fs.listStatus(path).length; + return new Path(dir, outputFilename); } @Override @@ -134,4 +141,19 @@ public class CrunchJob extends CrunchControlledJob { log.info(getMessage()); } } + + /** + * Extract the partition number from a raw reducer output filename. + * + * @param reduceOutputFileName The raw reducer output file name + * @return The partition number encoded in the filename + */ + static int extractPartitionNumber(String reduceOutputFileName) { + Matcher matcher = Pattern.compile(".*-r-(\\d{5})").matcher(reduceOutputFileName); + if (matcher.find()) { + return Integer.parseInt(matcher.group(1), 10); + } else { + throw new IllegalArgumentException("Reducer output name '" + reduceOutputFileName + "' cannot be parsed"); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java index b6a41da..36c565e 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java @@ -36,7 +36,7 @@ public class MSCROutputHandler implements OutputHandler { private final boolean mapOnlyJob; private DoNode workingNode; - private Map<Integer, Path> multiPaths; + private Map<Integer, PathTarget> multiPaths; private int jobCount; public 
MSCROutputHandler(Job job, Path outputPath, boolean mapOnlyJob) { @@ -54,7 +54,7 @@ public class MSCROutputHandler implements OutputHandler { public boolean configure(Target target, PType<?> ptype) { if (target instanceof MapReduceTarget) { if (target instanceof PathTarget) { - multiPaths.put(jobCount, ((PathTarget) target).getPath()); + multiPaths.put(jobCount, (PathTarget) target); } String name = PlanningParameters.MULTI_OUTPUT_PREFIX + jobCount; @@ -71,7 +71,7 @@ public class MSCROutputHandler implements OutputHandler { return mapOnlyJob; } - public Map<Integer, Path> getMultiPaths() { + public Map<Integer, PathTarget> getMultiPaths() { return multiPaths; } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/FileNamingScheme.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/FileNamingScheme.java b/crunch/src/main/java/org/apache/crunch/io/FileNamingScheme.java new file mode 100644 index 0000000..cf93651 --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/io/FileNamingScheme.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.io; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +/** + * Encapsulates rules for naming output files. It is the responsibility of + * implementors to avoid file name collisions. + */ +public interface FileNamingScheme { + + /** + * Get the output file name for a map task. Note that the implementation is + * responsible for avoiding naming collisions. + * + * @param configuration The configuration of the job for which the map output + * is being written + * @param outputDirectory The directory where the output will be written + * @return The filename for the output of the map task + * @throws IOException if an exception occurs while accessing the output file + * system + */ + String getMapOutputName(Configuration configuration, Path outputDirectory) throws IOException; + + /** + * Get the output file name for a reduce task. Note that the implementation is + * responsible for avoiding naming collisions. 
+ * + * @param configuration The configuration of the job for which output is being + * written + * @param outputDirectory The directory where the file will be written + * @param partitionId The partition of the reduce task being output + * @return The filename for the output of the reduce task + * @throws IOException if an exception occurs while accessing the output file + * system + */ + String getReduceOutputName(Configuration configuration, Path outputDirectory, int partitionId) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/PathTarget.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/PathTarget.java b/crunch/src/main/java/org/apache/crunch/io/PathTarget.java index 884ba43..7a35209 100644 --- a/crunch/src/main/java/org/apache/crunch/io/PathTarget.java +++ b/crunch/src/main/java/org/apache/crunch/io/PathTarget.java @@ -19,6 +19,18 @@ package org.apache.crunch.io; import org.apache.hadoop.fs.Path; +/** + * A target whose output goes to a given path on a file system. + */ public interface PathTarget extends MapReduceTarget { + Path getPath(); + + /** + * Get the naming scheme to be used for outputs being written to an output + * path. 
+ * + * @return the naming scheme to be used + */ + FileNamingScheme getFileNamingScheme(); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java b/crunch/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java new file mode 100644 index 0000000..0b2affa --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +/** + * Default {@link FileNamingScheme} that uses an incrementing sequence number in + * order to generate unique file names. 
+ */ +public class SequentialFileNamingScheme implements FileNamingScheme { + + @Override + public String getMapOutputName(Configuration configuration, Path outputDirectory) throws IOException { + return getSequentialFileName(configuration, outputDirectory, "m"); + } + + @Override + public String getReduceOutputName(Configuration configuration, Path outputDirectory, int partitionId) + throws IOException { + return getSequentialFileName(configuration, outputDirectory, "r"); + } + + private String getSequentialFileName(Configuration configuration, Path outputDirectory, String jobTypeName) + throws IOException { + FileSystem fileSystem = FileSystem.get(configuration); + int fileSequenceNumber = fileSystem.listStatus(outputDirectory).length; + + return String.format("part-%s-%05d", jobTypeName, fileSequenceNumber); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/To.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/To.java b/crunch/src/main/java/org/apache/crunch/io/To.java index faaa4d8..da92727 100644 --- a/crunch/src/main/java/org/apache/crunch/io/To.java +++ b/crunch/src/main/java/org/apache/crunch/io/To.java @@ -36,7 +36,7 @@ public class To { } public static Target formattedFile(Path path, Class<? 
extends FileOutputFormat> formatClass) { - return new FileTargetImpl(path, formatClass); + return new FileTargetImpl(path, formatClass, new SequentialFileNamingScheme()); } public static Target avroFile(String pathName) { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java index 8b6208d..76103e5 100644 --- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java +++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java @@ -17,13 +17,19 @@ */ package org.apache.crunch.io.avro; +import org.apache.crunch.io.FileNamingScheme; +import org.apache.crunch.io.SequentialFileNamingScheme; import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl; import org.apache.crunch.types.avro.AvroType; import org.apache.hadoop.fs.Path; public class AvroFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> { public AvroFileSourceTarget(Path path, AvroType<T> atype) { - super(new AvroFileSource<T>(path, atype), new AvroFileTarget(path)); + this(path, atype, new SequentialFileNamingScheme()); + } + + public AvroFileSourceTarget(Path path, AvroType<T> atype, FileNamingScheme fileNamingScheme) { + super(new AvroFileSource<T>(path, atype), new AvroFileTarget(path), fileNamingScheme); } @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java index 91deac4..3a9e42c 100644 --- 
a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java +++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java @@ -19,7 +19,9 @@ package org.apache.crunch.io.avro; import org.apache.avro.mapred.AvroWrapper; import org.apache.crunch.SourceTarget; +import org.apache.crunch.io.FileNamingScheme; import org.apache.crunch.io.OutputHandler; +import org.apache.crunch.io.SequentialFileNamingScheme; import org.apache.crunch.io.impl.FileTargetImpl; import org.apache.crunch.types.PType; import org.apache.crunch.types.avro.AvroOutputFormat; @@ -31,12 +33,17 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; public class AvroFileTarget extends FileTargetImpl { + public AvroFileTarget(String path) { this(new Path(path)); } public AvroFileTarget(Path path) { - super(path, AvroOutputFormat.class); + this(path, new SequentialFileNamingScheme()); + } + + public AvroFileTarget(Path path, FileNamingScheme fileNamingScheme) { + super(path, AvroOutputFormat.class, fileNamingScheme); } @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java index ecae0de..00df45e 100644 --- a/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java +++ b/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java @@ -20,6 +20,7 @@ package org.apache.crunch.io.impl; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.crunch.SourceTarget; import org.apache.crunch.hadoop.mapreduce.lib.output.CrunchMultipleOutputs; +import org.apache.crunch.io.FileNamingScheme; import org.apache.crunch.io.OutputHandler; import org.apache.crunch.io.PathTarget; import org.apache.crunch.types.Converter; @@ -32,10 +33,13 
@@ public class FileTargetImpl implements PathTarget { protected final Path path; private final Class<? extends FileOutputFormat> outputFormatClass; + private final FileNamingScheme fileNamingScheme; - public FileTargetImpl(Path path, Class<? extends FileOutputFormat> outputFormatClass) { + public FileTargetImpl(Path path, Class<? extends FileOutputFormat> outputFormatClass, + FileNamingScheme fileNamingScheme) { this.path = path; this.outputFormatClass = outputFormatClass; + this.fileNamingScheme = fileNamingScheme; } @Override @@ -74,6 +78,11 @@ public class FileTargetImpl implements PathTarget { } @Override + public FileNamingScheme getFileNamingScheme() { + return fileNamingScheme; + } + + @Override public boolean equals(Object other) { if (other == null || !getClass().equals(other.getClass())) { return false; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java index 465797a..6506816 100644 --- a/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java +++ b/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java @@ -19,6 +19,7 @@ package org.apache.crunch.io.impl; import java.io.IOException; +import org.apache.crunch.io.FileNamingScheme; import org.apache.crunch.io.PathTarget; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.io.ReadableSourceTarget; @@ -26,8 +27,8 @@ import org.apache.hadoop.conf.Configuration; public class ReadableSourcePathTargetImpl<T> extends SourcePathTargetImpl<T> implements ReadableSourceTarget<T> { - public ReadableSourcePathTargetImpl(ReadableSource<T> source, PathTarget target) { - super(source, target); + public 
ReadableSourcePathTargetImpl(ReadableSource<T> source, PathTarget target, FileNamingScheme fileNamingScheme) { + super(source, target, fileNamingScheme); } @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java index 87f4901..c0d7ce0 100644 --- a/crunch/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java +++ b/crunch/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java @@ -18,6 +18,7 @@ package org.apache.crunch.io.impl; import org.apache.crunch.Source; +import org.apache.crunch.io.FileNamingScheme; import org.apache.crunch.io.PathTarget; import org.apache.crunch.types.PType; import org.apache.hadoop.fs.Path; @@ -25,8 +26,11 @@ import org.apache.hadoop.mapreduce.Job; public class SourcePathTargetImpl<T> extends SourceTargetImpl<T> implements PathTarget { - public SourcePathTargetImpl(Source<T> source, PathTarget target) { + private final FileNamingScheme fileNamingScheme; + + public SourcePathTargetImpl(Source<T> source, PathTarget target, FileNamingScheme fileNamingScheme) { super(source, target); + this.fileNamingScheme = fileNamingScheme; } @Override @@ -38,4 +42,9 @@ public class SourcePathTargetImpl<T> extends SourceTargetImpl<T> implements Path public Path getPath() { return ((PathTarget) target).getPath(); } + + @Override + public FileNamingScheme getFileNamingScheme() { + return fileNamingScheme; + } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java 
b/crunch/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java index eb500dd..a8ff639 100644 --- a/crunch/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java +++ b/crunch/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java @@ -19,13 +19,19 @@ package org.apache.crunch.io.impl; import org.apache.crunch.Pair; import org.apache.crunch.TableSource; +import org.apache.crunch.io.FileNamingScheme; import org.apache.crunch.io.PathTarget; +import org.apache.crunch.io.SequentialFileNamingScheme; import org.apache.crunch.types.PTableType; public class TableSourcePathTargetImpl<K, V> extends SourcePathTargetImpl<Pair<K, V>> implements TableSource<K, V> { public TableSourcePathTargetImpl(TableSource<K, V> source, PathTarget target) { - super(source, target); + this(source, target, new SequentialFileNamingScheme()); + } + + public TableSourcePathTargetImpl(TableSource<K, V> source, PathTarget target, FileNamingScheme fileNamingScheme) { + super(source, target, fileNamingScheme); } @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java index f532472..adc739f 100644 --- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java +++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java @@ -17,17 +17,24 @@ */ package org.apache.crunch.io.seq; +import org.apache.crunch.io.FileNamingScheme; +import org.apache.crunch.io.SequentialFileNamingScheme; import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl; import org.apache.crunch.types.PType; import org.apache.hadoop.fs.Path; public class SeqFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> { + public 
SeqFileSourceTarget(String path, PType<T> ptype) { this(new Path(path), ptype); } public SeqFileSourceTarget(Path path, PType<T> ptype) { - super(new SeqFileSource<T>(path, ptype), new SeqFileTarget(path)); + this(path, ptype, new SequentialFileNamingScheme()); + } + + public SeqFileSourceTarget(Path path, PType<T> ptype, FileNamingScheme fileNamingScheme) { + super(new SeqFileSource<T>(path, ptype), new SeqFileTarget(path), fileNamingScheme); } @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java index a1fccb5..e13de1d 100644 --- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java +++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java @@ -19,6 +19,8 @@ package org.apache.crunch.io.seq; import org.apache.crunch.Pair; import org.apache.crunch.TableSource; +import org.apache.crunch.io.FileNamingScheme; +import org.apache.crunch.io.SequentialFileNamingScheme; import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl; import org.apache.crunch.types.PTableType; import org.apache.hadoop.fs.Path; @@ -32,7 +34,11 @@ public class SeqFileTableSourceTarget<K, V> extends ReadableSourcePathTargetImpl } public SeqFileTableSourceTarget(Path path, PTableType<K, V> tableType) { - super(new SeqFileTableSource<K, V>(path, tableType), new SeqFileTarget(path)); + this(path, tableType, new SequentialFileNamingScheme()); + } + + public SeqFileTableSourceTarget(Path path, PTableType<K, V> tableType, FileNamingScheme fileNamingScheme) { + super(new SeqFileTableSource<K, V>(path, tableType), new SeqFileTarget(path), fileNamingScheme); this.tableType = tableType; } 
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java index c03543a..60e4739 100644 --- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java +++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java @@ -18,6 +18,8 @@ package org.apache.crunch.io.seq; import org.apache.crunch.SourceTarget; +import org.apache.crunch.io.FileNamingScheme; +import org.apache.crunch.io.SequentialFileNamingScheme; import org.apache.crunch.io.impl.FileTargetImpl; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; @@ -30,7 +32,11 @@ public class SeqFileTarget extends FileTargetImpl { } public SeqFileTarget(Path path) { - super(path, SequenceFileOutputFormat.class); + this(path, new SequentialFileNamingScheme()); + } + + public SeqFileTarget(Path path, FileNamingScheme fileNamingScheme) { + super(path, SequenceFileOutputFormat.class, fileNamingScheme); } @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java index 1a3d522..1d1211e 100644 --- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java +++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java @@ -17,17 +17,24 @@ */ package org.apache.crunch.io.text; +import org.apache.crunch.io.FileNamingScheme; +import org.apache.crunch.io.SequentialFileNamingScheme; import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl; import 
org.apache.crunch.types.PType; import org.apache.hadoop.fs.Path; public class TextFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> { + public TextFileSourceTarget(String path, PType<T> ptype) { this(new Path(path), ptype); } public TextFileSourceTarget(Path path, PType<T> ptype) { - super(new TextFileSource<T>(path, ptype), new TextFileTarget(path)); + this(path, ptype, new SequentialFileNamingScheme()); + } + + public TextFileSourceTarget(Path path, PType<T> ptype, FileNamingScheme fileNamingScheme) { + super(new TextFileSource<T>(path, ptype), new TextFileTarget(path), fileNamingScheme); } @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java index aa2f8e8..c7e06d3 100644 --- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java +++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java @@ -18,6 +18,8 @@ package org.apache.crunch.io.text; import org.apache.crunch.SourceTarget; +import org.apache.crunch.io.FileNamingScheme; +import org.apache.crunch.io.SequentialFileNamingScheme; import org.apache.crunch.io.impl.FileTargetImpl; import org.apache.crunch.types.Converter; import org.apache.crunch.types.PTableType; @@ -43,7 +45,11 @@ public class TextFileTarget extends FileTargetImpl { } public <T> TextFileTarget(Path path) { - super(path, null); + this(path, new SequentialFileNamingScheme()); + } + + public <T> TextFileTarget(Path path, FileNamingScheme fileNamingScheme) { + super(path, null, fileNamingScheme); } @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobTest.java 
---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobTest.java new file mode 100644 index 0000000..00ad830 --- /dev/null +++ b/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobTest.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.impl.mr.exec; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +public class CrunchJobTest { + + @Test + public void testExtractPartitionNumber() { + assertEquals(0, CrunchJob.extractPartitionNumber("out1-r-00000")); + assertEquals(10, CrunchJob.extractPartitionNumber("out2-r-00010")); + assertEquals(99999, CrunchJob.extractPartitionNumber("out3-r-99999")); + } + + @Test + public void testExtractPartitionNumber_WithSuffix() { + assertEquals(10, CrunchJob.extractPartitionNumber("out2-r-00010.avro")); + } + + @Test(expected = IllegalArgumentException.class) + public void testExtractPartitionNumber_MapOutputFile() { + CrunchJob.extractPartitionNumber("out1-m-00000"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/test/java/org/apache/crunch/io/SequentialFileNamingSchemeTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/io/SequentialFileNamingSchemeTest.java b/crunch/src/test/java/org/apache/crunch/io/SequentialFileNamingSchemeTest.java new file mode 100644 index 0000000..467da15 --- /dev/null +++ b/crunch/src/test/java/org/apache/crunch/io/SequentialFileNamingSchemeTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class SequentialFileNamingSchemeTest { + + // The partition id used for testing. This partition id should be ignored by + // the SequentialFileNamingScheme. + private static final int PARTITION_ID = 42; + + private SequentialFileNamingScheme namingScheme; + private Configuration configuration; + + @Rule + public TemporaryFolder tmpOutputDir = new TemporaryFolder(); + + @Before + public void setUp() throws IOException { + configuration = new Configuration(); + namingScheme = new SequentialFileNamingScheme(); + } + + @Test + public void testGetMapOutputName_EmptyDirectory() throws IOException { + assertEquals("part-m-00000", + namingScheme.getMapOutputName(configuration, new Path(tmpOutputDir.getRoot().getAbsolutePath()))); + } + + @Test + public void testGetMapOutputName_NonEmptyDirectory() throws IOException { + File outputDirectory = tmpOutputDir.getRoot(); + + new File(outputDirectory, "existing-1").createNewFile(); + new File(outputDirectory, "existing-2").createNewFile(); + + assertEquals("part-m-00002", + namingScheme.getMapOutputName(configuration, new Path(outputDirectory.getAbsolutePath()))); + } + + @Test + public void testGetReduceOutputName_EmptyDirectory() throws IOException { + assertEquals("part-r-00000", namingScheme.getReduceOutputName(configuration, new Path(tmpOutputDir.getRoot() + .getAbsolutePath()), PARTITION_ID)); + } + + @Test + public void testGetReduceOutputName_NonEmptyDirectory() throws IOException { + File outputDirectory = tmpOutputDir.getRoot(); + + new File(outputDirectory, 
"existing-1").createNewFile(); + new File(outputDirectory, "existing-2").createNewFile(); + + assertEquals("part-r-00002", + namingScheme.getReduceOutputName(configuration, new Path(outputDirectory.getAbsolutePath()), PARTITION_ID)); + } + +}
