Updated Branches: refs/heads/master 1c58b6fb2 -> 1897ae82f
CRUNCH-82: Added support for non-HDFS OutputFormats when writing to multiple targets. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/1897ae82 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/1897ae82 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/1897ae82 Branch: refs/heads/master Commit: 1897ae82fdb94e9c6bd4314dee0c6dfa165ae5e0 Parents: 1c58b6f Author: Robert Chu <[email protected]> Authored: Mon Sep 17 16:35:56 2012 -0700 Committer: Robert Chu <[email protected]> Committed: Fri Sep 28 14:46:42 2012 -0700 ---------------------------------------------------------------------- .../org/apache/crunch/io/hbase/HBaseTarget.java | 19 +++++++++- .../org/apache/crunch/impl/mr/exec/CrunchJob.java | 10 +++-- .../crunch/impl/mr/plan/MSCROutputHandler.java | 26 ++++++++------- 3 files changed, 37 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1897ae82/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java index 050cff1..c659c86 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.crunch.SourceTarget; +import org.apache.crunch.hadoop.mapreduce.lib.output.CrunchMultipleOutputs; import org.apache.crunch.impl.mr.run.CrunchRuntimeException; import org.apache.crunch.io.MapReduceTarget; import org.apache.crunch.io.OutputHandler; @@ -29,9 +30,11 @@ import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class HBaseTarget implements MapReduceTarget { @@ -75,15 +78,27 @@ public class HBaseTarget implements MapReduceTarget { @Override public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) { - Configuration conf = job.getConfiguration(); + final Configuration conf = job.getConfiguration(); HBaseConfiguration.addHbaseResources(conf); - job.setOutputFormatClass(TableOutputFormat.class); conf.set(TableOutputFormat.OUTPUT_TABLE, table); + try { TableMapReduceUtil.addDependencyJars(job); + FileOutputFormat.setOutputPath(job, outputPath); } catch (IOException e) { throw new CrunchRuntimeException(e); } + + if (null == name) { + job.setOutputFormatClass(TableOutputFormat.class); + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(Put.class); + } else { + CrunchMultipleOutputs.addNamedOutput(job, name, + TableOutputFormat.class, + ImmutableBytesWritable.class, + Put.class); + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1897ae82/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java index 85fae63..74c6ff3 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java @@ -18,7 +18,7 @@ package 
org.apache.crunch.impl.mr.exec; import java.io.IOException; -import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -38,7 +38,7 @@ public class CrunchJob extends CrunchControlledJob { private final Log log = LogFactory.getLog(CrunchJob.class); private final Path workingPath; - private final List<Path> multiPaths; + private final Map<Integer, Path> multiPaths; private final boolean mapOnlyJob; public CrunchJob(Job job, Path workingPath, MSCROutputHandler handler) throws IOException { @@ -53,10 +53,12 @@ public class CrunchJob extends CrunchControlledJob { // Need to handle moving the data from the output directory of the // job to the output locations specified in the paths. FileSystem srcFs = workingPath.getFileSystem(job.getConfiguration()); - for (int i = 0; i < multiPaths.size(); i++) { + for (Map.Entry<Integer, Path> entry : multiPaths.entrySet()) { + final int i = entry.getKey(); + final Path dst = entry.getValue(); + Path src = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + i + "-*"); Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src); - Path dst = multiPaths.get(i); FileSystem dstFs = dst.getFileSystem(job.getConfiguration()); if (!dstFs.exists(dst)) { dstFs.mkdirs(dst); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1897ae82/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java index bfd3e26..b6a41da 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java @@ -17,7 +17,7 @@ */ package org.apache.crunch.impl.mr.plan; -import java.util.List; +import java.util.Map; import 
org.apache.crunch.Target; import org.apache.crunch.io.MapReduceTarget; @@ -27,7 +27,7 @@ import org.apache.crunch.types.PType; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; -import com.google.common.collect.Lists; +import com.google.common.collect.Maps; public class MSCROutputHandler implements OutputHandler { @@ -36,13 +36,14 @@ public class MSCROutputHandler implements OutputHandler { private final boolean mapOnlyJob; private DoNode workingNode; - private List<Path> multiPaths; + private Map<Integer, Path> multiPaths; + private int jobCount; public MSCROutputHandler(Job job, Path outputPath, boolean mapOnlyJob) { this.job = job; this.path = outputPath; this.mapOnlyJob = mapOnlyJob; - this.multiPaths = Lists.newArrayList(); + this.multiPaths = Maps.newHashMap(); } public void configureNode(DoNode node, Target target) { @@ -51,17 +52,18 @@ public class MSCROutputHandler implements OutputHandler { } public boolean configure(Target target, PType<?> ptype) { - if (target instanceof MapReduceTarget && target instanceof PathTarget) { - String name = PlanningParameters.MULTI_OUTPUT_PREFIX + multiPaths.size(); - multiPaths.add(((PathTarget) target).getPath()); + if (target instanceof MapReduceTarget) { + if (target instanceof PathTarget) { + multiPaths.put(jobCount, ((PathTarget) target).getPath()); + } + + String name = PlanningParameters.MULTI_OUTPUT_PREFIX + jobCount; + jobCount++; workingNode.setOutputName(name); ((MapReduceTarget) target).configureForMapReduce(job, ptype, path, name); return true; } - if (target instanceof MapReduceTarget) { - ((MapReduceTarget) target).configureForMapReduce(job, ptype, null, null); - return true; - } + return false; } @@ -69,7 +71,7 @@ public class MSCROutputHandler implements OutputHandler { return mapOnlyJob; } - public List<Path> getMultiPaths() { + public Map<Integer, Path> getMultiPaths() { return multiPaths; } }
