IGNITE-4270: Hadoop: implemented striped mapper output. This closes #1334.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a976c425 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a976c425 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a976c425 Branch: refs/heads/master Commit: a976c42590f786ff977999736be5436e7dac9d87 Parents: 58c3380 Author: devozerov <[email protected]> Authored: Fri Dec 9 12:01:40 2016 +0300 Committer: devozerov <[email protected]> Committed: Thu Dec 15 13:46:19 2016 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 6 + .../processors/hadoop/HadoopJobProperty.java | 7 + .../hadoop/HadoopMapperAwareTaskOutput.java | 32 ++ .../processors/hadoop/HadoopTaskInfo.java | 43 ++ .../shuffle/HadoopDirectShuffleMessage.java | 243 ++++++++++++ .../processors/hadoop/HadoopMapperUtils.java | 56 +++ .../hadoop/impl/v2/HadoopV2Context.java | 11 + .../hadoop/impl/v2/HadoopV2MapTask.java | 10 + .../hadoop/jobtracker/HadoopJobTracker.java | 4 + .../hadoop/shuffle/HadoopShuffle.java | 23 +- .../hadoop/shuffle/HadoopShuffleJob.java | 389 ++++++++++++++----- .../shuffle/HadoopShuffleRemoteState.java | 5 +- .../shuffle/direct/HadoopDirectDataInput.java | 166 ++++++++ .../shuffle/direct/HadoopDirectDataOutput.java | 221 +++++++++++ .../direct/HadoopDirectDataOutputContext.java | 100 +++++ .../direct/HadoopDirectDataOutputState.java | 54 +++ .../child/HadoopChildProcessRunner.java | 2 +- .../impl/HadoopMapReduceEmbeddedSelfTest.java | 22 +- 18 files changed, 1287 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a976c425/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 4ffb220..504e683 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -122,6 +122,7 @@ import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleAck; import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishRequest; import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishResponse; import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage; +import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopDirectShuffleMessage; import org.apache.ignite.internal.processors.igfs.IgfsAckMessage; import org.apache.ignite.internal.processors.igfs.IgfsBlockKey; import org.apache.ignite.internal.processors.igfs.IgfsBlocksMessage; @@ -170,6 +171,11 @@ public class GridIoMessageFactory implements MessageFactory { Message msg = null; switch (type) { + case -42: + msg = new HadoopDirectShuffleMessage(); + + break; + case -41: msg = new HadoopShuffleFinishResponse(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a976c425/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java index e713caa..1f0ef1b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java @@ -91,6 +91,13 @@ public enum HadoopJobProperty { SHUFFLE_MSG_SIZE("ignite.shuffle.message.size"), /** + * Whether to stripe mapper output for remote reducers. + * <p> + * Defaults to {@code false}. + */ + SHUFFLE_MAPPER_STRIPED_OUTPUT("ignite.shuffle.mapper.striped.output"), + + /** * Shuffle job throttle in milliseconds. When job is executed with separate shuffle thread, this parameter * controls sleep duration between iterations through intermediate reducer maps. * <p> http://git-wip-us.apache.org/repos/asf/ignite/blob/a976c425/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapperAwareTaskOutput.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapperAwareTaskOutput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapperAwareTaskOutput.java new file mode 100644 index 0000000..1d6637c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapperAwareTaskOutput.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.IgniteCheckedException; + +/** + * Special output type with callback invoked when mapper finished writing data. + */ +public interface HadoopMapperAwareTaskOutput extends HadoopTaskOutput { + /** + * Callback invoked when mapper finished writing data. + * + * @throws IgniteCheckedException If failed. + */ + public void onMapperFinished() throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a976c425/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java index b76fb85..3509367 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java @@ -46,6 +46,12 @@ public class HadoopTaskInfo implements Externalizable { /** */ private HadoopInputSplit inputSplit; + /** Whether mapper index is set. */ + private boolean mapperIdxSet; + + /** Current mapper index. */ + private int mapperIdx; + /** * For {@link Externalizable}. 
*/ @@ -78,6 +84,13 @@ public class HadoopTaskInfo implements Externalizable { out.writeInt(taskNum); out.writeInt(attempt); out.writeObject(inputSplit); + + if (mapperIdxSet) { + out.writeBoolean(true); + out.writeInt(mapperIdx); + } + else + out.writeBoolean(false); } /** {@inheritDoc} */ @@ -87,6 +100,13 @@ public class HadoopTaskInfo implements Externalizable { taskNum = in.readInt(); attempt = in.readInt(); inputSplit = (HadoopInputSplit)in.readObject(); + + if (in.readBoolean()) { + mapperIdxSet = true; + mapperIdx = in.readInt(); + } + else + mapperIdxSet = false; } /** @@ -118,6 +138,29 @@ public class HadoopTaskInfo implements Externalizable { } /** + * @param mapperIdx Current mapper index. + */ + public void mapperIndex(int mapperIdx) { + this.mapperIdx = mapperIdx; + + mapperIdxSet = true; + } + + /** + * @return Current mapper index; meaningful only when {@link #hasMapperIndex()} returns {@code true}. + */ + public int mapperIndex() { + return mapperIdx; + } + + /** + * @return {@code True} if mapper index is set. + */ + public boolean hasMapperIndex() { + return mapperIdxSet; + } + + /** * @return Input split. */ @Nullable public HadoopInputSplit inputSplit() { http://git-wip-us.apache.org/repos/asf/ignite/blob/a976c425/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java new file mode 100644 index 0000000..e81dc5f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle; + +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.nio.ByteBuffer; + +/** + * Direct shuffle message. + */ +public class HadoopDirectShuffleMessage implements Message, HadoopMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @GridToStringInclude + private HadoopJobId jobId; + + /** */ + @GridToStringInclude + private int reducer; + + /** Count. */ + private int cnt; + + /** Buffer. */ + private byte[] buf; + + /** Buffer length (equal or less than buf.length). 
*/ + @GridDirectTransient + private transient int bufLen; + + /** + * Default constructor. + */ + public HadoopDirectShuffleMessage() { + // No-op. + } + + /** + * Constructor. + * + * @param jobId Job ID. + * @param reducer Reducer. + * @param cnt Count. + * @param buf Buffer. + * @param bufLen Buffer length. + */ + public HadoopDirectShuffleMessage(HadoopJobId jobId, int reducer, int cnt, byte[] buf, int bufLen) { + assert jobId != null; + + this.jobId = jobId; + this.reducer = reducer; + this.cnt = cnt; + this.buf = buf; + this.bufLen = bufLen; + } + + /** + * @return Job ID. + */ + public HadoopJobId jobId() { + return jobId; + } + + /** + * @return Reducer. + */ + public int reducer() { + return reducer; + } + + /** + * @return Count. + */ + public int count() { + return cnt; + } + + /** + * @return Buffer. + */ + public byte[] buffer() { + return buf; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeMessage("jobId", jobId)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeInt("reducer", reducer)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeInt("cnt", cnt)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeByteArray("buf", this.buf, 0, bufLen)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + jobId = reader.readMessage("jobId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + reducer = reader.readInt("reducer"); + + if 
(!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + cnt = reader.readInt("cnt"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + this.buf = reader.readByteArray("buf"); + + if (!reader.isLastRead()) + return false; + + bufLen = this.buf != null ? this.buf.length : 0; + + reader.incrementState(); + + } + + return reader.afterMessageRead(HadoopDirectShuffleMessage.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -42; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 4; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + jobId.writeExternal(out); + + out.writeInt(reducer); + out.writeInt(cnt); + + U.writeByteArray(out, buf); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + jobId = new HadoopJobId(); + jobId.readExternal(in); + + reducer = in.readInt(); + cnt = in.readInt(); + + buf = U.readByteArray(in); + bufLen = buf != null ? 
buf.length : 0; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopDirectShuffleMessage.class, this); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/a976c425/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapperUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapperUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapperUtils.java new file mode 100644 index 0000000..87adcb7 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapperUtils.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +/** + * Set of mapper utility methods. + */ +public class HadoopMapperUtils { + /** Thread-local mapper index. */ + private static final ThreadLocal<Integer> MAP_IDX = new ThreadLocal<>(); + + /** + * @return Current mapper index. + */ + public static int mapperIndex() { + Integer res = MAP_IDX.get(); + + return res != null ? 
res : -1; + } + + /** + * @param idx Current mapper index. + */ + public static void mapperIndex(Integer idx) { + MAP_IDX.set(idx); + } + + /** + * Clear mapper index. + */ + public static void clearMapperIndex() { + MAP_IDX.remove(); + } + + /** + * Constructor. + */ + private HadoopMapperUtils() { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a976c425/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java index 90a1bad..eec0636 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java @@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; +import org.apache.ignite.internal.processors.hadoop.HadoopMapperAwareTaskOutput; import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException; import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput; @@ -153,6 +154,16 @@ public class HadoopV2Context extends JobContextImpl implements MapContext, Reduc } } + /** + * Callback invoked from mapper thread when map is finished. + * + * @throws IgniteCheckedException If failed. 
+ */ + public void onMapperFinished() throws IgniteCheckedException { + if (output instanceof HadoopMapperAwareTaskOutput) + ((HadoopMapperAwareTaskOutput)output).onMapperFinished(); + } + /** {@inheritDoc} */ @Override public OutputCommitter getOutputCommitter() { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a976c425/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java index 418df4e..eb3b935 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java @@ -28,6 +28,7 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils; import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; /** @@ -49,6 +50,11 @@ public class HadoopV2MapTask extends HadoopV2Task { JobContextImpl jobCtx = taskCtx.jobContext(); + if (taskCtx.taskInfo().hasMapperIndex()) + HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex()); + else + HadoopMapperUtils.clearMapperIndex(); + try { InputSplit nativeSplit = hadoopContext().getInputSplit(); @@ -72,6 +78,8 @@ public class HadoopV2MapTask extends HadoopV2Task { try { mapper.run(new WrappedMapper().getMapContext(hadoopContext())); + + hadoopContext().onMapperFinished(); } finally { closeWriter(); @@ -92,6 +100,8 @@ public class 
HadoopV2MapTask extends HadoopV2Task { throw new IgniteCheckedException(e); } finally { + HadoopMapperUtils.clearMapperIndex(); + if (err != null) abort(outputFormat); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a976c425/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java index 36782bf..a725534 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java @@ -1018,6 +1018,8 @@ public class HadoopJobTracker extends HadoopComponent { if (state == null) state = initState(jobId); + int mapperIdx = 0; + for (HadoopInputSplit split : mappers) { if (state.addMapper(split)) { if (log.isDebugEnabled()) @@ -1026,6 +1028,8 @@ public class HadoopJobTracker extends HadoopComponent { HadoopTaskInfo taskInfo = new HadoopTaskInfo(MAP, jobId, meta.taskNumber(split), 0, split); + taskInfo.mapperIndex(mapperIdx++); + if (tasks == null) tasks = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a976c425/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java index 82bbd32..8ffea8c 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java +++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.processors.hadoop.HadoopComponent; import org.apache.ignite.internal.processors.hadoop.HadoopContext; +import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; import org.apache.ignite.internal.processors.hadoop.HadoopJobId; import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan; import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; @@ -39,6 +40,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.plugin.extensions.communication.Message; +import java.util.Collection; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -102,8 +104,8 @@ public class HadoopShuffle extends HadoopComponent { private HadoopShuffleJob<UUID> newJob(HadoopJobId jobId) throws IgniteCheckedException { HadoopMapReducePlan plan = ctx.jobTracker().plan(jobId); - HadoopShuffleJob<UUID> job = new HadoopShuffleJob<>(ctx.localNodeId(), log, - ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId()), true); + HadoopShuffleJob<UUID> job = new HadoopShuffleJob<>(ctx.localNodeId(), log, ctx.jobTracker().job(jobId, null), + mem, plan.reducers(), plan.reducers(ctx.localNodeId()), localMappersCount(plan), true); UUID[] rdcAddrs = new UUID[plan.reducers()]; @@ -123,6 +125,18 @@ public class HadoopShuffle extends HadoopComponent { } /** + * Get number of local mappers. + * + * @param plan Plan. + * @return Number of local mappers. 
+ */ + private int localMappersCount(HadoopMapReducePlan plan) { + Collection<HadoopInputSplit> locMappers = plan.mappers(ctx.localNodeId()); + + return F.isEmpty(locMappers) ? 0 : locMappers.size(); + } + + /** * @param nodeId Node ID to send message to. * @param msg Message to send. * @throws IgniteCheckedException If send failed. @@ -195,6 +209,11 @@ public class HadoopShuffle extends HadoopComponent { job(m.jobId()).onShuffleMessage(src, m); } + else if (msg instanceof HadoopDirectShuffleMessage) { + HadoopDirectShuffleMessage m = (HadoopDirectShuffleMessage)msg; + + job(m.jobId()).onDirectShuffleMessage(src, m); + } else if (msg instanceof HadoopShuffleAck) { HadoopShuffleAck m = (HadoopShuffleAck)msg; http://git-wip-us.apache.org/repos/asf/ignite/blob/a976c425/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java index 0a3a0ae..214a335 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java @@ -17,20 +17,16 @@ package org.apache.ignite.internal.processors.hadoop.shuffle; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReferenceArray; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import 
org.apache.ignite.internal.processors.hadoop.HadoopJob; import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.HadoopMapperAwareTaskOutput; +import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils; import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner; +import org.apache.ignite.internal.processors.hadoop.HadoopSerialization; import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput; @@ -41,6 +37,9 @@ import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimap; import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimap; import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipList; +import org.apache.ignite.internal.processors.hadoop.shuffle.direct.HadoopDirectDataInput; +import org.apache.ignite.internal.processors.hadoop.shuffle.direct.HadoopDirectDataOutputContext; +import org.apache.ignite.internal.processors.hadoop.shuffle.direct.HadoopDirectDataOutputState; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -55,9 +54,19 @@ import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.thread.IgniteThread; +import org.jetbrains.annotations.Nullable; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import 
java.util.concurrent.atomic.AtomicReferenceArray; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.PARTITION_HASHMAP_SIZE; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_JOB_THROTTLE; +import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MSG_SIZE; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get; @@ -121,6 +130,9 @@ public class HadoopShuffleJob<T> implements AutoCloseable { /** Message size. */ private final int msgSize; + /** Whether to stripe mapper output for remote reducers. */ + private final boolean stripeMappers; + /** Local shuffle states. */ private volatile HashMap<T, HadoopShuffleLocalState> locShuffleStates = new HashMap<>(); @@ -143,11 +155,12 @@ public class HadoopShuffleJob<T> implements AutoCloseable { * @param mem Memory. * @param totalReducerCnt Amount of reducers in the Job. * @param locReducers Reducers will work on current node. + * @param locMappersCnt Number of mappers running on the given node. * @param embedded Whether shuffle is running in embedded mode. * @throws IgniteCheckedException If error. */ public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, HadoopJob job, GridUnsafeMemory mem, - int totalReducerCnt, int[] locReducers, boolean embedded) throws IgniteCheckedException { + int totalReducerCnt, int[] locReducers, int locMappersCnt, boolean embedded) throws IgniteCheckedException { this.locReduceAddr = locReduceAddr; this.totalReducerCnt = totalReducerCnt; this.job = job; @@ -155,6 +168,27 @@ public class HadoopShuffleJob<T> implements AutoCloseable { this.log = log.getLogger(HadoopShuffleJob.class); this.embedded = embedded; + // Striped output is disabled when a combiner is used or when not running in embedded mode.
+ boolean stripeMappers0 = get(job.info(), SHUFFLE_MAPPER_STRIPED_OUTPUT, false); + + if (stripeMappers0) { + if (job.info().hasCombiner()) { + log.info("Striped mapper output is disabled because it cannot be used together with combiner [jobId=" + + job.id() + ']'); + + stripeMappers0 = false; + } + + if (!embedded) { + log.info("Striped mapper output is disabled becuase it cannot be used in external mode [jobId=" + + job.id() + ']'); + + stripeMappers0 = false; + } + } + + stripeMappers = stripeMappers0; + msgSize = get(job.info(), SHUFFLE_MSG_SIZE, DFLT_SHUFFLE_MSG_SIZE); locReducersCtx = new AtomicReferenceArray<>(totalReducerCnt); @@ -169,9 +203,20 @@ public class HadoopShuffleJob<T> implements AutoCloseable { needPartitioner = totalReducerCnt > 1; + // Size of local map is always equal to total reducer number to allow index-based lookup. locMaps = new AtomicReferenceArray<>(totalReducerCnt); - rmtMaps = new AtomicReferenceArray<>(totalReducerCnt); - msgs = new HadoopShuffleMessage[totalReducerCnt]; + + // Size of remote map: + // - If there are no local mappers, then we will not send anything, so set to 0; + // - If output is not striped, then match it to total reducer count, the same way as for local maps. + // - If output is striped, then multiply previous value by number of local mappers. + int rmtMapsSize = locMappersCnt == 0 ? 
0 : totalReducerCnt; + + if (stripeMappers) + rmtMapsSize *= locMappersCnt; + + rmtMaps = new AtomicReferenceArray<>(rmtMapsSize); + msgs = new HadoopShuffleMessage[rmtMapsSize]; throttle = get(job.info(), SHUFFLE_JOB_THROTTLE, 0); } @@ -208,24 +253,26 @@ public class HadoopShuffleJob<T> implements AutoCloseable { this.io = io; - if (!flushed) { - snd = new GridWorker(gridName, "hadoop-shuffle-" + job.id(), log) { - @Override protected void body() throws InterruptedException { - try { - while (!isCancelled()) { - if (throttle > 0) - Thread.sleep(throttle); - - collectUpdatesAndSend(false); + if (!stripeMappers) { + if (!flushed) { + snd = new GridWorker(gridName, "hadoop-shuffle-" + job.id(), log) { + @Override protected void body() throws InterruptedException { + try { + while (!isCancelled()) { + if (throttle > 0) + Thread.sleep(throttle); + + collectUpdatesAndSend(false); + } + } + catch (IgniteCheckedException e) { + throw new IllegalStateException(e); } } - catch (IgniteCheckedException e) { - throw new IllegalStateException(e); - } - } - }; + }; - new IgniteThread(snd).start(); + new IgniteThread(snd).start(); + } } ioInitLatch.countDown(); @@ -306,6 +353,46 @@ public class HadoopShuffleJob<T> implements AutoCloseable { } /** + * Process shuffle message. + * + * @param src Source. + * @param msg Message. + * @throws IgniteCheckedException Exception. + */ + public void onDirectShuffleMessage(T src, HadoopDirectShuffleMessage msg) throws IgniteCheckedException { + assert msg.buffer() != null; + + HadoopTaskContext taskCtx = locReducersCtx.get(msg.reducer()).get(); + + HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(taskCtx.counters(), null); + + perfCntr.onShuffleMessage(msg.reducer(), U.currentTimeMillis()); + + HadoopMultimap map = getOrCreateMap(locMaps, msg.reducer()); + + HadoopSerialization keySer = taskCtx.keySerialization(); + HadoopSerialization valSer = taskCtx.valueSerialization(); + + // Add data from message to the map. 
+ try (HadoopMultimap.Adder adder = map.startAdding(taskCtx)) { + HadoopDirectDataInput in = new HadoopDirectDataInput(msg.buffer()); + + Object key = null; + Object val = null; + + for (int i = 0; i < msg.count(); i++) { + key = keySer.read(in, key); + val = valSer.read(in, val); + + adder.write(key, val); + } + } + + if (localShuffleState(src).onShuffleMessage()) + sendFinishResponse(src, msg.jobId()); + } + + /** * @param ack Shuffle ack. */ @SuppressWarnings("ConstantConditions") @@ -467,88 +554,149 @@ public class HadoopShuffleJob<T> implements AutoCloseable { } /** - * Sends map updates to remote reducers. + * Send updates to remote reducers. + * + * @param flush Flush flag. + * @throws IgniteCheckedException If failed. */ private void collectUpdatesAndSend(boolean flush) throws IgniteCheckedException { - for (int i = 0; i < rmtMaps.length(); i++) { - HadoopMultimap map = rmtMaps.get(i); + for (int i = 0; i < rmtMaps.length(); i++) + collectUpdatesAndSend(i, flush); + } + + /** + * Send updates to concrete remote reducer. + * + * @param rmtMapIdx Remote map index. + * @param flush Flush flag. + * @throws IgniteCheckedException If failed. + */ + private void collectUpdatesAndSend(int rmtMapIdx, boolean flush) throws IgniteCheckedException { + final int rmtRdcIdx = stripeMappers ? rmtMapIdx % totalReducerCnt : rmtMapIdx; - if (map == null) - continue; // Skip empty map and local node. 
+ HadoopMultimap map = rmtMaps.get(rmtMapIdx); - if (msgs[i] == null) - msgs[i] = new HadoopShuffleMessage(job.id(), i, msgSize); + if (map == null) + return; - final int idx = i; + if (msgs[rmtMapIdx] == null) + msgs[rmtMapIdx] = new HadoopShuffleMessage(job.id(), rmtRdcIdx, msgSize); - map.visit(false, new HadoopMultimap.Visitor() { - /** */ - private long keyPtr; + visit(map, rmtMapIdx, rmtRdcIdx); - /** */ - private int keySize; + if (flush && msgs[rmtMapIdx].offset() != 0) + send(rmtMapIdx, rmtRdcIdx, 0); + } - /** */ - private boolean keyAdded; + /** + * Flush remote direct context. + * + * @param rmtMapIdx Remote map index. + * @param rmtDirectCtx Remote direct context. + * @param reset Whether to perform reset. + */ + private void sendShuffleMessage(int rmtMapIdx, @Nullable HadoopDirectDataOutputContext rmtDirectCtx, boolean reset) { + if (rmtDirectCtx == null) + return; - /** {@inheritDoc} */ - @Override public void onKey(long keyPtr, int keySize) { - this.keyPtr = keyPtr; - this.keySize = keySize; + int cnt = rmtDirectCtx.count(); - keyAdded = false; - } + if (cnt == 0) + return; - private boolean tryAdd(long valPtr, int valSize) { - HadoopShuffleMessage msg = msgs[idx]; + int rmtRdcIdx = stripeMappers ? rmtMapIdx % totalReducerCnt : rmtMapIdx; - if (!keyAdded) { // Add key and value. - int size = keySize + valSize; + HadoopDirectDataOutputState state = rmtDirectCtx.state(); - if (!msg.available(size, false)) - return false; + if (reset) + rmtDirectCtx.reset(); - msg.addKey(keyPtr, keySize); - msg.addValue(valPtr, valSize); + HadoopDirectShuffleMessage msg = new HadoopDirectShuffleMessage(job.id(), rmtRdcIdx, cnt, + state.buffer(), state.bufferLength()); - keyAdded = true; + T nodeId = reduceAddrs[rmtRdcIdx]; - return true; - } + io.apply(nodeId, msg); + + remoteShuffleState(nodeId).onShuffleMessage(); + } + + /** + * Visit output map. + * + * @param map Map. + * @param rmtMapIdx Remote map index. + * @param rmtRdcIdx Remote reducer index. 
+ * @throws IgniteCheckedException If failed. + */ + private void visit(HadoopMultimap map, final int rmtMapIdx, final int rmtRdcIdx) throws IgniteCheckedException { + map.visit(false, new HadoopMultimap.Visitor() { + /** */ + private long keyPtr; - if (!msg.available(valSize, true)) + /** */ + private int keySize; + + /** */ + private boolean keyAdded; + + /** {@inheritDoc} */ + @Override public void onKey(long keyPtr, int keySize) { + this.keyPtr = keyPtr; + this.keySize = keySize; + + keyAdded = false; + } + + private boolean tryAdd(long valPtr, int valSize) { + HadoopShuffleMessage msg = msgs[rmtMapIdx]; + + if (!keyAdded) { // Add key and value. + int size = keySize + valSize; + + if (!msg.available(size, false)) return false; + msg.addKey(keyPtr, keySize); msg.addValue(valPtr, valSize); + keyAdded = true; + return true; } - /** {@inheritDoc} */ - @Override public void onValue(long valPtr, int valSize) { - if (tryAdd(valPtr, valSize)) - return; + if (!msg.available(valSize, true)) + return false; - send(idx, keySize + valSize); + msg.addValue(valPtr, valSize); - keyAdded = false; + return true; + } - if (!tryAdd(valPtr, valSize)) - throw new IllegalStateException(); - } - }); + /** {@inheritDoc} */ + @Override public void onValue(long valPtr, int valSize) { + if (tryAdd(valPtr, valSize)) + return; - if (flush && msgs[i].offset() != 0) - send(i, 0); - } + send(rmtMapIdx, rmtRdcIdx, keySize + valSize); + + keyAdded = false; + + if (!tryAdd(valPtr, valSize)) + throw new IllegalStateException(); + } + }); } /** - * @param idx Index of message. + * Send message. + * + * @param rmtMapIdx Remote map index. + * @param rmtRdcIdx Remote reducer index. * @param newBufMinSize Min new buffer size. 
*/ - private void send(final int idx, int newBufMinSize) { - HadoopShuffleMessage msg = msgs[idx]; + private void send(int rmtMapIdx, int rmtRdcIdx, int newBufMinSize) { + HadoopShuffleMessage msg = msgs[rmtMapIdx]; final long msgId = msg.id(); @@ -566,10 +714,10 @@ public class HadoopShuffleJob<T> implements AutoCloseable { } try { - io.apply(reduceAddrs[idx], msg); + io.apply(reduceAddrs[rmtRdcIdx], msg); if (embedded) - remoteShuffleState(reduceAddrs[idx]).onShuffleMessage(); + remoteShuffleState(reduceAddrs[rmtRdcIdx]).onShuffleMessage(); } catch (GridClosureException e) { if (fut != null) @@ -593,7 +741,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable { }); } - msgs[idx] = newBufMinSize == 0 ? null : new HadoopShuffleMessage(job.id(), idx, + msgs[rmtMapIdx] = newBufMinSize == 0 ? null : new HadoopShuffleMessage(job.id(), rmtRdcIdx, Math.max(msgSize, newBufMinSize)); } @@ -639,31 +787,33 @@ public class HadoopShuffleJob<T> implements AutoCloseable { if (totalReducerCnt == 0) return new GridFinishedFuture<>(); - U.await(ioInitLatch); + if (!stripeMappers) { + U.await(ioInitLatch); - GridWorker snd0 = snd; + GridWorker snd0 = snd; - if (snd0 != null) { - if (log.isDebugEnabled()) - log.debug("Cancelling sender thread."); + if (snd0 != null) { + if (log.isDebugEnabled()) + log.debug("Cancelling sender thread."); - snd0.cancel(); + snd0.cancel(); - try { - snd0.join(); + try { + snd0.join(); - if (log.isDebugEnabled()) - log.debug("Finished waiting for sending thread to complete on shuffle job flush: " + job.id()); - } - catch (InterruptedException e) { - throw new IgniteInterruptedCheckedException(e); + if (log.isDebugEnabled()) + log.debug("Finished waiting for sending thread to complete on shuffle job flush: " + job.id()); + } + catch (InterruptedException e) { + throw new IgniteInterruptedCheckedException(e); + } } - } - collectUpdatesAndSend(true); // With flush. + collectUpdatesAndSend(true); // With flush. 
- if (log.isDebugEnabled()) - log.debug("Finished sending collected updates to remote reducers: " + job.id()); + if (log.isDebugEnabled()) + log.debug("Finished sending collected updates to remote reducers: " + job.id()); + } GridCompoundFuture fut = new GridCompoundFuture<>(); @@ -700,8 +850,8 @@ public class HadoopShuffleJob<T> implements AutoCloseable { if (log.isDebugEnabled()) log.debug("Collected futures to compound futures for flush: " + sentMsgs.size()); - } + return fut; } @@ -775,13 +925,17 @@ public class HadoopShuffleJob<T> implements AutoCloseable { /** * Partitioned output. */ - private class PartitionedOutput implements HadoopTaskOutput { + public class PartitionedOutput implements HadoopMapperAwareTaskOutput { /** */ private final HadoopTaskOutput[] locAdders = new HadoopTaskOutput[locMaps.length()]; /** */ private final HadoopTaskOutput[] rmtAdders = new HadoopTaskOutput[rmtMaps.length()]; + /** Remote direct contexts. */ + private final HadoopDirectDataOutputContext[] rmtDirectCtxs = + new HadoopDirectDataOutputContext[rmtMaps.length()]; + /** */ private HadoopPartitioner partitioner; @@ -819,16 +973,53 @@ public class HadoopShuffleJob<T> implements AutoCloseable { locAdders[part] = out = getOrCreateMap(locMaps, part).startAdding(taskCtx); } else { - out = rmtAdders[part]; + if (stripeMappers) { + int mapperIdx = HadoopMapperUtils.mapperIndex(); - if (out == null) - rmtAdders[part] = out = getOrCreateMap(rmtMaps, part).startAdding(taskCtx); + assert mapperIdx >= 0; + + int idx = totalReducerCnt * mapperIdx + part; + + HadoopDirectDataOutputContext rmtDirectCtx = rmtDirectCtxs[idx]; + + if (rmtDirectCtx == null) { + rmtDirectCtx = new HadoopDirectDataOutputContext(msgSize, taskCtx); + + rmtDirectCtxs[idx] = rmtDirectCtx; + } + + if (rmtDirectCtx.write(key, val)) + sendShuffleMessage(idx, rmtDirectCtx, true); + + return; + } + else { + out = rmtAdders[part]; + + if (out == null) + rmtAdders[part] = out = getOrCreateMap(rmtMaps, 
part).startAdding(taskCtx); + } } out.write(key, val); } /** {@inheritDoc} */ + @Override public void onMapperFinished() throws IgniteCheckedException { + if (stripeMappers) { + int mapperIdx = HadoopMapperUtils.mapperIndex(); + + assert mapperIdx >= 0; + + for (int i = 0; i < totalReducerCnt; i++) { + int idx = totalReducerCnt * mapperIdx + i; + + sendShuffleMessage(idx, rmtDirectCtxs[idx], false); + } + } + } + + /** {@inheritDoc} */ @Override public void close() throws IgniteCheckedException { for (HadoopTaskOutput adder : locAdders) { if (adder != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/a976c425/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java index 5ffaa55..4331124 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java @@ -17,17 +17,14 @@ package org.apache.ignite.internal.processors.hadoop.shuffle; -import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.lang.IgniteInClosure; -import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; /** * Remote shuffle state. */ -class HadoopShuffleRemoteState<T> { +class HadoopShuffleRemoteState { /** Message count. 
*/ private final AtomicLong msgCnt = new AtomicLong(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a976c425/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java new file mode 100644 index 0000000..e3a713a --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.direct; + +import org.apache.ignite.internal.util.GridUnsafe; +import org.jetbrains.annotations.NotNull; + +import java.io.DataInput; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF; + +/** + * Hadoop data input used for direct communication. 
+ */ +public class HadoopDirectDataInput extends InputStream implements DataInput { + /** Data buffer. */ + private final byte[] buf; + + /** Position. */ + private int pos; + + /** + * Constructor. + * + * @param buf Buffer. + */ + public HadoopDirectDataInput(byte[] buf) { + this.buf = buf; + } + + /** {@inheritDoc} */ + @Override public int read() throws IOException { + return readByte(); + } + + /** {@inheritDoc} */ + @Override public void readFully(@NotNull byte[] b) throws IOException { + readFully(b, 0, b.length); + } + + /** {@inheritDoc} */ + @Override public void readFully(@NotNull byte[] b, int off, int len) throws IOException { + System.arraycopy(buf, pos, b, off, len); + + pos += len; + } + + /** {@inheritDoc} */ + @Override public int skipBytes(int n) throws IOException { + pos += n; + + return n; + } + + /** {@inheritDoc} */ + @Override public boolean readBoolean() throws IOException { + return readByte() == 1; + } + + /** {@inheritDoc} */ + @Override public byte readByte() throws IOException { + byte res = GridUnsafe.getByte(buf, BYTE_ARR_OFF + pos); + + pos += 1; + + return res; + } + + /** {@inheritDoc} */ + @Override public int readUnsignedByte() throws IOException { + return readByte() & 0xff; + } + + /** {@inheritDoc} */ + @Override public short readShort() throws IOException { + short res = GridUnsafe.getShort(buf, BYTE_ARR_OFF + pos); + + pos += 2; + + return res; + } + + /** {@inheritDoc} */ + @Override public int readUnsignedShort() throws IOException { + return readShort() & 0xffff; + } + + /** {@inheritDoc} */ + @Override public char readChar() throws IOException { + char res = GridUnsafe.getChar(buf, BYTE_ARR_OFF + pos); + + pos += 2; + + return res; + } + + /** {@inheritDoc} */ + @Override public int readInt() throws IOException { + int res = GridUnsafe.getInt(buf, BYTE_ARR_OFF + pos); + + pos += 4; + + return res; + } + + /** {@inheritDoc} */ + @Override public long readLong() throws IOException { + long res = GridUnsafe.getLong(buf, 
BYTE_ARR_OFF + pos); + + pos += 8; + + return res; + } + + /** {@inheritDoc} */ + @Override public float readFloat() throws IOException { + float res = GridUnsafe.getFloat(buf, BYTE_ARR_OFF + pos); + + pos += 4; + + return res; + } + + /** {@inheritDoc} */ + @Override public double readDouble() throws IOException { + double res = GridUnsafe.getDouble(buf, BYTE_ARR_OFF + pos); + + pos += 8; + + return res; + } + + /** {@inheritDoc} */ + @Override public String readLine() throws IOException { + // TODO: Create ticket! + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @NotNull @Override public String readUTF() throws IOException { + byte[] bytes = new byte[readShort()]; + + if (bytes.length != 0) + readFully(bytes); + + return new String(bytes, StandardCharsets.UTF_8); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a976c425/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java new file mode 100644 index 0000000..151e552 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.direct; + +import org.apache.ignite.internal.util.GridUnsafe; +import org.jetbrains.annotations.NotNull; + +import java.io.DataOutput; +import java.io.IOException; +import java.io.OutputStream; +import java.io.UTFDataFormatException; +import java.nio.charset.StandardCharsets; + +import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF; + +/** + * Hadoop data output for direct communication. + */ +public class HadoopDirectDataOutput extends OutputStream implements DataOutput { + /** Flush size. */ + private final int flushSize; + + /** Data buffer. */ + private byte[] buf; + + /** Buffer size. */ + private int bufSize; + + /** Position. */ + private int pos; + + /** + * Constructor. + * + * @param flushSize Flush size. + */ + public HadoopDirectDataOutput(int flushSize) { + this(flushSize, flushSize); + } + + /** + * Constructor. + * + * @param flushSize Flush size. + * @param allocSize Allocation size. 
+ */ + public HadoopDirectDataOutput(int flushSize, int allocSize) { + this.flushSize = flushSize; + + buf = new byte[allocSize]; + bufSize = allocSize; + } + + /** {@inheritDoc} */ + @Override public void write(@NotNull byte[] b) throws IOException { + write(b, 0, b.length); + } + + /** {@inheritDoc} */ + @Override public void write(@NotNull byte[] b, int off, int len) throws IOException { + int writePos = ensure(len); + + System.arraycopy(b, off, buf, writePos, len); + } + + /** {@inheritDoc} */ + @Override public void write(int val) throws IOException { + writeByte(val); + } + + /** {@inheritDoc} */ + @Override public void writeBoolean(boolean val) throws IOException { + writeByte(val ? (byte)1 : (byte)0); + } + + /** {@inheritDoc} */ + @Override public void writeByte(int val) throws IOException { + int writePos = ensure(1); + + buf[writePos] = (byte)val; + } + + /** {@inheritDoc} */ + @Override public void writeShort(int val) throws IOException { + int writePos = ensure(2); + + GridUnsafe.putShort(buf, BYTE_ARR_OFF + writePos, (short)val); + } + + /** {@inheritDoc} */ + @Override public void writeChar(int val) throws IOException { + int writePos = ensure(2); + + GridUnsafe.putChar(buf, BYTE_ARR_OFF + writePos, (char)val); + } + + /** {@inheritDoc} */ + @Override public void writeInt(int val) throws IOException { + int writePos = ensure(4); + + GridUnsafe.putInt(buf, BYTE_ARR_OFF + writePos, val); + } + + /** {@inheritDoc} */ + @Override public void writeLong(long val) throws IOException { + int writePos = ensure(8); + + GridUnsafe.putLong(buf, BYTE_ARR_OFF + writePos, val); + } + + /** {@inheritDoc} */ + @Override public void writeFloat(float val) throws IOException { + int writePos = ensure(4); + + GridUnsafe.putFloat(buf, BYTE_ARR_OFF + writePos, val); + } + + /** {@inheritDoc} */ + @Override public void writeDouble(double val) throws IOException { + int writePos = ensure(8); + + GridUnsafe.putDouble(buf, BYTE_ARR_OFF + writePos, val); + } + + /** 
{@inheritDoc} */ + @Override public void writeBytes(@NotNull String str) throws IOException { + for(int i = 0; i < str.length(); ++i) + write((byte)str.charAt(i)); + } + + /** {@inheritDoc} */ + @Override public void writeChars(@NotNull String str) throws IOException { + for (int i = 0; i < str.length(); ++i) + writeChar(str.charAt(i)); + } + + /** {@inheritDoc} */ + @Override public void writeUTF(@NotNull String str) throws IOException { + byte[] bytes = str.getBytes(StandardCharsets.UTF_8); + + int len = bytes.length; + + if (len > 65535) + throw new UTFDataFormatException("UTF8 form of string is longer than 65535 bytes: " + str); + + writeShort((short)len); + write(bytes); + } + + /** + * @return Buffer. + */ + public byte[] buffer() { + return buf; + } + + /** + * @return Position. + */ + public int position() { + return pos; + } + + /** + * @return Whether buffer is ready for flush. + */ + public boolean readyForFlush() { + return pos >= flushSize; + } + + /** + * Ensure that the given amount of bytes is available within the stream, then shift the position. + * + * @param cnt Count. + * @return Position + */ + private int ensure(int cnt) { + int pos0 = pos; + + if (pos0 + cnt > bufSize) + grow(pos0 + cnt); + + pos += cnt; + + return pos0; + } + + /** + * Grow array up to the given count. + * + * @param cnt Count. 
+ */ + private void grow(int cnt) { + int bufSize0 = (int)(bufSize * 1.1); + + if (bufSize0 < cnt) + bufSize0 = cnt; + + byte[] buf0 = new byte[bufSize0]; + + System.arraycopy(buf, 0, buf0, 0, pos); + + buf = buf0; + bufSize = bufSize0; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a976c425/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java new file mode 100644 index 0000000..bc70ef3 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.direct; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopSerialization; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; + +/** + * Hadoop data output context for direct communication. + */ +public class HadoopDirectDataOutputContext { + /** Flush size. */ + private final int flushSize; + + /** Key serialization. */ + private final HadoopSerialization keySer; + + /** Value serialization. */ + private final HadoopSerialization valSer; + + /** Data output. */ + private HadoopDirectDataOutput out; + + /** Number of keys written. */ + private int cnt; + + /** + * Constructor. + * + * @param flushSize Flush size. + * @param taskCtx Task context. + * @throws IgniteCheckedException If failed. + */ + public HadoopDirectDataOutputContext(int flushSize, HadoopTaskContext taskCtx) + throws IgniteCheckedException { + this.flushSize = flushSize; + + keySer = taskCtx.keySerialization(); + valSer = taskCtx.valueSerialization(); + + out = new HadoopDirectDataOutput(flushSize); + } + + /** + * Write key-value pair. + * + * @param key Key. + * @param val Value. + * @return Whether flush is needed. + * @throws IgniteCheckedException If failed. + */ + public boolean write(Object key, Object val) throws IgniteCheckedException { + keySer.write(out, key); + valSer.write(out, val); + + cnt++; + + return out.readyForFlush(); + } + + /** + * @return Key-value pairs count. + */ + public int count() { + return cnt; + } + + /** + * @return State. + */ + public HadoopDirectDataOutputState state() { + return new HadoopDirectDataOutputState(out.buffer(), out.position()); + } + + /** + * Reset buffer. 
+ */ + public void reset() { + int allocSize = Math.max(flushSize, out.position()); + + out = new HadoopDirectDataOutput(flushSize, allocSize); + cnt = 0; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a976c425/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java new file mode 100644 index 0000000..a9c12e3 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.direct; + +/** + * Hadoop data output state for direct communication. + */ +public class HadoopDirectDataOutputState { + /** Buffer. */ + private final byte[] buf; + + /** Buffer length. */ + private final int bufLen; + + /** + * Constructor. 
+ * + * @param buf Buffer. + * @param bufLen Buffer length. + */ + public HadoopDirectDataOutputState(byte[] buf, int bufLen) { + this.buf = buf; + this.bufLen = bufLen; + } + + /** + * @return Buffer. + */ + public byte[] buffer() { + return buf; + } + + /** + * @return Length. + */ + public int bufferLength() { + return bufLen; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a976c425/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java index cb08c00..3336120 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java @@ -151,7 +151,7 @@ public class HadoopChildProcessRunner { job.initialize(true, nodeDesc.processId()); shuffleJob = new HadoopShuffleJob<>(comm.localProcessDescriptor(), log, job, mem, - req.totalReducerCount(), req.localReducers(), false); + req.totalReducerCount(), req.localReducers(), 0, false); initializeExecutors(req); http://git-wip-us.apache.org/repos/asf/ignite/blob/a976c425/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java index b04deeb..8897a38 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java @@ -34,6 +34,7 @@ import org.apache.ignite.configuration.HadoopConfiguration; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty; import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount1; import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount2; @@ -54,12 +55,28 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest { return cfg; } + /* + * @throws Exception If fails. + */ + public void testMultiReducerWholeMapReduceExecution() throws Exception { + checkMultiReducerWholeMapReduceExecution(false); + } + + /* + * @throws Exception If fails. + */ + public void testMultiReducerWholeMapReduceExecutionStriped() throws Exception { + checkMultiReducerWholeMapReduceExecution(true); + } + /** * Tests whole job execution with all phases in old and new versions of API with definition of custom * Serialization, Partitioner and IO formats. + * + * @param striped Whether output should be striped or not. * @throws Exception If fails. 
*/ - public void testMultiReducerWholeMapReduceExecution() throws Exception { + public void checkMultiReducerWholeMapReduceExecution(boolean striped) throws Exception { IgfsPath inDir = new IgfsPath(PATH_INPUT); igfs.mkdirs(inDir); @@ -81,6 +98,9 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest { JobConf jobConf = new JobConf(); + if (striped) + jobConf.set(HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT.propertyName(), "true"); + jobConf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName()); //To split into about 6-7 items for v2
