IGNITE-4507: Hadoop: added direct output support for combiner. This closes #1434.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/476b089b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/476b089b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/476b089b Branch: refs/heads/ignite-1.9 Commit: 476b089b1dd4b4c5d3b6ae21e1b3b2c010c086ac Parents: b6005b0 Author: tledkov-gridgain <[email protected]> Authored: Fri Jan 20 17:33:34 2017 +0300 Committer: devozerov <[email protected]> Committed: Fri Jan 20 17:35:39 2017 +0300 ---------------------------------------------------------------------- .../processors/hadoop/HadoopTaskContext.java | 10 +++ .../hadoop/impl/v1/HadoopV1MapTask.java | 89 +++++++++++--------- .../hadoop/impl/v1/HadoopV1ReduceTask.java | 69 +++++++++------ .../hadoop/impl/v2/HadoopV2Context.java | 10 --- .../hadoop/impl/v2/HadoopV2MapTask.java | 18 ++-- .../hadoop/impl/v2/HadoopV2ReduceTask.java | 14 +++ .../hadoop/impl/v2/HadoopV2TaskContext.java | 1 + .../hadoop/shuffle/HadoopShuffleJob.java | 7 -- .../shuffle/direct/HadoopDirectDataInput.java | 2 +- .../hadoop/taskexecutor/HadoopRunnableTask.java | 12 ++- .../impl/HadoopAbstractMapReduceTest.java | 2 + .../impl/HadoopMapReduceEmbeddedSelfTest.java | 6 +- 12 files changed, 145 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/476b089b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java index dddd017..d6e9394 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java @@ -207,4 +207,14 @@ public abstract class HadoopTaskContext { * @throws IgniteCheckedException On any error in callable. */ public abstract <T> T runAsJobOwner(Callable<T> c) throws IgniteCheckedException; + + /** + * Callback invoked from mapper thread when map is finished. + * + * @throws IgniteCheckedException If failed. + */ + public void onMapperFinished() throws IgniteCheckedException { + if (output instanceof HadoopMapperAwareTaskOutput) + ((HadoopMapperAwareTaskOutput)output).onMapperFinished(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/476b089b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java index 65ff280..2aa4292 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java @@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils; import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException; import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; @@ -45,7 +46,7 @@ public class HadoopV1MapTask extends HadoopV1Task { /** * Constructor. 
* - * @param taskInfo + * @param taskInfo Task info. */ public HadoopV1MapTask(HadoopTaskInfo taskInfo) { super(taskInfo); @@ -56,67 +57,79 @@ public class HadoopV1MapTask extends HadoopV1Task { @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { HadoopJob job = taskCtx.job(); - HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; + HadoopV2TaskContext taskCtx0 = (HadoopV2TaskContext)taskCtx; - JobConf jobConf = ctx.jobConf(); + if (taskCtx.taskInfo().hasMapperIndex()) + HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex()); + else + HadoopMapperUtils.clearMapperIndex(); - InputFormat inFormat = jobConf.getInputFormat(); + try { + JobConf jobConf = taskCtx0.jobConf(); - HadoopInputSplit split = info().inputSplit(); + InputFormat inFormat = jobConf.getInputFormat(); - InputSplit nativeSplit; + HadoopInputSplit split = info().inputSplit(); - if (split instanceof HadoopFileBlock) { - HadoopFileBlock block = (HadoopFileBlock)split; + InputSplit nativeSplit; - nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS); - } - else - nativeSplit = (InputSplit)ctx.getNativeSplit(split); + if (split instanceof HadoopFileBlock) { + HadoopFileBlock block = (HadoopFileBlock)split; - assert nativeSplit != null; + nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS); + } + else + nativeSplit = (InputSplit)taskCtx0.getNativeSplit(split); - Reporter reporter = new HadoopV1Reporter(taskCtx); + assert nativeSplit != null; - HadoopV1OutputCollector collector = null; + Reporter reporter = new HadoopV1Reporter(taskCtx); - try { - collector = collector(jobConf, ctx, !job.info().hasCombiner() && !job.info().hasReducer(), - fileName(), ctx.attemptId()); + HadoopV1OutputCollector collector = null; - RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter); + try { + collector = collector(jobConf, taskCtx0, 
!job.info().hasCombiner() && !job.info().hasReducer(), + fileName(), taskCtx0.attemptId()); - Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf); + RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter); - Object key = reader.createKey(); - Object val = reader.createValue(); + Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf); - assert mapper != null; + Object key = reader.createKey(); + Object val = reader.createValue(); + + assert mapper != null; - try { try { - while (reader.next(key, val)) { - if (isCancelled()) - throw new HadoopTaskCancelledException("Map task cancelled."); + try { + while (reader.next(key, val)) { + if (isCancelled()) + throw new HadoopTaskCancelledException("Map task cancelled."); + + mapper.map(key, val, collector, reporter); + } - mapper.map(key, val, collector, reporter); + taskCtx.onMapperFinished(); + } + finally { + mapper.close(); } } finally { - mapper.close(); + collector.closeWriter(); } + + collector.commit(); } - finally { - collector.closeWriter(); - } + catch (Exception e) { + if (collector != null) + collector.abort(); - collector.commit(); + throw new IgniteCheckedException(e); + } } - catch (Exception e) { - if (collector != null) - collector.abort(); - - throw new IgniteCheckedException(e); + finally { + HadoopMapperUtils.clearMapperIndex(); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/476b089b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java index 92c024e..5c1dd15 100644 --- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java @@ -23,6 +23,7 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.ReflectionUtils; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils; import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException; import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; @@ -53,49 +54,63 @@ public class HadoopV1ReduceTask extends HadoopV1Task { @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { HadoopJob job = taskCtx.job(); - HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; + HadoopV2TaskContext taskCtx0 = (HadoopV2TaskContext)taskCtx; - JobConf jobConf = ctx.jobConf(); - - HadoopTaskInput input = taskCtx.input(); - - HadoopV1OutputCollector collector = null; + if (!reduce && taskCtx.taskInfo().hasMapperIndex()) + HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex()); + else + HadoopMapperUtils.clearMapperIndex(); try { - collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId()); + JobConf jobConf = taskCtx0.jobConf(); - Reducer reducer; - if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(), - jobConf); - else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(), - jobConf); + HadoopTaskInput input = taskCtx.input(); - assert reducer != null; + HadoopV1OutputCollector collector = null; try { + collector = collector(jobConf, taskCtx0, reduce || !job.info().hasReducer(), fileName(), taskCtx0.attemptId()); + + Reducer reducer; + if (reduce) reducer = 
ReflectionUtils.newInstance(jobConf.getReducerClass(), + jobConf); + else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(), + jobConf); + + assert reducer != null; + try { - while (input.next()) { - if (isCancelled()) - throw new HadoopTaskCancelledException("Reduce task cancelled."); + try { + while (input.next()) { + if (isCancelled()) + throw new HadoopTaskCancelledException("Reduce task cancelled."); + + reducer.reduce(input.key(), input.values(), collector, Reporter.NULL); + } - reducer.reduce(input.key(), input.values(), collector, Reporter.NULL); + if (!reduce) + taskCtx.onMapperFinished(); + } + finally { + reducer.close(); } } finally { - reducer.close(); + collector.closeWriter(); } + + collector.commit(); } - finally { - collector.closeWriter(); - } + catch (Exception e) { + if (collector != null) + collector.abort(); - collector.commit(); + throw new IgniteCheckedException(e); + } } - catch (Exception e) { - if (collector != null) - collector.abort(); - - throw new IgniteCheckedException(e); + finally { + if (!reduce) + HadoopMapperUtils.clearMapperIndex(); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/476b089b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java index eec0636..1f4e675 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java @@ -154,16 +154,6 @@ public class HadoopV2Context extends JobContextImpl implements MapContext, Reduc } } - /** - * Callback invoked from mapper thread when map 
is finished. - * - * @throws IgniteCheckedException If failed. - */ - public void onMapperFinished() throws IgniteCheckedException { - if (output instanceof HadoopMapperAwareTaskOutput) - ((HadoopMapperAwareTaskOutput)output).onMapperFinished(); - } - /** {@inheritDoc} */ @Override public OutputCommitter getOutputCommitter() { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/ignite/blob/476b089b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java index eb3b935..1519199 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java @@ -56,30 +56,32 @@ public class HadoopV2MapTask extends HadoopV2Task { HadoopMapperUtils.clearMapperIndex(); try { - InputSplit nativeSplit = hadoopContext().getInputSplit(); + HadoopV2Context hadoopCtx = hadoopContext(); + + InputSplit nativeSplit = hadoopCtx.getInputSplit(); if (nativeSplit == null) throw new IgniteCheckedException("Input split cannot be null."); InputFormat inFormat = ReflectionUtils.newInstance(jobCtx.getInputFormatClass(), - hadoopContext().getConfiguration()); + hadoopCtx.getConfiguration()); - RecordReader reader = inFormat.createRecordReader(nativeSplit, hadoopContext()); + RecordReader reader = inFormat.createRecordReader(nativeSplit, hadoopCtx); - reader.initialize(nativeSplit, hadoopContext()); + reader.initialize(nativeSplit, hadoopCtx); - hadoopContext().reader(reader); + hadoopCtx.reader(reader); HadoopJobInfo jobInfo = taskCtx.job().info(); outputFormat = jobInfo.hasCombiner() 
|| jobInfo.hasReducer() ? null : prepareWriter(jobCtx); - Mapper mapper = ReflectionUtils.newInstance(jobCtx.getMapperClass(), hadoopContext().getConfiguration()); + Mapper mapper = ReflectionUtils.newInstance(jobCtx.getMapperClass(), hadoopCtx.getConfiguration()); try { - mapper.run(new WrappedMapper().getMapContext(hadoopContext())); + mapper.run(new WrappedMapper().getMapContext(hadoopCtx)); - hadoopContext().onMapperFinished(); + taskCtx.onMapperFinished(); } finally { closeWriter(); http://git-wip-us.apache.org/repos/asf/ignite/blob/476b089b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java index 930ec1d..09e0634 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java @@ -24,6 +24,7 @@ import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer; import org.apache.hadoop.util.ReflectionUtils; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils; import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; /** @@ -53,10 +54,17 @@ public class HadoopV2ReduceTask extends HadoopV2Task { JobContextImpl jobCtx = taskCtx.jobContext(); + // Set mapper index for combiner tasks + if (!reduce && taskCtx.taskInfo().hasMapperIndex()) + HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex()); + else + HadoopMapperUtils.clearMapperIndex(); + try { outputFormat = reduce || !taskCtx.job().info().hasReducer() ? 
prepareWriter(jobCtx) : null; Reducer reducer; + if (reduce) reducer = ReflectionUtils.newInstance(jobCtx.getReducerClass(), jobCtx.getConfiguration()); else reducer = ReflectionUtils.newInstance(jobCtx.getCombinerClass(), @@ -64,6 +72,9 @@ public class HadoopV2ReduceTask extends HadoopV2Task { try { reducer.run(new WrappedReducer().getReducerContext(hadoopContext())); + + if (!reduce) + taskCtx.onMapperFinished(); } finally { closeWriter(); @@ -84,6 +95,9 @@ public class HadoopV2ReduceTask extends HadoopV2Task { throw new IgniteCheckedException(e); } finally { + if (!reduce) + HadoopMapperUtils.clearMapperIndex(); + if (err != null) abort(outputFormat); } http://git-wip-us.apache.org/repos/asf/ignite/blob/476b089b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java index d328550..475e43d 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java @@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; import org.apache.ignite.internal.processors.hadoop.HadoopJob; import org.apache.ignite.internal.processors.hadoop.HadoopJobId; import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty; +import org.apache.ignite.internal.processors.hadoop.HadoopMapperAwareTaskOutput; import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner; import org.apache.ignite.internal.processors.hadoop.HadoopSerialization; import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper; 
http://git-wip-us.apache.org/repos/asf/ignite/blob/476b089b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java index 318ead3..4bcc398 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java @@ -182,13 +182,6 @@ public class HadoopShuffleJob<T> implements AutoCloseable { boolean stripeMappers0 = get(job.info(), SHUFFLE_MAPPER_STRIPED_OUTPUT, true); if (stripeMappers0) { - if (job.info().hasCombiner()) { - log.info("Striped mapper output is disabled because it cannot be used together with combiner [jobId=" + - job.id() + ']'); - - stripeMappers0 = false; - } - if (!embedded) { log.info("Striped mapper output is disabled becuase it cannot be used in external mode [jobId=" + job.id() + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/476b089b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java index e3a713a..ef2905b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java @@ -48,7 +48,7 
@@ public class HadoopDirectDataInput extends InputStream implements DataInput { /** {@inheritDoc} */ @Override public int read() throws IOException { - return readByte(); + return (int)readByte() & 0xFF; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/476b089b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java index a57efe6..339bf5b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java @@ -122,7 +122,7 @@ public abstract class HadoopRunnableTask implements Callable<Void> { /** * Implements actual task running. - * @throws IgniteCheckedException + * @throws IgniteCheckedException On error. */ void call0() throws IgniteCheckedException { execStartTs = U.currentTimeMillis(); @@ -144,7 +144,15 @@ public abstract class HadoopRunnableTask implements Callable<Void> { runTask(perfCntr); if (info.type() == MAP && job.info().hasCombiner()) { - ctx.taskInfo(new HadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(), info.attempt(), null)); + // Switch to combiner. + HadoopTaskInfo combineTaskInfo = new HadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(), + info.attempt(), null); + + // Mapper and combiner share the same index. 
+ if (ctx.taskInfo().hasMapperIndex()) + combineTaskInfo.mapperIndex(ctx.taskInfo().mapperIndex()); + + ctx.taskInfo(combineTaskInfo); try { runTask(perfCntr); http://git-wip-us.apache.org/repos/asf/ignite/blob/476b089b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java index 89005f6..cd997a4 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java @@ -172,6 +172,8 @@ public class HadoopAbstractMapReduceTest extends HadoopAbstractWordCountTest { */ protected final void doTest(IgfsPath inFile, boolean useNewMapper, boolean useNewCombiner, boolean useNewReducer) throws Exception { + log.info("useNewMapper=" + useNewMapper + ", useNewCombiner=" + useNewCombiner + ", useNewReducer=" + useNewReducer); + igfs.delete(new IgfsPath(PATH_OUTPUT), true); JobConf jobConf = new JobConf(); http://git-wip-us.apache.org/repos/asf/ignite/blob/476b089b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java index 8897a38..bce67f6 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java +++ 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java @@ -55,14 +55,14 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest { return cfg; } - /* + /** * @throws Exception If fails. */ public void testMultiReducerWholeMapReduceExecution() throws Exception { checkMultiReducerWholeMapReduceExecution(false); } - /* + /** * @throws Exception If fails. */ public void testMultiReducerWholeMapReduceExecutionStriped() throws Exception { @@ -100,6 +100,8 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest { if (striped) jobConf.set(HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT.propertyName(), "true"); + else + jobConf.set(HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT.propertyName(), "false"); jobConf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
