Updated Branches: refs/heads/master 715128b93 -> 752fed58e
CRUNCH-331: Altered the default for disabling combining files to true and enabled the combining for sources we know could benefit from the behavior Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/752fed58 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/752fed58 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/752fed58 Branch: refs/heads/master Commit: 752fed58ee1d9b8b09ae959536af06b1114edf7f Parents: 715128b Author: Micah Whitacre <[email protected]> Authored: Tue Feb 4 22:04:36 2014 -0600 Committer: Micah Whitacre <[email protected]> Committed: Tue Feb 4 22:04:36 2014 -0600 ---------------------------------------------------------------------- .../java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java | 2 +- .../java/org/apache/crunch/impl/mr/run/RuntimeParameters.java | 5 +++++ .../main/java/org/apache/crunch/io/avro/AvroFileSource.java | 4 +++- .../java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java | 2 ++ .../src/main/java/org/apache/crunch/io/seq/SeqFileSource.java | 6 +++++- .../main/java/org/apache/crunch/io/text/TextFileSource.java | 6 +++++- 6 files changed, 21 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/752fed58/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java index 0c6f5e1..8f1c853 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java @@ -52,7 +52,7 @@ public class CrunchInputFormat<K, V> extends InputFormat<K, V> { Job jobCopy = new Job(conf); InputFormat<?, 
?> format = (InputFormat<?, ?>) ReflectionUtils.newInstance(inputBundle.getFormatClass(), jobCopy.getConfiguration()); - if (format instanceof FileInputFormat && !conf.getBoolean(RuntimeParameters.DISABLE_COMBINE_FILE, false)) { + if (format instanceof FileInputFormat && !conf.getBoolean(RuntimeParameters.DISABLE_COMBINE_FILE, true)) { format = new CrunchCombineFileInputFormat<Object, Object>(job); } for (Map.Entry<Integer, List<Path>> nodeEntry : entry.getValue().entrySet()) { http://git-wip-us.apache.org/repos/asf/crunch/blob/752fed58/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java index 0c9f229..07abf11 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java @@ -28,6 +28,11 @@ public final class RuntimeParameters { public static final String LOG_JOB_PROGRESS = "crunch.log.job.progress"; + /** + * Runtime property which, when {@code true}, tells a {@link org.apache.crunch.Source} not to combine small files + * into larger splits. Combining is only attempted when it is {@code false}. Unless overridden by the {@code Source} implementation it will default to + * {@code true} (combining disabled). 
+ */ public static final String DISABLE_COMBINE_FILE = "crunch.disable.combine.file"; public static final String COMBINE_FILE_BLOCK_SIZE = "crunch.combine.file.block.size"; http://git-wip-us.apache.org/repos/asf/crunch/blob/752fed58/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java index 17f47d7..1b6b27b 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.avro.io.DatumReader; import org.apache.avro.mapred.AvroJob; import org.apache.crunch.ReadableData; +import org.apache.crunch.impl.mr.run.RuntimeParameters; import org.apache.crunch.io.FormatBundle; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.io.impl.FileSourceImpl; @@ -39,7 +40,8 @@ public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSour FormatBundle bundle = FormatBundle.forInput(AvroInputFormat.class) .set(AvroJob.INPUT_IS_REFLECT, String.valueOf(ptype.hasReflect())) .set(AvroJob.INPUT_SCHEMA, ptype.getSchema().toString()) - .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName()); + .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName()) + .set(RuntimeParameters.DISABLE_COMBINE_FILE, Boolean.FALSE.toString()); AvroMode.fromType(ptype).configure(bundle); return bundle; } http://git-wip-us.apache.org/repos/asf/crunch/blob/752fed58/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java 
b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java index 3f387d7..fb0e8fe 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java @@ -19,6 +19,7 @@ package org.apache.crunch.io.avro.trevni; import java.util.List; import org.apache.avro.mapred.AvroJob; +import org.apache.crunch.impl.mr.run.RuntimeParameters; import org.apache.crunch.io.FormatBundle; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.ReadableData; @@ -37,6 +38,7 @@ public class TrevniKeySource<T> extends FileSourceImpl<T> implements ReadableSou return FormatBundle.forInput(AvroTrevniKeyInputFormat.class) .set(AvroJob.INPUT_IS_REFLECT, String.valueOf(ptype.hasReflect())) .set(AvroJob.INPUT_SCHEMA, ptype.getSchema().toString()) + .set(RuntimeParameters.DISABLE_COMBINE_FILE, Boolean.FALSE.toString()) .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName()); } http://git-wip-us.apache.org/repos/asf/crunch/blob/752fed58/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java index 1bf64e4..0c2a14c 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java @@ -19,7 +19,10 @@ package org.apache.crunch.io.seq; import java.io.IOException; +import java.util.Collections; import java.util.List; + +import org.apache.crunch.impl.mr.run.RuntimeParameters; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.ReadableData; import org.apache.crunch.io.impl.FileSourceImpl; @@ -31,11 +34,12 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; public 
class SeqFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> { public SeqFileSource(Path path, PType<T> ptype) { - super(path, ptype, SequenceFileInputFormat.class); + this(Collections.<Path>singletonList(path), ptype); } public SeqFileSource(List<Path> paths, PType<T> ptype) { super(paths, ptype, SequenceFileInputFormat.class); + inputBundle.set(RuntimeParameters.DISABLE_COMBINE_FILE, Boolean.FALSE.toString()); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/752fed58/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSource.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSource.java index fe23c47..732288d 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSource.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSource.java @@ -19,7 +19,10 @@ package org.apache.crunch.io.text; import java.io.IOException; +import java.util.Collections; import java.util.List; + +import org.apache.crunch.impl.mr.run.RuntimeParameters; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.ReadableData; import org.apache.crunch.io.impl.FileSourceImpl; @@ -42,11 +45,12 @@ public class TextFileSource<T> extends FileSourceImpl<T> implements ReadableSour } public TextFileSource(Path path, PType<T> ptype) { - super(path, ptype, getInputFormat(path, ptype)); + this(Collections.singletonList(path), ptype); } public TextFileSource(List<Path> paths, PType<T> ptype) { super(paths, ptype, getInputFormat(paths.get(0), ptype)); + inputBundle.set(RuntimeParameters.DISABLE_COMBINE_FILE, Boolean.FALSE.toString()); } @Override
