Updated Branches: refs/heads/master 33f53e629 -> 04438473c
CRUNCH-47: Switch FS.get(conf) references to path.getFileSystem so Crunch will run with S3FileSystem Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/04438473 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/04438473 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/04438473 Branch: refs/heads/master Commit: 04438473c7bd8681331aa180acf7e3c2c2d3faef Parents: 33f53e6 Author: jwills <[email protected]> Authored: Tue Aug 14 15:43:57 2012 -0700 Committer: jwills <[email protected]> Committed: Tue Aug 14 15:43:57 2012 -0700 ---------------------------------------------------------------------- .../lib/jobcontrol/CrunchControlledJob.java | 10 +++--- .../org/apache/crunch/impl/mem/MemPipeline.java | 2 +- .../java/org/apache/crunch/impl/mr/MRPipeline.java | 4 +- .../org/apache/crunch/impl/mr/exec/CrunchJob.java | 28 +++++++++++--- .../org/apache/crunch/io/SourceTargetHelper.java | 2 +- .../org/apache/crunch/io/avro/AvroFileSource.java | 2 +- .../org/apache/crunch/io/seq/SeqFileSource.java | 2 +- .../apache/crunch/io/seq/SeqFileTableSource.java | 2 +- .../org/apache/crunch/io/text/TextFileSource.java | 3 +- .../java/org/apache/crunch/util/DistCache.java | 2 +- 10 files changed, 36 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/04438473/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java index 205e8e3..396ea2d 100644 --- a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java +++ b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java @@ -316,12 +316,12 @@ public class CrunchControlledJob { try { Configuration conf = job.getConfiguration(); if (conf.getBoolean(CREATE_DIR, false)) { - FileSystem fs = FileSystem.get(conf); - Path inputPaths[] = FileInputFormat.getInputPaths(job); - for (int i = 0; i < inputPaths.length; i++) { - if (!fs.exists(inputPaths[i])) { + Path[] inputPaths = FileInputFormat.getInputPaths(job); + for (Path inputPath : inputPaths) { + FileSystem fs = inputPath.getFileSystem(conf); + if (!fs.exists(inputPath)) { try { - fs.mkdirs(inputPaths[i]); + fs.mkdirs(inputPath); } catch (IOException e) { } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/04438473/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java index 767524c..77c41ce 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java @@ -147,7 +147,7 @@ public class MemPipeline implements Pipeline { if (target instanceof PathTarget) { Path path = ((PathTarget) target).getPath(); try { - FileSystem fs = FileSystem.get(conf); + FileSystem fs = path.getFileSystem(conf); FSDataOutputStream os = fs.create(new Path(path, "out")); if (collection instanceof PTable) { for (Object o : collection.materialize()) { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/04438473/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java index ec03781..d9ee0c1 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java @@ -267,7 +267,7 @@ public class MRPipeline implements Pipeline { private static Path createTempDirectory(Configuration conf) { Path dir = createTemporaryPath(conf); try { - FileSystem.get(conf).mkdirs(dir); + dir.getFileSystem(conf).mkdirs(dir); } catch (IOException e) { throw new RuntimeException("Cannot create job output directory " + dir, e); } @@ -293,7 +293,7 @@ public class MRPipeline implements Pipeline { return; } try { - FileSystem fs = FileSystem.get(conf); + FileSystem fs = tempDirectory.getFileSystem(conf); if (fs.exists(tempDirectory)) { fs.delete(tempDirectory, true); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/04438473/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java index 8327f58..85fae63 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java @@ -52,22 +52,38 @@ public class CrunchJob extends CrunchControlledJob { if (!multiPaths.isEmpty()) { // Need to handle moving the data from the output directory of the // job to the output locations specified in the paths. - FileSystem fs = FileSystem.get(job.getConfiguration()); + FileSystem srcFs = workingPath.getFileSystem(job.getConfiguration()); for (int i = 0; i < multiPaths.size(); i++) { Path src = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + i + "-*"); - Path[] srcs = FileUtil.stat2Paths(fs.globStatus(src), src); + Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src); Path dst = multiPaths.get(i); - if (!fs.exists(dst)) { - fs.mkdirs(dst); + FileSystem dstFs = dst.getFileSystem(job.getConfiguration()); + if (!dstFs.exists(dst)) { + dstFs.mkdirs(dst); } - int minPartIndex = getMinPartIndex(dst, fs); + boolean sameFs = isCompatible(srcFs, dst); + int minPartIndex = getMinPartIndex(dst, dstFs); for (Path s : srcs) { - fs.rename(s, getDestFile(s, dst, minPartIndex++)); + Path d = getDestFile(s, dst, minPartIndex++); + if (sameFs) { + srcFs.rename(s, d); + } else { + FileUtil.copy(srcFs, s, dstFs, d, true, true, job.getConfiguration()); + } } } } } + private boolean isCompatible(FileSystem fs, Path path) { + try { + fs.makeQualified(path); + return true; + } catch (IllegalArgumentException e) { + return false; + } + } + private Path getDestFile(Path src, Path dir, int index) { String form = "part-%s-%05d"; if (src.getName().endsWith(org.apache.avro.mapred.AvroOutputFormat.EXT)) { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/04438473/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java b/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java index f52bfe7..bbe5eaa 100644 --- a/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java +++ b/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java @@ -36,7 +36,7 @@ public class SourceTargetHelper { private static final Log LOG = LogFactory.getLog(SourceTargetHelper.class); public static long getPathSize(Configuration conf, Path path) throws IOException { - return getPathSize(FileSystem.get(conf), path); + return getPathSize(path.getFileSystem(conf), path); } public static long getPathSize(FileSystem fs, Path path) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/04438473/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java index 8bfb4dc..0ce4c06 100644 --- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java +++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java @@ -47,7 +47,7 @@ public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSour @Override public Iterable<T> read(Configuration conf) throws IOException { - FileSystem fs = FileSystem.get(path.toUri(), conf); + FileSystem fs = path.getFileSystem(conf); return CompositePathIterable.create(fs, path, new AvroFileReaderFactory<T>((AvroType<T>) ptype, conf)); } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/04438473/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java index 6f0ad05..e8f3dcf 100644 --- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java +++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java @@ -36,7 +36,7 @@ public class SeqFileSource<T> extends FileSourceImpl<T> implements ReadableSourc @Override public Iterable<T> read(Configuration conf) throws IOException { - FileSystem fs = FileSystem.get(path.toUri(), conf); + FileSystem fs = path.getFileSystem(conf); return CompositePathIterable.create(fs, path, new SeqFileReaderFactory<T>(ptype, conf)); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/04438473/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java index a846d66..56ed985 100644 --- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java +++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java @@ -44,7 +44,7 @@ public class SeqFileTableSource<K, V> extends FileTableSourceImpl<K, V> implemen @Override public Iterable<Pair<K, V>> read(Configuration conf) throws IOException { - FileSystem fs = FileSystem.get(path.toUri(), conf); + FileSystem fs = path.getFileSystem(conf); return CompositePathIterable.create(fs, path, new SeqFileTableReaderFactory<K, V>((PTableType<K, V>) ptype, conf)); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/04438473/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java index 6310995..ee51c04 100644 --- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java +++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java @@ -26,7 +26,6 @@ import org.apache.crunch.types.PType; import org.apache.crunch.types.avro.AvroTypeFamily; import org.apache.crunch.types.avro.AvroUtf8InputFormat; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; @@ -68,7 +67,7 @@ public class TextFileSource<T> extends FileSourceImpl<T> implements ReadableSour @Override public Iterable<T> read(Configuration conf) throws IOException { - return CompositePathIterable.create(FileSystem.get(path.toUri(), conf), path, new TextFileReaderFactory<T>(ptype, + return CompositePathIterable.create(path.getFileSystem(conf), path, new TextFileReaderFactory<T>(ptype, conf)); } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/04438473/crunch/src/main/java/org/apache/crunch/util/DistCache.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/util/DistCache.java b/crunch/src/main/java/org/apache/crunch/util/DistCache.java index 6ab2a50..20675d2 100644 --- a/crunch/src/main/java/org/apache/crunch/util/DistCache.java +++ b/crunch/src/main/java/org/apache/crunch/util/DistCache.java @@ -51,7 +51,7 @@ public class DistCache { private static final String TMPJARS_KEY = "tmpjars"; public static void write(Configuration conf, Path path, Object value) throws IOException { - ObjectOutputStream oos = new ObjectOutputStream(FileSystem.get(conf).create(path)); + ObjectOutputStream oos = new ObjectOutputStream(path.getFileSystem(conf).create(path)); oos.writeObject(value); oos.close();
