Repository: crunch
Updated Branches:
  refs/heads/master a62a24b6f -> dc03c9133
CRUNCH-354: Have CrunchInputSplit implement Supplier<InputSplit> to provide access to the base split

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/dc03c913
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/dc03c913
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/dc03c913

Branch: refs/heads/master
Commit: dc03c913318d2853eed3751886fe45bdfdf7dfa4
Parents: a62a24b
Author: Josh Wills <[email protected]>
Authored: Tue May 27 14:04:24 2014 -0700
Committer: Josh Wills <[email protected]>
Committed: Wed May 28 20:14:39 2014 -0700

----------------------------------------------------------------------
 .../org/apache/crunch/impl/mr/run/CrunchInputSplit.java | 12 +++++++-----
 .../apache/crunch/impl/mr/run/CrunchRecordReader.java | 10 +++++-----
 2 files changed, 12 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/crunch/blob/dc03c913/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
index 02942bc..ec255d9 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
@@ -23,6 +23,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.io.OutputStream;
 
+import com.google.common.base.Supplier;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -35,7 +36,7 @@ import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.util.ReflectionUtils;
 
-class CrunchInputSplit extends InputSplit implements Writable, Configurable {
+class CrunchInputSplit extends InputSplit implements Writable, Configurable, Supplier<InputSplit> {
 
   private InputSplit inputSplit;
   private int nodeIndex;
@@ -58,6 +59,11 @@ class CrunchInputSplit extends InputSplit implements Writable, Configurable {
   }
 
   @Override
+  public InputSplit get() {
+    return inputSplit;
+  }
+
+  @Override
   public void setConf(Configuration conf) {
     this.conf = new Configuration(conf);
     if (bundle != null && conf != null) {
@@ -74,10 +80,6 @@ class CrunchInputSplit extends InputSplit implements Writable, Configurable {
     return nodeIndex;
   }
 
-  public InputSplit getInputSplit() {
-    return inputSplit;
-  }
-
   public Class<? extends InputFormat<?, ?>> getInputFormatClass() {
     return bundle.getFormatClass();
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/dc03c913/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
index 32b3f74..e475f10 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
@@ -41,8 +41,8 @@ class CrunchRecordReader<K, V> extends RecordReader<K, V> {
   public CrunchRecordReader(InputSplit inputSplit, final TaskAttemptContext context) throws IOException,
       InterruptedException {
     this.crunchSplit = (CrunchInputSplit) inputSplit;
-    if (crunchSplit.getInputSplit() instanceof CombineFileSplit) {
-      combineFileSplit = (CombineFileSplit) crunchSplit.getInputSplit();
+    if (crunchSplit.get() instanceof CombineFileSplit) {
+      combineFileSplit = (CombineFileSplit) crunchSplit.get();
     }
     this.context = context;
     Configuration conf = crunchSplit.getConf();
@@ -87,7 +87,7 @@ class CrunchRecordReader<K, V> extends RecordReader<K, V> {
           combineFileSplit.getLength(idx - 1),
           combineFileSplit.getLocations());
     } else {
-      return crunchSplit.getInputSplit();
+      return crunchSplit.get();
     }
   }
 
@@ -143,8 +143,8 @@ class CrunchRecordReader<K, V> extends RecordReader<K, V> {
       conf = context.getConfiguration();
       crunchSplit.setConf(conf);
     }
-    if (crunchSplit.getInputSplit() instanceof CombineFileSplit) {
-      combineFileSplit = (CombineFileSplit) crunchSplit.getInputSplit();
+    if (crunchSplit.get() instanceof CombineFileSplit) {
+      combineFileSplit = (CombineFileSplit) crunchSplit.get();
     }
     if (curReader != null) {
       curReader.initialize(getDelegateSplit(),
