Updated Branches: refs/heads/master ae748c8ea -> b7781ca08
CRUNCH-165: The latest attempt at using CombineFileInputFormat wherever possible. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b7781ca0 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b7781ca0 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b7781ca0 Branch: refs/heads/master Commit: b7781ca0858b6eee0ff7c7c2ccbe0c49f247db5d Parents: ae748c8 Author: Josh Wills <[email protected]> Authored: Fri Aug 9 16:30:37 2013 -0700 Committer: Josh Wills <[email protected]> Committed: Mon Sep 2 08:41:45 2013 -0700 ---------------------------------------------------------------------- .../contrib/io/jdbc/DataBaseSourceIT.java | 41 ------- .../crunch/contrib/io/jdbc/DataBaseSource.java | 15 ++- .../contrib/io/jdbc/IdentifiableName.java | 56 ++++++++++ .../org/apache/crunch/io/CombineFileIT.java | 52 +++++++++ .../apache/crunch/io/CombineFileITData/src1.txt | 4 + .../apache/crunch/io/CombineFileITData/src2.txt | 4 + .../mr/run/CrunchCombineFileInputFormat.java | 37 +++++++ .../crunch/impl/mr/run/CrunchInputFormat.java | 3 + .../crunch/impl/mr/run/CrunchRecordReader.java | 111 ++++++++++++++++--- .../crunch/impl/mr/run/RuntimeParameters.java | 4 +- .../apache/crunch/io/impl/FileSourceImpl.java | 19 ++-- .../apache/crunch/io/text/NLineFileSource.java | 3 + .../crunch/io/avro/AvroFileSourceTest.java | 12 +- 13 files changed, 288 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java ---------------------------------------------------------------------- diff --git a/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java b/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java index 8fdb22d..1c48559 100644 --- a/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java +++ b/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java @@ -19,28 +19,17 @@ package org.apache.crunch.contrib.io.jdbc; import static org.junit.Assert.assertEquals; -import java.io.DataInput; -import java.io.DataOutput; import java.io.File; -import java.io.IOException; import java.io.Serializable; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; import java.util.List; import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; import org.apache.crunch.PCollection; import org.apache.crunch.Pipeline; -import org.apache.crunch.contrib.io.jdbc.DataBaseSource; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.test.CrunchTestSupport; import org.apache.crunch.types.writable.Writables; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.h2.tools.RunScript; import org.h2.tools.Server; import org.junit.After; @@ -65,36 +54,6 @@ public class DataBaseSourceIT extends CrunchTestSupport implements Serializable server.stop(); } - public static class IdentifiableName implements DBWritable, Writable { - - public IntWritable id = new IntWritable(); - public Text name = new Text(); - - @Override - public void readFields(DataInput in) throws IOException { - id.readFields(in); - name.readFields(in); - } - - @Override - public void write(DataOutput out) throws IOException { - id.write(out); - name.write(out); - } - - @Override - public void readFields(ResultSet resultSet) throws SQLException { - id.set(resultSet.getInt(1)); - name.set(resultSet.getString(2)); - } - - @Override - public void write(PreparedStatement preparedStatement) throws SQLException { - throw new UnsupportedOperationException("Not implemented"); - } - - } - @Test public void testReadFromSource() throws Exception { Pipeline pipeline = new MRPipeline(DataBaseSourceIT.class); http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java ---------------------------------------------------------------------- diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java index 83f509f..337ecb7 100644 --- a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java +++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java @@ -21,10 +21,13 @@ import java.io.IOException; import java.sql.Driver; import org.apache.crunch.Source; +import org.apache.crunch.io.CrunchInputs; +import org.apache.crunch.io.FormatBundle; import org.apache.crunch.types.Converter; import org.apache.crunch.types.PType; import org.apache.crunch.types.writable.Writables; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; @@ -104,8 +107,16 @@ public class DataBaseSource<T extends DBWritable & Writable> implements Source<T public void configureSource(Job job, int inputId) throws IOException { Configuration configuration = job.getConfiguration(); DBConfiguration.configureDB(configuration, driverClass, url, username, password); - job.setInputFormatClass(DBInputFormat.class); - DBInputFormat.setInput(job, inputClass, selectClause, countClause); + if (inputId == -1) { + job.setInputFormatClass(DBInputFormat.class); + DBInputFormat.setInput(job, inputClass, selectClause, countClause); + } else { + FormatBundle<DBInputFormat> bundle = FormatBundle.forInput(DBInputFormat.class) + .set(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass.getCanonicalName()) + .set(DBConfiguration.INPUT_QUERY, selectClause) + .set(DBConfiguration.INPUT_COUNT_QUERY, countClause); + CrunchInputs.addInputPath(job, new Path("dbsource"), bundle, inputId); + } } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/IdentifiableName.java ---------------------------------------------------------------------- diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/IdentifiableName.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/IdentifiableName.java new file mode 100644 index 0000000..c8a452a --- /dev/null +++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/IdentifiableName.java @@ -0,0 +1,56 @@ +/** + * Copyright (c) 2013, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. + */ +package org.apache.crunch.contrib.io.jdbc; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +public class IdentifiableName implements DBWritable, Writable { + + public IntWritable id = new IntWritable(); + public Text name = new Text(); + + @Override + public void readFields(DataInput in) throws IOException { + id.readFields(in); + name.readFields(in); + } + + @Override + public void write(DataOutput out) throws IOException { + id.write(out); + name.write(out); + } + + @Override + public void readFields(ResultSet resultSet) throws SQLException { + id.set(resultSet.getInt(1)); + name.set(resultSet.getString(2)); + } + + @Override + public void write(PreparedStatement preparedStatement) throws SQLException { + throw new UnsupportedOperationException("Not implemented"); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java b/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java new file mode 100644 index 0000000..efcec0c --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java @@ -0,0 +1,52 @@ +/** + * Copyright (c) 2013, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. + */ +package org.apache.crunch.io; + +import com.google.common.io.Files; +import org.apache.crunch.PCollection; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.test.Tests; +import org.junit.Rule; +import org.junit.Test; + +import java.io.File; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class CombineFileIT { + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + @Test + public void testCombine() throws Exception { + File srcFiles = tmpDir.getFile("srcs"); + File outputFiles = tmpDir.getFile("out"); + assertTrue(srcFiles.mkdir()); + File src1 = tmpDir.copyResourceFile(Tests.resource(this, "src1.txt")); + File src2 = tmpDir.copyResourceFile(Tests.resource(this, "src2.txt")); + Files.copy(src1, new File(srcFiles, "src1.txt")); + Files.copy(src2, new File(srcFiles, "src2.txt")); + + MRPipeline p = new MRPipeline(CombineFileIT.class, tmpDir.getDefaultConfiguration()); + PCollection<String> in = p.readTextFile(srcFiles.getAbsolutePath()); + in.write(To.textFile(outputFiles.getAbsolutePath())); + p.done(); + assertEquals(4, outputFiles.listFiles().length); + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/it/resources/org/apache/crunch/io/CombineFileITData/src1.txt ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/resources/org/apache/crunch/io/CombineFileITData/src1.txt b/crunch-core/src/it/resources/org/apache/crunch/io/CombineFileITData/src1.txt new file mode 100644 index 0000000..9f38eb9 --- /dev/null +++ b/crunch-core/src/it/resources/org/apache/crunch/io/CombineFileITData/src1.txt @@ -0,0 +1,4 @@ +a,1-1 +b,1-2 +c,1-3 +a,1-4 http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/it/resources/org/apache/crunch/io/CombineFileITData/src2.txt ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/resources/org/apache/crunch/io/CombineFileITData/src2.txt b/crunch-core/src/it/resources/org/apache/crunch/io/CombineFileITData/src2.txt new file mode 100644 index 0000000..ed9524e --- /dev/null +++ b/crunch-core/src/it/resources/org/apache/crunch/io/CombineFileITData/src2.txt @@ -0,0 +1,4 @@ +b,2-1 +c,2-2 +c,2-3 +d,2-4 http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombineFileInputFormat.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombineFileInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombineFileInputFormat.java new file mode 100644 index 0000000..27eaf94 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombineFileInputFormat.java @@ -0,0 +1,37 @@ +/** + * Copyright (c) 2013, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. + */ +package org.apache.crunch.impl.mr.run; + +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; + +import java.io.IOException; + +public class CrunchCombineFileInputFormat<K, V> extends CombineFileInputFormat<K, V> { + private FileInputFormat<K, V> inputFormat; + + public CrunchCombineFileInputFormat(FileInputFormat<K, V> inputFormat) { + this.inputFormat = inputFormat; + } + + @Override + public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException { + throw new UnsupportedOperationException("CrunchCombineFileInputFormat.createRecordReader should never be called"); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java index cf3df81..fa4602a 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java @@ -52,6 +52,9 @@ public class CrunchInputFormat<K, V> extends InputFormat<K, V> { Job jobCopy = new Job(conf); InputFormat<?, ?> format = (InputFormat<?, ?>) ReflectionUtils.newInstance(inputBundle.getFormatClass(), jobCopy.getConfiguration()); + if (format instanceof FileInputFormat && !conf.getBoolean(RuntimeParameters.DISABLE_COMBINE_FILE, false)) { + format = new CrunchCombineFileInputFormat<Object, Object>((FileInputFormat) format); + } for (Map.Entry<Integer, List<Path>> nodeEntry : entry.getValue().entrySet()) { Integer nodeIndex = nodeEntry.getKey(); List<Path> paths = nodeEntry.getValue(); http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java index e5cbd95..32b3f74 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java @@ -25,61 +25,144 @@ import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.util.ReflectionUtils; class CrunchRecordReader<K, V> extends RecordReader<K, V> { - private final RecordReader<K, V> delegate; + private RecordReader<K, V> curReader; + private CrunchInputSplit crunchSplit; + private CombineFileSplit combineFileSplit; + private TaskAttemptContext context; + private int idx; + private long progress; public CrunchRecordReader(InputSplit inputSplit, final TaskAttemptContext context) throws IOException, InterruptedException { - CrunchInputSplit crunchSplit = (CrunchInputSplit) inputSplit; + this.crunchSplit = (CrunchInputSplit) inputSplit; + if (crunchSplit.getInputSplit() instanceof CombineFileSplit) { + combineFileSplit = (CombineFileSplit) crunchSplit.getInputSplit(); + } + this.context = context; Configuration conf = crunchSplit.getConf(); if (conf == null) { conf = context.getConfiguration(); crunchSplit.setConf(conf); } - InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils.newInstance(crunchSplit.getInputFormatClass(), + initNextRecordReader(); + } + + private boolean initNextRecordReader() throws IOException, InterruptedException { + if (combineFileSplit != null) { + if (curReader != null) { + curReader.close(); + curReader = null; + if (idx > 0) { + progress += combineFileSplit.getLength(idx - 1); + } + } + // if all chunks have been processed, nothing more to do. + if (idx == combineFileSplit.getNumPaths()) { + return false; + } + } else if (idx > 0) { + return false; + } + + idx++; + Configuration conf = crunchSplit.getConf(); + InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils.newInstance( + crunchSplit.getInputFormatClass(), conf); - this.delegate = inputFormat.createRecordReader(crunchSplit.getInputSplit(), + this.curReader = inputFormat.createRecordReader(getDelegateSplit(), TaskAttemptContextFactory.create(conf, context.getTaskAttemptID())); + return true; + } + + private InputSplit getDelegateSplit() throws IOException { + if (combineFileSplit != null) { + return new FileSplit(combineFileSplit.getPath(idx - 1), + combineFileSplit.getOffset(idx - 1), + combineFileSplit.getLength(idx - 1), + combineFileSplit.getLocations()); + } else { + return crunchSplit.getInputSplit(); + } } @Override public void close() throws IOException { - delegate.close(); + if (curReader != null) { + curReader.close(); + curReader = null; + } } @Override public K getCurrentKey() throws IOException, InterruptedException { - return delegate.getCurrentKey(); + return curReader.getCurrentKey(); } @Override public V getCurrentValue() throws IOException, InterruptedException { - return delegate.getCurrentValue(); + return curReader.getCurrentValue(); } @Override public float getProgress() throws IOException, InterruptedException { - return delegate.getProgress(); + float curProgress = 0; // bytes processed in current split + if (null != curReader) { + curProgress = (float)(curReader.getProgress() * getCurLength()); + } + return Math.min(1.0f, (progress + curProgress)/getOverallLength()); + } + + private long getCurLength() { + if (combineFileSplit == null) { + return 1L; + } else { + return combineFileSplit.getLength(idx - 1); + } + } + + private float getOverallLength() { + if (combineFileSplit == null) { + return 1.0f; + } else { + return (float) combineFileSplit.getLength(); + } } @Override public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException { - CrunchInputSplit crunchSplit = (CrunchInputSplit) inputSplit; + this.crunchSplit = (CrunchInputSplit) inputSplit; + this.context = context; Configuration conf = crunchSplit.getConf(); if (conf == null) { conf = context.getConfiguration(); + crunchSplit.setConf(conf); + } + if (crunchSplit.getInputSplit() instanceof CombineFileSplit) { + combineFileSplit = (CombineFileSplit) crunchSplit.getInputSplit(); + } + if (curReader != null) { + curReader.initialize(getDelegateSplit(), + TaskAttemptContextFactory.create(conf, context.getTaskAttemptID())); } - InputSplit delegateSplit = crunchSplit.getInputSplit(); - delegate.initialize(delegateSplit, - TaskAttemptContextFactory.create(conf, context.getTaskAttemptID())); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { - return delegate.nextKeyValue(); + while ((curReader == null) || !curReader.nextKeyValue()) { + if (!initNextRecordReader()) { + return false; + } + if (curReader != null) { + curReader.initialize(getDelegateSplit(), + TaskAttemptContextFactory.create(crunchSplit.getConf(), context.getTaskAttemptID())); + } + } + return true; } - } http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java index 604c49c..8912897 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java @@ -22,14 +22,14 @@ package org.apache.crunch.impl.mr.run; */ public class RuntimeParameters { - public static final String AGGREGATOR_BUCKETS = "crunch.aggregator.buckets"; - public static final String DEBUG = "crunch.debug"; public static final String TMP_DIR = "crunch.tmp.dir"; public static final String LOG_JOB_PROGRESS = "crunch.log.job.progress"; + public static final String DISABLE_COMBINE_FILE = "crunch.disable.combine.file"; + public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist"; // Not instantiated http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java index 13645ba..a3cbdc8 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java @@ -26,6 +26,7 @@ import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.crunch.Source; +import org.apache.crunch.impl.mr.run.CrunchInputFormat; import org.apache.crunch.io.CompositePathIterable; import org.apache.crunch.io.CrunchInputs; import org.apache.crunch.io.FileReaderFactory; @@ -91,19 +92,17 @@ public class FileSourceImpl<T> implements Source<T> { @Override public void configureSource(Job job, int inputId) throws IOException { - if (inputId == -1) { - for (Path path : paths) { - FileInputFormat.addInputPath(job, path); - } - job.setInputFormatClass(inputBundle.getFormatClass()); - inputBundle.configure(job.getConfiguration()); - } else { - for (Path path : paths) { - CrunchInputs.addInputPath(job, path, inputBundle, inputId); - } + // Use Crunch to handle the combined input splits + job.setInputFormatClass(CrunchInputFormat.class); + for (Path path : paths) { + CrunchInputs.addInputPath(job, path, inputBundle, inputId); } } + public FormatBundle<? extends InputFormat> getBundle() { + return inputBundle; + } + @Override public PType<T> getType() { return ptype; http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java index abef771..0756b70 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java @@ -20,6 +20,8 @@ package org.apache.crunch.io.text; import java.io.IOException; import java.util.List; + +import org.apache.crunch.impl.mr.run.RuntimeParameters; import org.apache.crunch.io.FormatBundle; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.io.impl.FileSourceImpl; @@ -39,6 +41,7 @@ public class NLineFileSource<T> extends FileSourceImpl<T> implements ReadableSou private static FormatBundle getBundle(int linesPerTask) { FormatBundle bundle = FormatBundle.forInput(NLineInputFormat.class); bundle.set(NLineInputFormat.LINES_PER_MAP, String.valueOf(linesPerTask)); + bundle.set(RuntimeParameters.DISABLE_COMBINE_FILE, "true"); return bundle; } http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java b/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java index ceef2b2..a8ee398 100644 --- a/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java @@ -26,6 +26,8 @@ import java.io.IOException; import org.apache.avro.generic.GenericData.Record; import org.apache.avro.mapred.AvroJob; +import org.apache.crunch.io.CrunchInputs; +import org.apache.crunch.io.FormatBundle; import org.apache.crunch.test.Person; import org.apache.crunch.test.StringWrapper; import org.apache.crunch.types.avro.AvroType; @@ -58,7 +60,8 @@ public class AvroFileSourceTest { AvroFileSource<Person> personFileSource = new AvroFileSource<Person>(new Path(tempFile.getAbsolutePath()), avroSpecificType); - personFileSource.configureSource(job, -1); + FormatBundle bundle = personFileSource.getBundle(); + bundle.configure(job.getConfiguration()); assertFalse(job.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, true)); assertEquals(Person.SCHEMA$.toString(), job.getConfiguration().get(AvroJob.INPUT_SCHEMA)); @@ -70,7 +73,8 @@ public class AvroFileSourceTest { AvroFileSource<Record> personFileSource = new AvroFileSource<Record>(new Path(tempFile.getAbsolutePath()), avroGenericType); - personFileSource.configureSource(job, -1); + FormatBundle bundle = personFileSource.getBundle(); + bundle.configure(job.getConfiguration()); assertFalse(job.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, true)); @@ -82,8 +86,8 @@ public class AvroFileSourceTest { AvroFileSource<StringWrapper> personFileSource = new AvroFileSource<StringWrapper>(new Path( tempFile.getAbsolutePath()), avroReflectType); - personFileSource.configureSource(job, -1); - + FormatBundle bundle = personFileSource.getBundle(); + bundle.configure(job.getConfiguration()); assertTrue(job.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, false)); }
