IGNITE-4301: Hadoop: Optimized shuffle so that only one ack is needed when running in embedded mode. This closes #1319.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/58c33805 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/58c33805 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/58c33805 Branch: refs/heads/master Commit: 58c338051aed89a787c2a575856e949cc44e8b8c Parents: e857f29 Author: devozerov <[email protected]> Authored: Tue Dec 6 13:23:05 2016 +0300 Committer: devozerov <[email protected]> Committed: Thu Dec 15 13:46:14 2016 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 12 + .../shuffle/HadoopShuffleFinishRequest.java | 172 ++++++++++++++ .../shuffle/HadoopShuffleFinishResponse.java | 142 ++++++++++++ .../hadoop/shuffle/HadoopShuffle.java | 45 ++-- .../hadoop/shuffle/HadoopShuffleJob.java | 231 +++++++++++++++++-- .../hadoop/shuffle/HadoopShuffleLocalState.java | 67 ++++++ .../shuffle/HadoopShuffleRemoteState.java | 64 +++++ .../child/HadoopChildProcessRunner.java | 6 +- 8 files changed, 686 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/58c33805/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 86742e8..4ffb220 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -119,6 +119,8 @@ import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest; import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse; import org.apache.ignite.internal.processors.hadoop.HadoopJobId; import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleAck; +import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishRequest; +import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishResponse; import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage; import org.apache.ignite.internal.processors.igfs.IgfsAckMessage; import org.apache.ignite.internal.processors.igfs.IgfsBlockKey; @@ -168,6 +170,16 @@ public class GridIoMessageFactory implements MessageFactory { Message msg = null; switch (type) { + case -41: + msg = new HadoopShuffleFinishResponse(); + + break; + + case -40: + msg = new HadoopShuffleFinishRequest(); + + break; + case -39: msg = new HadoopJobId(); http://git-wip-us.apache.org/repos/asf/ignite/blob/58c33805/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleFinishRequest.java new file mode 100644 index 0000000..f568c2e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleFinishRequest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle; + +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.nio.ByteBuffer; + +/** + * Shuffle finish request. + */ +public class HadoopShuffleFinishRequest implements Message, HadoopMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Job ID. */ + @GridToStringInclude + private HadoopJobId jobId; + + /** Total message count. */ + private long msgCnt; + + /** + * Default constructor. + */ + public HadoopShuffleFinishRequest() { + // No-op. + } + + /** + * Constructor. + * + * @param jobId Job. + * @param msgCnt Message count. + */ + public HadoopShuffleFinishRequest(HadoopJobId jobId, long msgCnt) { + this.jobId = jobId; + this.msgCnt = msgCnt; + } + + /** + * @return Job ID. + */ + public HadoopJobId jobId() { + return jobId; + } + + /** + * @return Message count. + */ + public long messageCount() { + return msgCnt; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeMessage("jobId", jobId)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeLong("msgCnt", msgCnt)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + jobId = reader.readMessage("jobId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + msgCnt = reader.readLong("msgCnt"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(HadoopShuffleFinishRequest.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -40; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + jobId.writeExternal(out); + + out.writeLong(msgCnt); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + jobId = new HadoopJobId(); + + jobId.readExternal(in); + + msgCnt = in.readLong(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopShuffleFinishRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/58c33805/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleFinishResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleFinishResponse.java new file mode 100644 index 0000000..4b7c93b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleFinishResponse.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle; + +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.nio.ByteBuffer; + +/** + * Shuffle finish response. + */ +public class HadoopShuffleFinishResponse implements Message, HadoopMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Job ID. */ + @GridToStringInclude + private HadoopJobId jobId; + + /** + * Default constructor. + */ + public HadoopShuffleFinishResponse() { + // No-op. + } + + /** + * Constructor. + * + * @param jobId Job. + */ + public HadoopShuffleFinishResponse(HadoopJobId jobId) { + this.jobId = jobId; + } + + /** + * @return Job ID. + */ + public HadoopJobId jobId() { + return jobId; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeMessage("jobId", jobId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + jobId = reader.readMessage("jobId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(HadoopShuffleFinishResponse.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -41; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + jobId.writeExternal(out); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + jobId = new HadoopJobId(); + + jobId.readExternal(in); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopShuffleFinishResponse.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/58c33805/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java index 4450bf2..82bbd32 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java @@ -103,7 +103,7 @@ public class HadoopShuffle extends HadoopComponent { HadoopMapReducePlan plan = ctx.jobTracker().plan(jobId); HadoopShuffleJob<UUID> job = new HadoopShuffleJob<>(ctx.localNodeId(), log, - ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId())); + ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId()), true); UUID[] rdcAddrs = new UUID[plan.reducers()]; @@ -189,37 +189,34 @@ public class HadoopShuffle extends HadoopComponent { * @return {@code True}. */ public boolean onMessageReceived(UUID src, HadoopMessage msg) { - if (msg instanceof HadoopShuffleMessage) { - HadoopShuffleMessage m = (HadoopShuffleMessage)msg; + try { + if (msg instanceof HadoopShuffleMessage) { + HadoopShuffleMessage m = (HadoopShuffleMessage)msg; - try { - job(m.jobId()).onShuffleMessage(m); - } - catch (IgniteCheckedException e) { - U.error(log, "Message handling failed.", e); + job(m.jobId()).onShuffleMessage(src, m); } + else if (msg instanceof HadoopShuffleAck) { + HadoopShuffleAck m = (HadoopShuffleAck)msg; - try { - // Reply with ack. - send0(src, new HadoopShuffleAck(m.id(), m.jobId())); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to reply back to shuffle message sender [snd=" + src + ", msg=" + msg + ']', e); + job(m.jobId()).onShuffleAck(m); } - } - else if (msg instanceof HadoopShuffleAck) { - HadoopShuffleAck m = (HadoopShuffleAck)msg; + else if (msg instanceof HadoopShuffleFinishRequest) { + HadoopShuffleFinishRequest m = (HadoopShuffleFinishRequest)msg; - try { - job(m.jobId()).onShuffleAck(m); + job(m.jobId()).onShuffleFinishRequest(src, m); } - catch (IgniteCheckedException e) { - U.error(log, "Message handling failed.", e); + else if (msg instanceof HadoopShuffleFinishResponse) { + HadoopShuffleFinishResponse m = (HadoopShuffleFinishResponse)msg; + + job(m.jobId()).onShuffleFinishResponse(src); } + else + throw new IllegalStateException("Unknown message type received to Hadoop shuffle [src=" + src + + ", msg=" + msg + ']'); + } + catch (IgniteCheckedException e) { + U.error(log, "Message handling failed.", e); } - else - throw new IllegalStateException("Unknown message type received to Hadoop shuffle [src=" + src + - ", msg=" + msg + ']'); return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/58c33805/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java index 3afb55a..0a3a0ae 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java @@ -17,7 +17,9 @@ package org.apache.ignite.internal.processors.hadoop.shuffle; +import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -27,6 +29,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner; import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; @@ -118,9 +121,21 @@ public class HadoopShuffleJob<T> implements AutoCloseable { /** Message size. */ private final int msgSize; + /** Local shuffle states. */ + private volatile HashMap<T, HadoopShuffleLocalState> locShuffleStates = new HashMap<>(); + + /** Remote shuffle states. */ + private volatile HashMap<T, HadoopShuffleRemoteState> rmtShuffleStates = new HashMap<>(); + + /** Mutex for internal synchronization. */ + private final Object mux = new Object(); + /** */ private final long throttle; + /** Embedded mode flag. */ + private final boolean embedded; + /** * @param locReduceAddr Local reducer address. * @param log Logger. @@ -128,15 +143,17 @@ public class HadoopShuffleJob<T> implements AutoCloseable { * @param mem Memory. * @param totalReducerCnt Amount of reducers in the Job. * @param locReducers Reducers will work on current node. + * @param embedded Whether shuffle is running in embedded mode. * @throws IgniteCheckedException If error. */ public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, HadoopJob job, GridUnsafeMemory mem, - int totalReducerCnt, int[] locReducers) throws IgniteCheckedException { + int totalReducerCnt, int[] locReducers, boolean embedded) throws IgniteCheckedException { this.locReduceAddr = locReduceAddr; this.totalReducerCnt = totalReducerCnt; this.job = job; this.mem = mem; this.log = log.getLogger(HadoopShuffleJob.class); + this.embedded = embedded; msgSize = get(job.info(), SHUFFLE_MSG_SIZE, DFLT_SHUFFLE_MSG_SIZE); @@ -238,10 +255,11 @@ public class HadoopShuffleJob<T> implements AutoCloseable { } /** + * @param src Source. * @param msg Message. * @throws IgniteCheckedException Exception. */ - public void onShuffleMessage(HadoopShuffleMessage msg) throws IgniteCheckedException { + public void onShuffleMessage(T src, HadoopShuffleMessage msg) throws IgniteCheckedException { assert msg.buffer() != null; assert msg.offset() > 0; @@ -276,6 +294,15 @@ public class HadoopShuffleJob<T> implements AutoCloseable { } }); } + + if (embedded) { + // No immediate response. + if (localShuffleState(src).onShuffleMessage()) + sendFinishResponse(src, msg.jobId()); + } + else + // Response for every message. + io.apply(src, new HadoopShuffleAck(msg.id(), msg.jobId())); } /** @@ -292,6 +319,121 @@ public class HadoopShuffleJob<T> implements AutoCloseable { } /** + * Process shuffle finish request. + * + * @param src Source. + * @param msg Shuffle finish message. + */ + public void onShuffleFinishRequest(T src, HadoopShuffleFinishRequest msg) { + if (log.isDebugEnabled()) + log.debug("Received shuffle finish request [jobId=" + job.id() + ", src=" + src + ", req=" + msg + ']'); + + HadoopShuffleLocalState state = localShuffleState(src); + + if (state.onShuffleFinishMessage(msg.messageCount())) + sendFinishResponse(src, msg.jobId()); + } + + /** + * Process shuffle finish response. + * + * @param src Source. + */ + public void onShuffleFinishResponse(T src) { + if (log.isDebugEnabled()) + log.debug("Received shuffle finish response [jobId=" + job.id() + ", src=" + src + ']'); + + remoteShuffleState(src).onShuffleFinishResponse(); + } + + /** + * Send finish response. + * + * @param dest Destination. + * @param jobId Job ID. + */ + @SuppressWarnings("unchecked") + private void sendFinishResponse(T dest, HadoopJobId jobId) { + if (log.isDebugEnabled()) + log.debug("Sent shuffle finish response [jobId=" + jobId + ", dest=" + dest + ']'); + + HadoopShuffleFinishResponse msg = new HadoopShuffleFinishResponse(jobId); + + io.apply(dest, msg); + } + + /** + * Get local shuffle state for node. + * + * @param src Source + * @return Local shuffle state. + */ + private HadoopShuffleLocalState localShuffleState(T src) { + HashMap<T, HadoopShuffleLocalState> states = locShuffleStates; + + HadoopShuffleLocalState res = states.get(src); + + if (res == null) { + synchronized (mux) { + res = locShuffleStates.get(src); + + if (res == null) { + res = new HadoopShuffleLocalState(); + + states = new HashMap<>(locShuffleStates); + + states.put(src, res); + + locShuffleStates = states; + } + } + } + + return res; + } + + /** + * Get remote shuffle state for node. + * + * @param src Source. + * @return Remote shuffle state. + */ + private HadoopShuffleRemoteState remoteShuffleState(T src) { + HashMap<T, HadoopShuffleRemoteState> states = rmtShuffleStates; + + HadoopShuffleRemoteState res = states.get(src); + + if (res == null) { + synchronized (mux) { + res = rmtShuffleStates.get(src); + + if (res == null) { + res = new HadoopShuffleRemoteState(); + + states = new HashMap<>(rmtShuffleStates); + + states.put(src, res); + + rmtShuffleStates = states; + } + } + } + + return res; + } + + /** + * Get all remote shuffle states. + * + * @return Remote shuffle states. + */ + private HashMap<T, HadoopShuffleRemoteState> remoteShuffleStates() { + synchronized (mux) { + return new HashMap<>(rmtShuffleStates); + } + } + + /** * Unsafe value. */ private static class UnsafeValue implements HadoopMultimap.Value { @@ -406,38 +548,50 @@ public class HadoopShuffleJob<T> implements AutoCloseable { * @param newBufMinSize Min new buffer size. */ private void send(final int idx, int newBufMinSize) { - final GridFutureAdapter<?> fut = new GridFutureAdapter<>(); - HadoopShuffleMessage msg = msgs[idx]; final long msgId = msg.id(); - IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> old = sentMsgs.putIfAbsent(msgId, - new IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>>(msg, fut)); + final GridFutureAdapter<?> fut; + + if (embedded) + fut = null; + else { + fut = new GridFutureAdapter<>(); + + IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> old = sentMsgs.putIfAbsent(msgId, + new IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>>(msg, fut)); - assert old == null; + assert old == null; + } try { io.apply(reduceAddrs[idx], msg); + + if (embedded) + remoteShuffleState(reduceAddrs[idx]).onShuffleMessage(); } catch (GridClosureException e) { - fut.onDone(U.unwrap(e)); + if (fut != null) + fut.onDone(U.unwrap(e)); } - fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> f) { - try { - f.get(); + if (fut != null) { + fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { + try { + f.get(); - // Clean up the future from map only if there was no exception. - // Otherwise flush() should fail. - sentMsgs.remove(msgId); - } - catch (IgniteCheckedException e) { - log.error("Failed to send message.", e); + // Clean up the future from map only if there was no exception. + // Otherwise flush() should fail. + sentMsgs.remove(msgId); + } + catch (IgniteCheckedException e) { + log.error("Failed to send message.", e); + } } - } - }); + }); + } msgs[idx] = newBufMinSize == 0 ? null : new HadoopShuffleMessage(job.id(), idx, Math.max(msgSize, newBufMinSize)); @@ -513,14 +667,41 @@ public class HadoopShuffleJob<T> implements AutoCloseable { GridCompoundFuture fut = new GridCompoundFuture<>(); - for (IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> tup : sentMsgs.values()) - fut.add(tup.get2()); + if (embedded) { + boolean sent = false; - fut.markInitialized(); + for (Map.Entry<T, HadoopShuffleRemoteState> rmtStateEntry : remoteShuffleStates().entrySet()) { + T dest = rmtStateEntry.getKey(); + HadoopShuffleRemoteState rmtState = rmtStateEntry.getValue(); - if (log.isDebugEnabled()) - log.debug("Collected futures to compound futures for flush: " + sentMsgs.size()); + HadoopShuffleFinishRequest req = new HadoopShuffleFinishRequest(job.id(), rmtState.messageCount()); + + io.apply(dest, req); + if (log.isDebugEnabled()) + log.debug("Sent shuffle finish request [jobId=" + job.id() + ", dest=" + dest + + ", req=" + req + ']'); + + fut.add(rmtState.future()); + + sent = true; + } + + if (sent) + fut.markInitialized(); + else + return new GridFinishedFuture<>(); + } + else { + for (IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> tup : sentMsgs.values()) + fut.add(tup.get2()); + + fut.markInitialized(); + + if (log.isDebugEnabled()) + log.debug("Collected futures to compound futures for flush: " + sentMsgs.size()); + + } return fut; } http://git-wip-us.apache.org/repos/asf/ignite/blob/58c33805/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleLocalState.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleLocalState.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleLocalState.java new file mode 100644 index 0000000..68c0653 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleLocalState.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Local shuffle state. + */ +class HadoopShuffleLocalState { + /** Message counter. */ + private final AtomicLong msgCnt = new AtomicLong(); + + /** Reply guard. */ + private final AtomicBoolean replyGuard = new AtomicBoolean(); + + /** Total message count.*/ + private volatile long totalMsgCnt; + + /** + * Callback invoked when shuffle message arrived. + * + * @return Whether to perform reply. + */ + public boolean onShuffleMessage() { + long msgCnt0 = msgCnt.incrementAndGet(); + + return msgCnt0 == totalMsgCnt && reserve(); + } + + /** + * Callback invoked when shuffle is finished. + * + * @param totalMsgCnt Message count. + * @return Whether to perform reply. + */ + public boolean onShuffleFinishMessage(long totalMsgCnt) { + this.totalMsgCnt = totalMsgCnt; + + return msgCnt.get() == totalMsgCnt && reserve(); + } + + /** + * Reserve reply. + * + * @return {@code True} if reserved. + */ + private boolean reserve() { + return replyGuard.compareAndSet(false, true); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/58c33805/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java new file mode 100644 index 0000000..5ffaa55 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle; + +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.lang.IgniteInClosure; + +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Remote shuffle state. + */ +class HadoopShuffleRemoteState<T> { + /** Message count. */ + private final AtomicLong msgCnt = new AtomicLong(); + + /** Completion future. */ + private final GridFutureAdapter fut = new GridFutureAdapter(); + + /** + * Callback invoked when shuffle message is sent. + */ + public void onShuffleMessage() { + msgCnt.incrementAndGet(); + } + + /** + * Callback invoked on shuffle finish response. + */ + public void onShuffleFinishResponse() { + fut.onDone(); + } + + /** + * @return Message count. + */ + public long messageCount() { + return msgCnt.get(); + } + + /** + * @return Completion future. + */ + public GridFutureAdapter future() { + return fut; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/58c33805/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java index 7001b8c..cb08c00 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java @@ -151,7 +151,7 @@ public class HadoopChildProcessRunner { job.initialize(true, nodeDesc.processId()); shuffleJob = new HadoopShuffleJob<>(comm.localProcessDescriptor(), log, job, mem, - req.totalReducerCount(), req.localReducers()); + req.totalReducerCount(), req.localReducers(), false); initializeExecutors(req); @@ -432,9 +432,7 @@ public class HadoopChildProcessRunner { try { HadoopShuffleMessage m = (HadoopShuffleMessage)msg; - shuffleJob.onShuffleMessage(m); - - comm.sendMessage(desc, new HadoopShuffleAck(m.id(), m.jobId())); + shuffleJob.onShuffleMessage(desc, m); } catch (IgniteCheckedException e) { U.error(log, "Failed to process hadoop shuffle message [desc=" + desc + ", msg=" + msg + ']', e);
