Updated Branches: refs/heads/TEZ-1 b102eb1c6 -> 18f0ebd8e
TEZ-103. Configured Combiner is not being used. Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/18f0ebd8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/18f0ebd8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/18f0ebd8 Branch: refs/heads/TEZ-1 Commit: 18f0ebd8e562a01fce6e39d7a0977ce2f8b9d3a1 Parents: b102eb1 Author: Hitesh Shah <[email protected]> Authored: Mon May 6 18:16:20 2013 -0700 Committer: Hitesh Shah <[email protected]> Committed: Tue May 7 13:38:41 2013 -0700 ---------------------------------------------------------------------- .../tez/engine/common/combine/CombineInput.java | 6 +- .../engine/common/shuffle/impl/MergeManager.java | 55 ++- .../engine/common/sort/impl/ExternalSorter.java | 15 +- .../apache/tez/engine/common/sort/impl/IFile.java | 5 + .../engine/common/sort/impl/IFileOutputStream.java | 3 + .../common/sort/impl/dflt/DefaultSorter.java | 12 +- .../task/local/output/TezTaskOutputFiles.java | 11 +- .../apache/tez/mapreduce/combine/MRCombiner.java | 348 +++++++++++++++ .../org/apache/tez/mapreduce/processor/MRTask.java | 3 + 9 files changed, 423 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java index 707e54c..bf504bb 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java @@ -121,7 +121,11 @@ public class CombineInput implements Input { public void close() throws IOException { input.close(); } - + + public TezRawKeyValueIterator getIterator() { + return this.input; + } + protected class ValueIterator implements Iterator<Object> { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java index 55860ea..9156f28 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java @@ -373,12 +373,17 @@ public class MergeManager { CombineOutput combineOut = new CombineOutput(writer); combineOut.initialize(conf, reporter); - - combineProcessor.process(new Input[] {combineIn}, - new Output[] {combineOut}); - - combineIn.close(); - combineOut.close(); + + try { + combineProcessor.process(new Input[] {combineIn}, + new Output[] {combineOut}); + } catch (IOException ioe) { + try { + combineProcessor.close(); + } catch (IOException ignoredException) {} + + throw ioe; + } } @@ -471,31 +476,33 @@ public class MergeManager { mergeOutputSize).suffix( Constants.MERGED_OUTPUT_PREFIX); - Writer writer = - new Writer(conf, rfs, outputPath, - (Class)ConfigUtils.getIntermediateInputKeyClass(conf), - (Class)ConfigUtils.getIntermediateInputValueClass(conf), - codec, null); - - TezRawKeyValueIterator rIter = null; + Writer writer = null; try { + writer = + new Writer(conf, rfs, outputPath, + (Class)ConfigUtils.getIntermediateInputKeyClass(conf), + (Class)ConfigUtils.getIntermediateInputValueClass(conf), + codec, null); + + TezRawKeyValueIterator rIter = null; LOG.info("Initiating in-memory merge with " + noInMemorySegments + - " segments..."); - + " segments..."); + rIter = TezMerger.merge(conf, rfs, - (Class)ConfigUtils.getIntermediateInputKeyClass(conf), - (Class)ConfigUtils.getIntermediateInputValueClass(conf), - inMemorySegments, inMemorySegments.size(), - new Path(taskAttemptId.toString()), - (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf), - reporter, spilledRecordsCounter, null, null); - + (Class)ConfigUtils.getIntermediateInputKeyClass(conf), + (Class)ConfigUtils.getIntermediateInputValueClass(conf), + inMemorySegments, inMemorySegments.size(), + new Path(taskAttemptId.toString()), + (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf), + reporter, spilledRecordsCounter, null, null); + if (null == combineProcessor) { TezMerger.writeFile(rIter, writer, reporter, conf); } else { runCombineProcessor(rIter, writer); } writer.close(); + writer = null; LOG.info(taskAttemptId + " Merge of the " + noInMemorySegments + @@ -507,6 +514,10 @@ public class MergeManager { //earlier when we invoked cloneFileAttributes localFS.delete(outputPath, true); throw e; + } finally { + if (writer != null) { + writer.close(); + } } // Note the output of the merge http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java index 6d07a1c..f6af426 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java @@ -193,11 +193,18 @@ public abstract class ExternalSorter { CombineOutput combineOut = new CombineOutput(writer); combineOut.initialize(job, runningTaskContext.getTaskReporter()); - combineProcessor.process(new Input[] {combineIn}, - new Output[] {combineOut}); + try { + combineProcessor.process(new Input[] {combineIn}, + new Output[] {combineOut}); + } catch (IOException ioe) { + try { + combineProcessor.close(); + } catch (IOException ignored) {} - combineIn.close(); - combineOut.close(); + // Do not close output here as the sorter should close the combine output + + throw ioe; + } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java index 161cc5a..db59a13 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java @@ -23,6 +23,7 @@ import java.io.DataOutputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -73,6 +74,7 @@ public class IFile { boolean ownOutputStream = false; long start = 0; FSDataOutputStream rawOut; + AtomicBoolean closed = new AtomicBoolean(false); CompressionOutputStream compressedOut; Compressor compressor; @@ -153,6 +155,9 @@ public class IFile { } public void close() throws IOException { + if (closed.getAndSet(true)) { + throw new IOException("Writer was already closed earlier"); + } // When IFile writer is created by BackupStore, we do not have // Key and Value classes set. So, check before closing the http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java index 45cf917..75fcd68 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java @@ -101,12 +101,15 @@ public class IFileOutputStream extends FilterOutputStream { sum.update(buffer, 0, offset); offset = 0; } + /* + // FIXME if needed re-enable this in debug mode if (LOG.isDebugEnabled()) { LOG.debug("XXX checksum" + " b=" + b + " off=" + off + " buffer=" + " offset=" + offset + " len=" + len); } + */ /* now we should have len < buffer.length */ System.arraycopy(b, off, buffer, offset, len); offset += len; http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java index 75c7de3..0c35760 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java @@ -673,6 +673,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { spillLock.unlock(); sortAndSpill(); } catch (Throwable t) { + LOG.warn("ZZZZ: Got an exception in sortAndSpill", t); sortSpillException = t; } finally { spillLock.lock(); @@ -794,6 +795,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { if (spstart != spindex) { TezRawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex); + LOG.info("DEBUG: Running combine processor"); runCombineProcessor(kvIter, writer); } } @@ -1052,7 +1054,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { sortPhase.complete(); return; } - { + else { sortPhase.addPhases(partitions); // Divide sort phase into sub-phases TezMerger.considerFinalMergeForProgress(); @@ -1096,16 +1098,16 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { long segmentStart = finalOut.getPos(); Writer writer = new Writer(job, finalOut, keyClass, valClass, codec, - spilledRecordsCounter); + spilledRecordsCounter); if (combineProcessor == null || numSpills < minSpillsForCombine) { - TezMerger.writeFile(kvIter, writer, runningTaskContext.getTaskReporter(), job); - writer.close(); + TezMerger.writeFile(kvIter, writer, + runningTaskContext.getTaskReporter(), job); } else { runCombineProcessor(kvIter, writer); } + writer.close(); sortPhase.startNextPhase(); - // record offsets final TezIndexRecord rec = new TezIndexRecord( http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java index c925367..eeca130 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java @@ -20,6 +20,8 @@ package org.apache.tez.engine.common.task.local.output; import java.io.IOException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -42,6 +44,7 @@ import org.apache.tez.engine.records.TezTaskID; @InterfaceStability.Unstable public class TezTaskOutputFiles extends TezTaskOutput { + private static final Log LOG = LogFactory.getLog(TezTaskOutputFiles.class); private Configuration conf; private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out"; @@ -56,9 +59,11 @@ public class TezTaskOutputFiles extends TezTaskOutput { new LocalDirAllocator(TezJobConfig.LOCAL_DIR); private Path getAttemptOutputDir() { - System.err.println("getAttemptOutputDir: " + - Constants.TASK_OUTPUT_DIR + "/" + conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID)); - return new Path(Constants.TASK_OUTPUT_DIR, conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID)); + LOG.info("DEBUG: getAttemptOutputDir: " + + Constants.TASK_OUTPUT_DIR + "/" + + conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID)); + return new Path(Constants.TASK_OUTPUT_DIR, + conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID)); } /** http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java new file mode 100644 index 0000000..42eddee --- /dev/null +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java @@ -0,0 +1,348 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.mapreduce.combine; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RawKeyValueIterator; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Counters.Counter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer; +import org.apache.hadoop.mapreduce.task.ReduceContextImpl; +import org.apache.hadoop.util.Progress; +import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.tez.common.TezTaskUmbilicalProtocol; +import org.apache.tez.common.counters.TaskCounter; +import org.apache.tez.engine.api.Input; +import org.apache.tez.engine.api.Master; +import org.apache.tez.engine.api.Output; +import org.apache.tez.engine.api.Processor; +import org.apache.tez.engine.common.ConfigUtils; +import org.apache.tez.engine.common.combine.CombineInput; +import org.apache.tez.engine.common.combine.CombineOutput; +import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator; +import org.apache.tez.engine.records.TezTaskAttemptID; +import org.apache.tez.mapreduce.hadoop.IDConverter; +import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl; +import org.apache.tez.mapreduce.processor.MRTask; +import org.apache.tez.mapreduce.processor.MRTaskReporter; + +public class MRCombiner implements Processor { + + private static Log LOG = LogFactory.getLog(MRCombiner.class); + + JobConf jobConf; + boolean useNewApi; + + private final MRTask task; + + private Counter combinerInputKeyCounter; + private Counter combinerInputValueCounter; + private Progress combinePhase; + + public MRCombiner(MRTask task) { + this.task = task; + } + + @Override + public void initialize(Configuration conf, Master master) throws IOException, + InterruptedException { + if (conf instanceof JobConf) { + jobConf = (JobConf)conf; + } else { + jobConf = new JobConf(conf); + } + useNewApi = jobConf.getUseNewMapper(); + } + + @Override + public void process(Input[] in, Output[] out) throws IOException, + InterruptedException { + LOG.info("DEBUG: Running MRCombiner" + + ", usingNewAPI=" + useNewApi); + + CombineInput input = (CombineInput)in[0]; + CombineOutput output = (CombineOutput)out[0]; + + combinePhase = task.getProgress().addPhase("combine"); + + Class<?> keyClass = ConfigUtils.getIntermediateOutputKeyClass(jobConf); + Class<?> valueClass = ConfigUtils.getIntermediateOutputValueClass(jobConf); + LOG.info("Using combineKeyClass: " + keyClass); + LOG.info("Using combineValueClass: " + valueClass); + RawComparator<?> comparator = + ConfigUtils.getIntermediateOutputKeyComparator(jobConf); + LOG.info("Using combineComparator: " + comparator); + + combinerInputKeyCounter = + task.getMRReporter().getCounter(TaskCounter.COMBINE_INPUT_RECORDS); + combinerInputValueCounter = + task.getMRReporter().getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS); + + if (useNewApi) { + try { + runNewCombiner(this.jobConf, + task.getUmbilical(), + task.getMRReporter(), + input, comparator, keyClass, valueClass, output); + } catch (ClassNotFoundException cnfe) { + throw new IOException(cnfe); + } + } else { + runOldCombiner(this.jobConf, + task.getUmbilical(), + task.getMRReporter(), + input, + comparator, keyClass, valueClass, + output); + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private void runOldCombiner(JobConf job, + TezTaskUmbilicalProtocol umbilical, + final MRTaskReporter reporter, + CombineInput input, + RawComparator comparator, + Class keyClass, + Class valueClass, + final Output output) throws IOException, InterruptedException { + + Reducer combiner = + ReflectionUtils.newInstance(job.getCombinerClass(), job); + + // make output collector + + OutputCollector collector = + new OutputCollector() { + public void collect(Object key, Object value) + throws IOException { + try { + output.write(key, value); + } catch (InterruptedException ie) { + throw new IOException(ie); + } + } + }; + + // apply combiner function + CombinerValuesIterator values = + new CombinerValuesIterator(input, + comparator, keyClass, valueClass, job, reporter, + combinerInputValueCounter, combinePhase); + + values.informReduceProgress(); + while (values.more()) { + combinerInputKeyCounter.increment(1); + combiner.reduce(values.getKey(), values, collector, reporter); + values.nextKey(); + values.informReduceProgress(); + } + } + + private static final class CombinerValuesIterator<KEY,VALUE> + extends org.apache.tez.engine.common.task.impl.ValuesIterator<KEY,VALUE> { + private Counter combineInputValueCounter; + private Progress combinePhase; + + public CombinerValuesIterator (CombineInput in, + RawComparator<KEY> comparator, + Class<KEY> keyClass, + Class<VALUE> valClass, + Configuration conf, Progressable reporter, + Counter combineInputValueCounter, + Progress combinePhase) + throws IOException { + super(in.getIterator(), comparator, keyClass, valClass, conf, reporter); + this.combineInputValueCounter = combineInputValueCounter; + this.combinePhase = combinePhase; + } + + @Override + public VALUE next() { + combineInputValueCounter.increment(1); + return moveToNext(); + } + + protected VALUE moveToNext() { + return super.next(); + } + + public void informReduceProgress() { + combinePhase.set(super.in.getProgress().getProgress()); // update progress + reporter.progress(); + } + } + + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private void runNewCombiner(JobConf job, + final TezTaskUmbilicalProtocol umbilical, + final MRTaskReporter reporter, + CombineInput input, + RawComparator comparator, + Class keyClass, + Class valueClass, + final Output out + ) throws IOException,InterruptedException, + ClassNotFoundException { + // wrap value iterator to report progress. + final TezRawKeyValueIterator rawIter = input.getIterator(); + TezRawKeyValueIterator rIter = new TezRawKeyValueIterator() { + public void close() throws IOException { + rawIter.close(); + } + public DataInputBuffer getKey() throws IOException { + return rawIter.getKey(); + } + public Progress getProgress() { + return rawIter.getProgress(); + } + public DataInputBuffer getValue() throws IOException { + return rawIter.getValue(); + } + public boolean next() throws IOException { + boolean ret = rawIter.next(); + // FIXME progress updates for combiner + // reporter.setProgress(rawIter.getProgress().getProgress()); + return ret; + } + }; + + // make a task context so we can get the classes + org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = + new TaskAttemptContextImpl(job, task.getTaskAttemptId(), reporter); + + // make a reducer + org.apache.hadoop.mapreduce.Reducer reducer = + (org.apache.hadoop.mapreduce.Reducer) + ReflectionUtils.newInstance(taskContext.getReducerClass(), job); + + org.apache.hadoop.mapreduce.RecordWriter trackedRW = + new org.apache.hadoop.mapreduce.RecordWriter() { + + @Override + public void write(Object key, Object value) throws IOException, + InterruptedException { + out.write(key, value); + } + + @Override + public void close(TaskAttemptContext context) throws IOException, + InterruptedException { + // Should not close this here as the sorter will close the + // combine output + } + }; + + org.apache.hadoop.mapreduce.Reducer.Context reducerContext = + createReduceContext( + reducer, job, task.getTaskAttemptId(), + rIter, combinerInputKeyCounter, + combinerInputValueCounter, + trackedRW, + null, + reporter, comparator, keyClass, + valueClass); + reducer.run(reducerContext); + trackedRW.close(reducerContext); + } + + @Override + public void close() throws IOException, InterruptedException { + } + + protected static <INKEY,INVALUE,OUTKEY,OUTVALUE> + org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context + createReduceContext(org.apache.hadoop.mapreduce.Reducer + <INKEY,INVALUE,OUTKEY,OUTVALUE> reducer, + Configuration job, + TezTaskAttemptID taskId, + final TezRawKeyValueIterator rIter, + org.apache.hadoop.mapreduce.Counter inputKeyCounter, + org.apache.hadoop.mapreduce.Counter inputValueCounter, + org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output, + org.apache.hadoop.mapreduce.OutputCommitter committer, + org.apache.hadoop.mapreduce.StatusReporter reporter, + RawComparator<INKEY> comparator, + Class<INKEY> keyClass, Class<INVALUE> valueClass + ) throws IOException, InterruptedException { + RawKeyValueIterator r = + new RawKeyValueIterator() { + + @Override + public boolean next() throws IOException { + return rIter.next(); + } + + @Override + public DataInputBuffer getValue() throws IOException { + return rIter.getValue(); + } + + @Override + public Progress getProgress() { + return rIter.getProgress(); + } + + @Override + public DataInputBuffer getKey() throws IOException { + return rIter.getKey(); + } + + @Override + public void close() throws IOException { + rIter.close(); + } + }; + org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE> + reduceContext = + new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>( + job, + IDConverter.toMRTaskAttemptId(taskId), + r, + inputKeyCounter, + inputValueCounter, + output, + committer, + reporter, + comparator, + keyClass, + valueClass); + LOG.info("DEBUG: Using combineKeyClass: " + + keyClass + ", combineValueClass: " + valueClass); + + org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context + reducerContext = new + WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext( + reduceContext); + + return reducerContext; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java index 17fab1b..d17e477 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java @@ -68,6 +68,7 @@ import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator; import org.apache.tez.engine.records.OutputContext; import org.apache.tez.engine.records.TezDAGID; import org.apache.tez.engine.records.TezTaskAttemptID; +import org.apache.tez.mapreduce.combine.MRCombiner; import org.apache.tez.mapreduce.hadoop.IDConverter; import org.apache.tez.mapreduce.hadoop.MRConfig; import org.apache.tez.mapreduce.hadoop.MRTaskStatus; @@ -216,6 +217,8 @@ extends RunningTaskContext { partitioner = new MRPartitioner(this); ((MRPartitioner)partitioner).initialize(job, getTaskReporter()); + combineProcessor = new MRCombiner(this); + combineProcessor.initialize(job, getTaskReporter()); localizeConfiguration(jobConf); }
