Updated Branches: refs/heads/apache-crunch-0.8 da0b14bda -> 1552bc67a
CRUNCH-335: Better Configuration compression for multipath input sources. Contributed by Maxim Gurevich. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/1552bc67 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/1552bc67 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/1552bc67 Branch: refs/heads/apache-crunch-0.8 Commit: 1552bc67a70deae9953aa1e155b9440ef7268cf1 Parents: da0b14b Author: Josh Wills <[email protected]> Authored: Tue Jan 28 20:46:47 2014 -0800 Committer: Josh Wills <[email protected]> Committed: Tue Jan 28 21:03:05 2014 -0800 ---------------------------------------------------------------------- .../java/org/apache/crunch/io/CrunchInputs.java | 22 ++++++++++++++++++-- .../apache/crunch/io/impl/FileSourceImpl.java | 4 +--- 2 files changed, 21 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/1552bc67/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java index bcdcb55..27bd696 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java @@ -17,6 +17,8 @@ */ package org.apache.crunch.io; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -28,6 +30,7 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -41,12 +44,23 @@ public class CrunchInputs { private static final char RECORD_SEP = ','; private static final char FIELD_SEP = ';'; + private static final char PATH_SEP = '|'; private static final Joiner JOINER = Joiner.on(FIELD_SEP); private static final Splitter SPLITTER = Splitter.on(FIELD_SEP); public static void addInputPath(Job job, Path path, FormatBundle inputBundle, int nodeIndex) { + addInputPaths(job, Collections.singleton(path), inputBundle, nodeIndex); + } + + public static void addInputPaths(Job job, Collection<Path> paths, FormatBundle inputBundle, int nodeIndex) { Configuration conf = job.getConfiguration(); - String inputs = JOINER.join(inputBundle.serialize(), String.valueOf(nodeIndex), path.toString()); + List<String> pathStrs = Lists.newArrayListWithExpectedSize(paths.size()); + for (Path path : paths) { + String pathStr = path.toString(); + Preconditions.checkArgument(pathStr.indexOf(RECORD_SEP) == -1 && pathStr.indexOf(FIELD_SEP) == -1 && pathStr.indexOf(PATH_SEP) == -1); + pathStrs.add(pathStr); + } + String inputs = JOINER.join(inputBundle.serialize(), String.valueOf(nodeIndex), Joiner.on(PATH_SEP).join(pathStrs)); String existing = conf.get(CRUNCH_INPUTS); conf.set(CRUNCH_INPUTS, existing == null ? inputs : existing + RECORD_SEP + inputs); } @@ -68,7 +82,11 @@ public class CrunchInputs { if (!formatNodeMap.get(inputBundle).containsKey(nodeIndex)) { formatNodeMap.get(inputBundle).put(nodeIndex, Lists.<Path> newLinkedList()); } - formatNodeMap.get(inputBundle).get(nodeIndex).add(new Path(fields.get(2))); + List<Path> formatNodePaths = formatNodeMap.get(inputBundle).get(nodeIndex); + String paths = fields.get(2); + for (String path : Splitter.on(PATH_SEP).split(paths)) { + formatNodePaths.add(new Path(path)); + } } return formatNodeMap; } http://git-wip-us.apache.org/repos/asf/crunch/blob/1552bc67/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java index 766b9b0..1151ad5 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java @@ -100,9 +100,7 @@ public class FileSourceImpl<T> implements Source<T> { public void configureSource(Job job, int inputId) throws IOException { // Use Crunch to handle the combined input splits job.setInputFormatClass(CrunchInputFormat.class); - for (Path path : paths) { - CrunchInputs.addInputPath(job, path, inputBundle, inputId); - } + CrunchInputs.addInputPaths(job, paths, inputBundle, inputId); } public FormatBundle<? extends InputFormat> getBundle() {
