IGNITE-4271: Hadoop: shuffle messages now use "direct-marshallable" path, thus avoiding unnecessary copying as opposed to user messages which were used previously. This closes #1266. This closes #1313.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b7b97cfd Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b7b97cfd Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b7b97cfd Branch: refs/heads/master Commit: b7b97cfd3419da4421423e189369a4931efd3996 Parents: 3c38421 Author: devozerov <[email protected]> Authored: Mon Dec 5 15:49:01 2016 +0300 Committer: devozerov <[email protected]> Committed: Thu Dec 15 13:45:37 2016 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/internal/GridTopic.java | 3 + .../communication/GridIoMessageFactory.java | 18 + .../internal/processors/hadoop/HadoopJobId.java | 79 +++- .../hadoop/message/HadoopMessage.java | 27 ++ .../hadoop/shuffle/HadoopShuffleAck.java | 170 +++++++++ .../hadoop/shuffle/HadoopShuffleMessage.java | 361 +++++++++++++++++++ .../hadoop/message/HadoopMessage.java | 27 -- .../hadoop/shuffle/HadoopShuffle.java | 25 +- .../hadoop/shuffle/HadoopShuffleAck.java | 92 ----- .../hadoop/shuffle/HadoopShuffleJob.java | 5 +- .../hadoop/shuffle/HadoopShuffleMessage.java | 242 ------------- .../child/HadoopChildProcessRunner.java | 6 +- 12 files changed, 682 insertions(+), 373 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b97cfd/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java index 248f75b..b5608db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java @@ -91,6 +91,9 @@ public enum GridTopic { TOPIC_HADOOP, /** */ + TOPIC_HADOOP_MSG, + + /** */ TOPIC_QUERY, /** */ 
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b97cfd/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index f36191c..dd68984 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -117,6 +117,9 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousMessage; import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry; import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest; import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse; +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleAck; +import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage; import org.apache.ignite.internal.processors.igfs.IgfsAckMessage; import org.apache.ignite.internal.processors.igfs.IgfsBlockKey; import org.apache.ignite.internal.processors.igfs.IgfsBlocksMessage; @@ -165,6 +168,21 @@ public class GridIoMessageFactory implements MessageFactory { Message msg = null; switch (type) { + case -30: + msg = new HadoopJobId(); + + break; + + case -29: + msg = new HadoopShuffleAck(); + + break; + + case -28: + msg = new HadoopShuffleMessage(); + + break; + case -27: msg = new GridDhtTxOnePhaseCommitAckRequest(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b97cfd/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobId.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobId.java index 8c61fab..740ab89 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobId.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobId.java @@ -21,14 +21,18 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.nio.ByteBuffer; import java.util.UUID; import org.apache.ignite.internal.processors.cache.GridCacheInternal; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * Job ID. 
*/ -public class HadoopJobId implements GridCacheInternal, Externalizable { +public class HadoopJobId implements Message, GridCacheInternal, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -63,6 +67,79 @@ public class HadoopJobId implements GridCacheInternal, Externalizable { } /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeUuid("nodeId", nodeId)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeInt("localId", jobId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + nodeId = reader.readUuid("nodeId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + jobId = reader.readInt("jobId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(HadoopJobId.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -30; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. 
+ } + + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { U.writeUuid(out, nodeId); out.writeInt(jobId); http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b97cfd/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java new file mode 100644 index 0000000..0d7bd3a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.message; + +import java.io.Externalizable; + +/** + * Marker interface for all hadoop messages. + */ +public interface HadoopMessage extends Externalizable { + // No-op. 
+} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b97cfd/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java new file mode 100644 index 0000000..6dd2c2d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.shuffle; + +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.apache.ignite.plugin.extensions.communication.Message; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.nio.ByteBuffer; + +/** + * Acknowledgement message. + */ +public class HadoopShuffleAck implements HadoopMessage, Message { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @GridToStringInclude + private long msgId; + + /** */ + @GridToStringInclude + private HadoopJobId jobId; + + /** + * + */ + public HadoopShuffleAck() { + // No-op. + } + + /** + * @param msgId Message ID. + */ + public HadoopShuffleAck(long msgId, HadoopJobId jobId) { + assert jobId != null; + + this.msgId = msgId; + this.jobId = jobId; + } + + /** + * @return Message ID. + */ + public long id() { + return msgId; + } + + /** + * @return Job ID. 
+ */ + public HadoopJobId jobId() { + return jobId; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("msgId", msgId)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeMessage("jobId", jobId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + msgId = reader.readLong("msgId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + jobId = reader.readMessage("jobId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(HadoopShuffleAck.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -29; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + jobId.writeExternal(out); + out.writeLong(msgId); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + jobId = new HadoopJobId(); + + jobId.readExternal(in); + msgId = in.readLong(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopShuffleAck.class, this); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b97cfd/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java new file mode 100644 index 0000000..3732bc2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.shuffle; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; +import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Shuffle message. + */ +public class HadoopShuffleMessage implements Message, HadoopMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private static final AtomicLong ids = new AtomicLong(); + + /** */ + private static final byte MARKER_KEY = (byte)17; + + /** */ + private static final byte MARKER_VALUE = (byte)31; + + /** */ + @GridToStringInclude + private long msgId; + + /** */ + @GridToStringInclude + private HadoopJobId jobId; + + /** */ + @GridToStringInclude + private int reducer; + + /** */ + private byte[] buf; + + /** */ + @GridToStringInclude + private int off; + + /** + * + */ + public HadoopShuffleMessage() { + // No-op. + } + + /** + * @param size Size. + */ + public HadoopShuffleMessage(HadoopJobId jobId, int reducer, int size) { + assert jobId != null; + + buf = new byte[size]; + + this.jobId = jobId; + this.reducer = reducer; + + msgId = ids.incrementAndGet(); + } + + /** + * @return Message ID. + */ + public long id() { + return msgId; + } + + /** + * @return Job ID. 
+ */ + public HadoopJobId jobId() { + return jobId; + } + + /** + * @return Reducer. + */ + public int reducer() { + return reducer; + } + + /** + * @return Buffer. + */ + public byte[] buffer() { + return buf; + } + + /** + * @return Offset. + */ + public int offset() { + return off; + } + + /** + * @param size Size. + * @param valOnly Only value wll be added. + * @return {@code true} If this message can fit additional data of this size + */ + public boolean available(int size, boolean valOnly) { + size += valOnly ? 5 : 10; + + if (off + size > buf.length) { + if (off == 0) { // Resize if requested size is too big. + buf = new byte[size]; + + return true; + } + + return false; + } + + return true; + } + + /** + * @param keyPtr Key pointer. + * @param keySize Key size. + */ + public void addKey(long keyPtr, int keySize) { + add(MARKER_KEY, keyPtr, keySize); + } + + /** + * @param valPtr Value pointer. + * @param valSize Value size. + */ + public void addValue(long valPtr, int valSize) { + add(MARKER_VALUE, valPtr, valSize); + } + + /** + * @param marker Marker. + * @param ptr Pointer. + * @param size Size. + */ + private void add(byte marker, long ptr, int size) { + buf[off++] = marker; + + GridUnsafe.putInt(buf, GridUnsafe.BYTE_ARR_OFF + off, size); + + off += 4; + + GridUnsafe.copyOffheapHeap(ptr, buf, GridUnsafe.BYTE_ARR_OFF + off, size); + + off += size; + } + + /** + * @param v Visitor. 
+ */ + public void visit(Visitor v) throws IgniteCheckedException { + for (int i = 0; i < off;) { + byte marker = buf[i++]; + + int size = GridUnsafe.getInt(buf, GridUnsafe.BYTE_ARR_OFF + i); + + i += 4; + + if (marker == MARKER_VALUE) + v.onValue(buf, i, size); + else if (marker == MARKER_KEY) + v.onKey(buf, i, size); + else + throw new IllegalStateException(); + + i += size; + } + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("msgId", msgId)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeMessage("jobId", jobId)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeInt("reducer", reducer)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeByteArray("buf", this.buf)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeInt("off", off)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + msgId = reader.readLong("msgId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + jobId = reader.readMessage("jobId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + reducer = reader.readInt("reducer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + this.buf = reader.readByteArray("buf"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + off = reader.readInt("off"); + + if (!reader.isLastRead()) + 
return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(HadoopShuffleMessage.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -28; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 5; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + jobId.writeExternal(out); + out.writeLong(msgId); + out.writeInt(reducer); + out.writeInt(off); + U.writeByteArray(out, buf); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + jobId = new HadoopJobId(); + + jobId.readExternal(in); + msgId = in.readLong(); + reducer = in.readInt(); + off = in.readInt(); + buf = U.readByteArray(in); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopShuffleMessage.class, this); + } + + /** + * Visitor. + */ + public static interface Visitor { + /** + * @param buf Buffer. + * @param off Offset. + * @param len Length. + */ + public void onKey(byte[] buf, int off, int len) throws IgniteCheckedException; + + /** + * @param buf Buffer. + * @param off Offset. + * @param len Length. 
+ */ + public void onValue(byte[] buf, int off, int len) throws IgniteCheckedException; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b97cfd/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java deleted file mode 100644 index 0d7bd3a..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.message; - -import java.io.Externalizable; - -/** - * Marker interface for all hadoop messages. - */ -public interface HadoopMessage extends Externalizable { - // No-op. 
-} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b97cfd/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java index 769bdc4..a69e779 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java @@ -17,13 +17,12 @@ package org.apache.ignite.internal.processors.hadoop.shuffle; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.processors.hadoop.HadoopComponent; import org.apache.ignite.internal.processors.hadoop.HadoopContext; import org.apache.ignite.internal.processors.hadoop.HadoopJobId; @@ -38,6 +37,11 @@ import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.plugin.extensions.communication.Message; + +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * Shuffle. 
@@ -53,6 +57,12 @@ public class HadoopShuffle extends HadoopComponent { @Override public void start(HadoopContext ctx) throws IgniteCheckedException { super.start(ctx); + ctx.kernalContext().io().addMessageListener(GridTopic.TOPIC_HADOOP_MSG, new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg) { + onMessageReceived(nodeId, (HadoopMessage)msg); + } + }); + ctx.kernalContext().io().addUserMessageListener(GridTopic.TOPIC_HADOOP, new IgniteBiPredicate<UUID, Object>() { @Override public boolean apply(UUID nodeId, Object msg) { @@ -117,7 +127,10 @@ public class HadoopShuffle extends HadoopComponent { private void send0(UUID nodeId, Object msg) throws IgniteCheckedException { ClusterNode node = ctx.kernalContext().discovery().node(nodeId); - ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0); + if (msg instanceof Message) + ctx.kernalContext().io().send(node, GridTopic.TOPIC_HADOOP_MSG, (Message)msg, GridIoPolicy.PUBLIC_POOL); + else + ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0); } /** @@ -151,8 +164,8 @@ public class HadoopShuffle extends HadoopComponent { */ private void startSending(HadoopShuffleJob<UUID> shuffleJob) { shuffleJob.startSending(ctx.kernalContext().gridName(), - new IgniteInClosure2X<UUID, HadoopShuffleMessage>() { - @Override public void applyx(UUID dest, HadoopShuffleMessage msg) throws IgniteCheckedException { + new IgniteInClosure2X<UUID, HadoopMessage>() { + @Override public void applyx(UUID dest, HadoopMessage msg) throws IgniteCheckedException { send0(dest, msg); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b97cfd/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java deleted file mode 100644 index 6013ec6..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle; - -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; - -/** - * Acknowledgement message. - */ -public class HadoopShuffleAck implements HadoopMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - @GridToStringInclude - private long msgId; - - /** */ - @GridToStringInclude - private HadoopJobId jobId; - - /** - * - */ - public HadoopShuffleAck() { - // No-op. - } - - /** - * @param msgId Message ID. 
- */ - public HadoopShuffleAck(long msgId, HadoopJobId jobId) { - assert jobId != null; - - this.msgId = msgId; - this.jobId = jobId; - } - - /** - * @return Message ID. - */ - public long id() { - return msgId; - } - - /** - * @return Job ID. - */ - public HadoopJobId jobId() { - return jobId; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - jobId.writeExternal(out); - out.writeLong(msgId); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - jobId = new HadoopJobId(); - - jobId.readExternal(in); - msgId = in.readLong(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopShuffleAck.class, this); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b97cfd/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java index 025c4da..9392b2c 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput; import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput; import org.apache.ignite.internal.processors.hadoop.HadoopTaskType; import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter; +import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; import 
org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimap; import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimap; import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipList; @@ -92,7 +93,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable { private final AtomicReferenceArray<HadoopMultimap> maps; /** */ - private volatile IgniteInClosure2X<T, HadoopShuffleMessage> io; + private volatile IgniteInClosure2X<T, HadoopMessage> io; /** */ protected ConcurrentMap<Long, IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>>> sentMsgs = @@ -176,7 +177,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable { * @param io IO Closure for sending messages. */ @SuppressWarnings("BusyWait") - public void startSending(String gridName, IgniteInClosure2X<T, HadoopShuffleMessage> io) { + public void startSending(String gridName, IgniteInClosure2X<T, HadoopMessage> io) { assert snd == null; assert io != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b97cfd/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java deleted file mode 100644 index 71a314b..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java +++ /dev/null @@ -1,242 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle; - -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; -import org.apache.ignite.internal.util.GridUnsafe; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * Shuffle message. - */ -public class HadoopShuffleMessage implements HadoopMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private static final AtomicLong ids = new AtomicLong(); - - /** */ - private static final byte MARKER_KEY = (byte)17; - - /** */ - private static final byte MARKER_VALUE = (byte)31; - - /** */ - @GridToStringInclude - private long msgId; - - /** */ - @GridToStringInclude - private HadoopJobId jobId; - - /** */ - @GridToStringInclude - private int reducer; - - /** */ - private byte[] buf; - - /** */ - @GridToStringInclude - private int off; - - /** - * - */ - public HadoopShuffleMessage() { - // No-op. - } - - /** - * @param size Size. 
- */ - public HadoopShuffleMessage(HadoopJobId jobId, int reducer, int size) { - assert jobId != null; - - buf = new byte[size]; - - this.jobId = jobId; - this.reducer = reducer; - - msgId = ids.incrementAndGet(); - } - - /** - * @return Message ID. - */ - public long id() { - return msgId; - } - - /** - * @return Job ID. - */ - public HadoopJobId jobId() { - return jobId; - } - - /** - * @return Reducer. - */ - public int reducer() { - return reducer; - } - - /** - * @return Buffer. - */ - public byte[] buffer() { - return buf; - } - - /** - * @return Offset. - */ - public int offset() { - return off; - } - - /** - * @param size Size. - * @param valOnly Only value wll be added. - * @return {@code true} If this message can fit additional data of this size - */ - public boolean available(int size, boolean valOnly) { - size += valOnly ? 5 : 10; - - if (off + size > buf.length) { - if (off == 0) { // Resize if requested size is too big. - buf = new byte[size]; - - return true; - } - - return false; - } - - return true; - } - - /** - * @param keyPtr Key pointer. - * @param keySize Key size. - */ - public void addKey(long keyPtr, int keySize) { - add(MARKER_KEY, keyPtr, keySize); - } - - /** - * @param valPtr Value pointer. - * @param valSize Value size. - */ - public void addValue(long valPtr, int valSize) { - add(MARKER_VALUE, valPtr, valSize); - } - - /** - * @param marker Marker. - * @param ptr Pointer. - * @param size Size. - */ - private void add(byte marker, long ptr, int size) { - buf[off++] = marker; - - GridUnsafe.putInt(buf, GridUnsafe.BYTE_ARR_OFF + off, size); - - off += 4; - - GridUnsafe.copyOffheapHeap(ptr, buf, GridUnsafe.BYTE_ARR_OFF + off, size); - - off += size; - } - - /** - * @param v Visitor. 
- */ - public void visit(Visitor v) throws IgniteCheckedException { - for (int i = 0; i < off;) { - byte marker = buf[i++]; - - int size = GridUnsafe.getInt(buf, GridUnsafe.BYTE_ARR_OFF + i); - - i += 4; - - if (marker == MARKER_VALUE) - v.onValue(buf, i, size); - else if (marker == MARKER_KEY) - v.onKey(buf, i, size); - else - throw new IllegalStateException(); - - i += size; - } - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - jobId.writeExternal(out); - out.writeLong(msgId); - out.writeInt(reducer); - out.writeInt(off); - U.writeByteArray(out, buf); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - jobId = new HadoopJobId(); - - jobId.readExternal(in); - msgId = in.readLong(); - reducer = in.readInt(); - off = in.readInt(); - buf = U.readByteArray(in); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopShuffleMessage.class, this); - } - - /** - * Visitor. - */ - public static interface Visitor { - /** - * @param buf Buffer. - * @param off Offset. - * @param len Length. - */ - public void onKey(byte[] buf, int off, int len) throws IgniteCheckedException; - - /** - * @param buf Buffer. - * @param off Offset. - * @param len Length. 
- */ - public void onValue(byte[] buf, int off, int len) throws IgniteCheckedException; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b97cfd/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java index 45d9a27..7001b8c 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java @@ -259,9 +259,9 @@ public class HadoopChildProcessRunner { if (req.reducersAddresses() != null) { if (shuffleJob.initializeReduceAddresses(req.reducersAddresses())) { shuffleJob.startSending("external", - new IgniteInClosure2X<HadoopProcessDescriptor, HadoopShuffleMessage>() { - @Override public void applyx(HadoopProcessDescriptor dest, - HadoopShuffleMessage msg) throws IgniteCheckedException { + new IgniteInClosure2X<HadoopProcessDescriptor, HadoopMessage>() { + @Override public void applyx(HadoopProcessDescriptor dest, HadoopMessage msg) + throws IgniteCheckedException { comm.sendMessage(dest, msg); } });
