http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java
deleted file mode 100644
index 3eb819b..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.proto;
-
-import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.compute.ComputeJobContext;
-import org.apache.ignite.internal.processors.hadoop.Hadoop;
-import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
-
-import static 
org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_CANCELLING;
-
-/**
- * Protocol task that submits a Hadoop job and returns its current status.
- */
-public class HadoopProtocolSubmitJobTask extends 
HadoopProtocolTaskAdapter<HadoopJobStatus> {
-    /** Serialization version UID. */
-    private static final long serialVersionUID = 0L;
-
-    /** {@inheritDoc} Expects args: node ID as string (0), job number (1), job info (2). */
-    @Override public HadoopJobStatus run(ComputeJobContext jobCtx, Hadoop 
hadoop,
-        HadoopProtocolTaskArguments args) throws IgniteCheckedException {
-        UUID nodeId = UUID.fromString(args.<String>get(0));
-        Integer id = args.get(1);
-        HadoopDefaultJobInfo info = args.get(2);
-
-        assert nodeId != null;
-        assert id != null;
-        assert info != null;
-
-        HadoopJobId jobId = new HadoopJobId(nodeId, id);
-
-        hadoop.submit(jobId, info);
-
-        HadoopJobStatus res = hadoop.status(jobId);
-
-        if (res == null) // Submission failed: no status is available, so report a PHASE_CANCELLING status instead.
-            res = new HadoopJobStatus(jobId, info.jobName(), info.user(), 0, 
0, 0, 0, PHASE_CANCELLING, true, 1);
-
-        return res;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java
deleted file mode 100644
index c3227ae..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.proto;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.ComputeJobContext;
-import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.compute.ComputeJobResultPolicy;
-import org.apache.ignite.compute.ComputeTask;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.hadoop.Hadoop;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.resources.JobContextResource;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Hadoop protocol task adapter: maps a single job to the first subgrid node and returns that job's result.
- */
-public abstract class HadoopProtocolTaskAdapter<R> implements 
ComputeTask<HadoopProtocolTaskArguments, R> {
-    /** {@inheritDoc} Creates exactly one {@code Job} wrapping the argument and assigns it to the first subgrid node. */
-    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> 
map(List<ClusterNode> subgrid,
-        @Nullable HadoopProtocolTaskArguments arg) {
-        return Collections.singletonMap(new Job(arg), subgrid.get(0));
-    }
-
-    /** {@inheritDoc} Always answers {@link ComputeJobResultPolicy#REDUCE}. */
-    @Override public ComputeJobResultPolicy result(ComputeJobResult res, 
List<ComputeJobResult> rcvd) {
-        return ComputeJobResultPolicy.REDUCE;
-    }
-
-    /** {@inheritDoc} Returns the data of the first job result, or {@code null} when {@code F.isEmpty(results)}. */
-    @Nullable @Override public R reduce(List<ComputeJobResult> results) {
-        if (!F.isEmpty(results)) {
-            ComputeJobResult res = results.get(0);
-
-            return res.getData();
-        }
-        else
-            return null;
-    }
-
-    /**
-     * Job wrapper that delegates execution to the enclosing task's {@code run(...)} method.
-     */
-    private class Job implements ComputeJob {
-        /** Serialization version UID. */
-        private static final long serialVersionUID = 0L;
-
-        /** Ignite instance, annotated for resource injection. */
-        @IgniteInstanceResource
-        private Ignite ignite;
-
-        /** Job context, annotated for resource injection. */
-        @SuppressWarnings("UnusedDeclaration")
-        @JobContextResource
-        private ComputeJobContext jobCtx;
-
-        /** Argument. */
-        private final HadoopProtocolTaskArguments args;
-
-        /**
-         * Constructor.
-         *
-         * @param args Job argument.
-         */
-        private Job(HadoopProtocolTaskArguments args) {
-            this.args = args;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void cancel() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} Runs the enclosing task; checked exceptions are converted via {@code U.convertException}. */
-        @Nullable @Override public Object execute() {
-            try {
-                return run(jobCtx, ((IgniteEx)ignite).hadoop(), args);
-            }
-            catch (IgniteCheckedException e) {
-                throw U.convertException(e);
-            }
-        }
-    }
-
-    /**
-     * Run the task. Invoked from the wrapped job's {@code execute()} method.
-     *
-     * @param jobCtx Job context.
-     * @param hadoop Hadoop facade.
-     * @param args Arguments.
-     * @return Job result.
-     * @throws IgniteCheckedException If failed.
-     */
-    public abstract R run(ComputeJobContext jobCtx, Hadoop hadoop, 
HadoopProtocolTaskArguments args)
-        throws IgniteCheckedException;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java
deleted file mode 100644
index e497454..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.proto;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Task arguments: a positional array of objects with {@link Externalizable} support.
- */
-public class HadoopProtocolTaskArguments implements Externalizable {
-    /** Serialization version UID. */
-    private static final long serialVersionUID = 0L;
-
-    /** Arguments. */
-    private Object[] args;
-
-    /**
-     * {@link Externalizable} support.
-     */
-    public HadoopProtocolTaskArguments() {
-        // No-op.
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param args Arguments.
-     */
-    public HadoopProtocolTaskArguments(Object... args) {
-        this.args = args;
-    }
-
-    /**
-     * @param idx Argument index.
-     * @return Argument cast to the requested type, or {@code null} if no arguments are set or
-     *      {@code idx} is not below the number of arguments.
-     */
-    @SuppressWarnings("unchecked")
-    @Nullable public <T> T get(int idx) {
-        return (args != null && args.length > idx) ? (T)args[idx] : null;
-    }
-
-    /**
-     * @return Number of arguments, {@code 0} if the argument array is not set.
-     */
-    public int size() {
-        return args != null ? args.length : 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        U.writeArray(out, args);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-        args = U.readArray(in);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HadoopProtocolTaskArguments.class, this);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
deleted file mode 100644
index 769bdc4..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle;
-
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.GridTopic;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.hadoop.HadoopComponent;
-import org.apache.ignite.internal.processors.hadoop.HadoopContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiPredicate;
-
-/**
- * Shuffle component: tracks shuffle jobs by job ID and exchanges shuffle messages and acks
- * with other nodes over {@code GridTopic.TOPIC_HADOOP}.
- */
-public class HadoopShuffle extends HadoopComponent {
-    /** Shuffle jobs by job ID. */
-    private final ConcurrentMap<HadoopJobId, HadoopShuffleJob<UUID>> jobs = 
new ConcurrentHashMap<>();
-
-    /** Unsafe memory handed to every shuffle job created by this component. */
-    protected final GridUnsafeMemory mem = new GridUnsafeMemory(0);
-
-    /** {@inheritDoc} Also registers a user message listener on {@code GridTopic.TOPIC_HADOOP}. */
-    @Override public void start(HadoopContext ctx) throws 
IgniteCheckedException {
-        super.start(ctx);
-
-        ctx.kernalContext().io().addUserMessageListener(GridTopic.TOPIC_HADOOP,
-            new IgniteBiPredicate<UUID, Object>() {
-                @Override public boolean apply(UUID nodeId, Object msg) {
-                    return onMessageReceived(nodeId, (HadoopMessage)msg);
-                }
-            });
-    }
-
-    /**
-     * Stops shuffle: closes every tracked job (close failures are logged) and clears the job map.
-     *
-     * @param cancel If should cancel all ongoing activities (not read by this implementation).
-     */
-    @Override public void stop(boolean cancel) {
-        for (HadoopShuffleJob job : jobs.values()) {
-            try {
-                job.close();
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to close job.", e);
-            }
-        }
-
-        jobs.clear();
-    }
-
-    /**
-     * Creates new shuffle job and initializes its reducer addresses from the job's map-reduce plan.
-     *
-     * @param jobId Job ID.
-     * @return Created shuffle job.
-     * @throws IgniteCheckedException If job creation failed.
-     */
-    private HadoopShuffleJob<UUID> newJob(HadoopJobId jobId) throws 
IgniteCheckedException {
-        HadoopMapReducePlan plan = ctx.jobTracker().plan(jobId);
-
-        HadoopShuffleJob<UUID> job = new HadoopShuffleJob<>(ctx.localNodeId(), 
log,
-            ctx.jobTracker().job(jobId, null), mem, plan.reducers(), 
plan.reducers(ctx.localNodeId()));
-
-        UUID[] rdcAddrs = new UUID[plan.reducers()];
-
-        for (int i = 0; i < rdcAddrs.length; i++) {
-            UUID nodeId = plan.nodeForReducer(i);
-
-            assert nodeId != null : "Plan is missing node for reducer [plan=" 
+ plan + ", rdc=" + i + ']';
-
-            rdcAddrs[i] = nodeId;
-        }
-
-        boolean init = job.initializeReduceAddresses(rdcAddrs);
-
-        assert init;
-
-        return job;
-    }
-
-    /**
-     * Sends a message to the given node over {@code GridTopic.TOPIC_HADOOP}.
-     *
-     * @param nodeId Node ID to send message to.
-     * @param msg Message to send.
-     * @throws IgniteCheckedException If send failed.
-     */
-    private void send0(UUID nodeId, Object msg) throws IgniteCheckedException {
-        ClusterNode node = ctx.kernalContext().discovery().node(nodeId);
-
-        ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, 
GridTopic.TOPIC_HADOOP, false, 0);
-    }
-
-    /**
-     * Gets the shuffle job for the given job ID, creating and registering it if absent.
-     * If another thread registered a job first, the newly created one is closed and the registered one is used.
-     *
-     * @param jobId Job ID.
-     * @return Shuffle job.
-     * @throws IgniteCheckedException If job creation or closing failed.
-     */
-    private HadoopShuffleJob<UUID> job(HadoopJobId jobId) throws 
IgniteCheckedException {
-        HadoopShuffleJob<UUID> res = jobs.get(jobId);
-
-        if (res == null) {
-            res = newJob(jobId);
-
-            HadoopShuffleJob<UUID> old = jobs.putIfAbsent(jobId, res);
-
-            if (old != null) {
-                res.close();
-
-                res = old;
-            }
-            else if (res.reducersInitialized())
-                startSending(res);
-        }
-
-        return res;
-    }
-
-    /**
-     * Starts message sending thread.
-     *
-     * @param shuffleJob Job to start sending for.
-     */
-    private void startSending(HadoopShuffleJob<UUID> shuffleJob) {
-        shuffleJob.startSending(ctx.kernalContext().gridName(),
-            new IgniteInClosure2X<UUID, HadoopShuffleMessage>() {
-                @Override public void applyx(UUID dest, HadoopShuffleMessage 
msg) throws IgniteCheckedException {
-                    send0(dest, msg);
-                }
-            }
-        );
-    }
-
-    /**
-     * Message received callback. A shuffle message is passed to its job and then acknowledged to the
-     * sender (the ack is sent even if handling failed); an ack is passed to its job. Failures are logged.
-     *
-     * @param src Sender node ID.
-     * @param msg Received message.
-     * @return {@code True}.
-     * @throws IllegalStateException If the message is neither a shuffle message nor a shuffle ack.
-     */
-    public boolean onMessageReceived(UUID src, HadoopMessage msg) {
-        if (msg instanceof HadoopShuffleMessage) {
-            HadoopShuffleMessage m = (HadoopShuffleMessage)msg;
-
-            try {
-                job(m.jobId()).onShuffleMessage(m);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Message handling failed.", e);
-            }
-
-            try {
-                // Reply with ack.
-                send0(src, new HadoopShuffleAck(m.id(), m.jobId()));
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to reply back to shuffle message sender 
[snd=" + src + ", msg=" + msg + ']', e);
-            }
-        }
-        else if (msg instanceof HadoopShuffleAck) {
-            HadoopShuffleAck m = (HadoopShuffleAck)msg;
-
-            try {
-                job(m.jobId()).onShuffleAck(m);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Message handling failed.", e);
-            }
-        }
-        else
-            throw new IllegalStateException("Unknown message type received to 
Hadoop shuffle [src=" + src +
-                ", msg=" + msg + ']');
-
-        return true;
-    }
-
-    /**
-     * @param taskCtx Task context.
-     * @return Output.
-     * @throws IgniteCheckedException If the shuffle job could not be obtained or the output could not be created.
-     */
-    public HadoopTaskOutput output(HadoopTaskContext taskCtx) throws 
IgniteCheckedException {
-        return job(taskCtx.taskInfo().jobId()).output(taskCtx);
-    }
-
-    /**
-     * @param taskCtx Task context.
-     * @return Input.
-     * @throws IgniteCheckedException If the shuffle job could not be obtained or the input could not be created.
-     */
-    public HadoopTaskInput input(HadoopTaskContext taskCtx) throws 
IgniteCheckedException {
-        return job(taskCtx.taskInfo().jobId()).input(taskCtx);
-    }
-
-    /**
-     * Removes the shuffle job registered for the given job ID, if any, and closes it (close failure is logged).
-     *
-     * @param jobId Job id.
-     */
-    public void jobFinished(HadoopJobId jobId) {
-        HadoopShuffleJob job = jobs.remove(jobId);
-
-        if (job != null) {
-            try {
-                job.close();
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to close job: " + jobId, e);
-            }
-        }
-    }
-
-    /**
-     * Flushes all the outputs for the given job to remote nodes.
-     *
-     * @param jobId Job ID.
-     * @return Flush future; an already finished future if no shuffle job is registered for the ID
-     *      or if starting the flush threw an {@link IgniteCheckedException} (the future then carries it).
-     */
-    public IgniteInternalFuture<?> flush(HadoopJobId jobId) {
-        HadoopShuffleJob job = jobs.get(jobId);
-
-        if (job == null)
-            return new GridFinishedFuture<>();
-
-        try {
-            return job.flush();
-        }
-        catch (IgniteCheckedException e) {
-            return new GridFinishedFuture<>(e);
-        }
-    }
-
-    /**
-     * @return Memory.
-     */
-    public GridUnsafeMemory memory() {
-        return mem;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java
deleted file mode 100644
index 6013ec6..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * Acknowledgement message: carries the ID of the acknowledged message and the job ID it belongs to.
- */
-public class HadoopShuffleAck implements HadoopMessage {
-    /** Serialization version UID. */
-    private static final long serialVersionUID = 0L;
-
-    /** ID of the acknowledged message. */
-    @GridToStringInclude
-    private long msgId;
-
-    /** Job ID. */
-    @GridToStringInclude
-    private HadoopJobId jobId;
-
-    /**
-     * No-arg constructor; fields are populated by {@link #readExternal(ObjectInput)}.
-     */
-    public HadoopShuffleAck() {
-        // No-op.
-    }
-
-    /**
-     * @param msgId Message ID.
-     * @param jobId Job ID (asserted to be non-null).
-     */
-    public HadoopShuffleAck(long msgId, HadoopJobId jobId) {
-        assert jobId != null;
-
-        this.msgId = msgId;
-        this.jobId = jobId;
-    }
-
-    /**
-     * @return Message ID.
-     */
-    public long id() {
-        return msgId;
-    }
-
-    /**
-     * @return Job ID.
-     */
-    public HadoopJobId jobId() {
-        return jobId;
-    }
-
-    /** {@inheritDoc} Writes the job ID first, then the message ID. */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        jobId.writeExternal(out);
-        out.writeLong(msgId);
-    }
-
-    /** {@inheritDoc} Reads the job ID first, then the message ID (same order as written). */
-    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-        jobId = new HadoopJobId();
-
-        jobId.readExternal(in);
-        msgId = in.readLong();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HadoopShuffleAck.class, this);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
deleted file mode 100644
index b940c72..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ /dev/null
@@ -1,612 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReferenceArray;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
-import 
org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
-import 
org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimap;
-import 
org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimap;
-import 
org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipList;
-import org.apache.ignite.internal.util.GridUnsafe;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.io.GridUnsafeDataInput;
-import org.apache.ignite.internal.util.lang.GridClosureException;
-import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.thread.IgniteThread;
-
-import static 
org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.PARTITION_HASHMAP_SIZE;
-import static 
org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING;
-import static 
org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
-
-/**
- * Shuffle job.
- */
-public class HadoopShuffleJob<T> implements AutoCloseable {
-    /** */
-    private static final int MSG_BUF_SIZE = 128 * 1024;
-
-    /** */
-    private final HadoopJob job;
-
-    /** */
-    private final GridUnsafeMemory mem;
-
-    /** */
-    private final boolean needPartitioner;
-
-    /** Collection of task contexts for each reduce task. */
-    private final Map<Integer, HadoopTaskContext> reducersCtx = new 
HashMap<>();
-
-    /** Reducers addresses. */
-    private T[] reduceAddrs;
-
-    /** Local reducers address. */
-    private final T locReduceAddr;
-
-    /** */
-    private final HadoopShuffleMessage[] msgs;
-
-    /** */
-    private final AtomicReferenceArray<HadoopMultimap> maps;
-
-    /** */
-    private volatile IgniteInClosure2X<T, HadoopShuffleMessage> io;
-
-    /** */
-    protected ConcurrentMap<Long, IgniteBiTuple<HadoopShuffleMessage, 
GridFutureAdapter<?>>> sentMsgs =
-        new ConcurrentHashMap<>();
-
-    /** */
-    private volatile GridWorker snd;
-
-    /** Latch for remote addresses waiting. */
-    private final CountDownLatch ioInitLatch = new CountDownLatch(1);
-
-    /** Finished flag. Set on flush or close. */
-    private volatile boolean flushed;
-
-    /** */
-    private final IgniteLogger log;
-
-    /**
-     * @param locReduceAddr Local reducer address.
-     * @param log Logger.
-     * @param job Job.
-     * @param mem Memory.
-     * @param totalReducerCnt Amount of reducers in the Job.
-     * @param locReducers Reducers will work on current node.
-     * @throws IgniteCheckedException If error.
-     */
-    public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, HadoopJob job, 
GridUnsafeMemory mem,
-        int totalReducerCnt, int[] locReducers) throws IgniteCheckedException {
-        this.locReduceAddr = locReduceAddr;
-        this.job = job;
-        this.mem = mem;
-        this.log = log.getLogger(HadoopShuffleJob.class);
-
-        if (!F.isEmpty(locReducers)) {
-            for (int rdc : locReducers) {
-                HadoopTaskInfo taskInfo = new 
HadoopTaskInfo(HadoopTaskType.REDUCE, job.id(), rdc, 0, null);
-
-                reducersCtx.put(rdc, job.getTaskContext(taskInfo));
-            }
-        }
-
-        needPartitioner = totalReducerCnt > 1;
-
-        maps = new AtomicReferenceArray<>(totalReducerCnt);
-        msgs = new HadoopShuffleMessage[totalReducerCnt];
-    }
-
-    /**
-     * @param reduceAddrs Addresses of reducers.
-     * @return {@code True} if addresses were initialized by this call.
-     */
-    public boolean initializeReduceAddresses(T[] reduceAddrs) {
-        if (this.reduceAddrs == null) {
-            this.reduceAddrs = reduceAddrs;
-
-            return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * @return {@code True} if reducers addresses were initialized.
-     */
-    public boolean reducersInitialized() {
-        return reduceAddrs != null;
-    }
-
-    /**
-     * @param gridName Grid name.
-     * @param io IO Closure for sending messages.
-     */
-    @SuppressWarnings("BusyWait")
-    public void startSending(String gridName, IgniteInClosure2X<T, 
HadoopShuffleMessage> io) {
-        assert snd == null;
-        assert io != null;
-
-        this.io = io;
-
-        if (!flushed) {
-            snd = new GridWorker(gridName, "hadoop-shuffle-" + job.id(), log) {
-                @Override protected void body() throws InterruptedException {
-                    try {
-                        while (!isCancelled()) {
-                            Thread.sleep(5);
-
-                            collectUpdatesAndSend(false);
-                        }
-                    }
-                    catch (IgniteCheckedException e) {
-                        throw new IllegalStateException(e);
-                    }
-                }
-            };
-
-            new IgniteThread(snd).start();
-        }
-
-        ioInitLatch.countDown();
-    }
-
-    /**
-     * @param maps Maps.
-     * @param idx Index.
-     * @return Map.
-     */
-    private HadoopMultimap getOrCreateMap(AtomicReferenceArray<HadoopMultimap> 
maps, int idx) {
-        HadoopMultimap map = maps.get(idx);
-
-        if (map == null) { // Create new map.
-            map = get(job.info(), SHUFFLE_REDUCER_NO_SORTING, false) ?
-                new HadoopConcurrentHashMultimap(job.info(), mem, 
get(job.info(), PARTITION_HASHMAP_SIZE, 8 * 1024)):
-                new HadoopSkipList(job.info(), mem);
-
-            if (!maps.compareAndSet(idx, null, map)) {
-                map.close();
-
-                return maps.get(idx);
-            }
-        }
-
-        return map;
-    }
-
-    /**
-     * @param msg Message.
-     * @throws IgniteCheckedException Exception.
-     */
-    public void onShuffleMessage(HadoopShuffleMessage msg) throws 
IgniteCheckedException {
-        assert msg.buffer() != null;
-        assert msg.offset() > 0;
-
-        HadoopTaskContext taskCtx = reducersCtx.get(msg.reducer());
-
-        HadoopPerformanceCounter perfCntr = 
HadoopPerformanceCounter.getCounter(taskCtx.counters(), null);
-
-        perfCntr.onShuffleMessage(msg.reducer(), U.currentTimeMillis());
-
-        HadoopMultimap map = getOrCreateMap(maps, msg.reducer());
-
-        // Add data from message to the map.
-        try (HadoopMultimap.Adder adder = map.startAdding(taskCtx)) {
-            final GridUnsafeDataInput dataInput = new GridUnsafeDataInput();
-            final UnsafeValue val = new UnsafeValue(msg.buffer());
-
-            msg.visit(new HadoopShuffleMessage.Visitor() {
-                /** */
-                private HadoopMultimap.Key key;
-
-                @Override public void onKey(byte[] buf, int off, int len) 
throws IgniteCheckedException {
-                    dataInput.bytes(buf, off, off + len);
-
-                    key = adder.addKey(dataInput, key);
-                }
-
-                @Override public void onValue(byte[] buf, int off, int len) {
-                    val.off = off;
-                    val.size = len;
-
-                    key.add(val);
-                }
-            });
-        }
-    }
-
-    /**
-     * @param ack Shuffle ack.
-     */
-    @SuppressWarnings("ConstantConditions")
-    public void onShuffleAck(HadoopShuffleAck ack) {
-        IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> tup = 
sentMsgs.get(ack.id());
-
-        if (tup != null)
-            tup.get2().onDone();
-        else
-            log.warning("Received shuffle ack for not registered shuffle id: " 
+ ack);
-    }
-
-    /**
-     * Unsafe value.
-     */
-    private static class UnsafeValue implements HadoopMultimap.Value {
-        /** */
-        private final byte[] buf;
-
-        /** */
-        private int off;
-
-        /** */
-        private int size;
-
-        /**
-         * @param buf Buffer.
-         */
-        private UnsafeValue(byte[] buf) {
-            assert buf != null;
-
-            this.buf = buf;
-        }
-
-        /** */
-        @Override public int size() {
-            return size;
-        }
-
-        /** */
-        @Override public void copyTo(long ptr) {
-            GridUnsafe.copyMemory(buf, GridUnsafe.BYTE_ARR_OFF + off, null, 
ptr, size);
-        }
-    }
-
-    /**
-     * Sends map updates to remote reducers.
-     */
-    private void collectUpdatesAndSend(boolean flush) throws 
IgniteCheckedException {
-        for (int i = 0; i < maps.length(); i++) {
-            HadoopMultimap map = maps.get(i);
-
-            if (map == null || locReduceAddr.equals(reduceAddrs[i]))
-                continue; // Skip empty map and local node.
-
-            if (msgs[i] == null)
-                msgs[i] = new HadoopShuffleMessage(job.id(), i, MSG_BUF_SIZE);
-
-            final int idx = i;
-
-            map.visit(false, new HadoopMultimap.Visitor() {
-                /** */
-                private long keyPtr;
-
-                /** */
-                private int keySize;
-
-                /** */
-                private boolean keyAdded;
-
-                /** {@inheritDoc} */
-                @Override public void onKey(long keyPtr, int keySize) {
-                    this.keyPtr = keyPtr;
-                    this.keySize = keySize;
-
-                    keyAdded = false;
-                }
-
-                private boolean tryAdd(long valPtr, int valSize) {
-                    HadoopShuffleMessage msg = msgs[idx];
-
-                    if (!keyAdded) { // Add key and value.
-                        int size = keySize + valSize;
-
-                        if (!msg.available(size, false))
-                            return false;
-
-                        msg.addKey(keyPtr, keySize);
-                        msg.addValue(valPtr, valSize);
-
-                        keyAdded = true;
-
-                        return true;
-                    }
-
-                    if (!msg.available(valSize, true))
-                        return false;
-
-                    msg.addValue(valPtr, valSize);
-
-                    return true;
-                }
-
-                /** {@inheritDoc} */
-                @Override public void onValue(long valPtr, int valSize) {
-                    if (tryAdd(valPtr, valSize))
-                        return;
-
-                    send(idx, keySize + valSize);
-
-                    keyAdded = false;
-
-                    if (!tryAdd(valPtr, valSize))
-                        throw new IllegalStateException();
-                }
-            });
-
-            if (flush && msgs[i].offset() != 0)
-                send(i, 0);
-        }
-    }
-
-    /**
-     * @param idx Index of message.
-     * @param newBufMinSize Min new buffer size.
-     */
-    private void send(final int idx, int newBufMinSize) {
-        final GridFutureAdapter<?> fut = new GridFutureAdapter<>();
-
-        HadoopShuffleMessage msg = msgs[idx];
-
-        final long msgId = msg.id();
-
-        IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> old = 
sentMsgs.putIfAbsent(msgId,
-            new IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>>(msg, 
fut));
-
-        assert old == null;
-
-        try {
-            io.apply(reduceAddrs[idx], msg);
-        }
-        catch (GridClosureException e) {
-            fut.onDone(U.unwrap(e));
-        }
-
-        fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
-            @Override public void apply(IgniteInternalFuture<?> f) {
-                try {
-                    f.get();
-
-                    // Clean up the future from map only if there was no 
exception.
-                    // Otherwise flush() should fail.
-                    sentMsgs.remove(msgId);
-                }
-                catch (IgniteCheckedException e) {
-                    log.error("Failed to send message.", e);
-                }
-            }
-        });
-
-        msgs[idx] = newBufMinSize == 0 ? null : new 
HadoopShuffleMessage(job.id(), idx,
-            Math.max(MSG_BUF_SIZE, newBufMinSize));
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() throws IgniteCheckedException {
-        if (snd != null) {
-            snd.cancel();
-
-            try {
-                snd.join();
-            }
-            catch (InterruptedException e) {
-                throw new IgniteInterruptedCheckedException(e);
-            }
-        }
-
-        close(maps);
-    }
-
-    /**
-     * @param maps Maps.
-     */
-    private void close(AtomicReferenceArray<HadoopMultimap> maps) {
-        for (int i = 0; i < maps.length(); i++) {
-            HadoopMultimap map = maps.get(i);
-
-            if (map != null)
-                map.close();
-        }
-    }
-
-    /**
-     * @return Future.
-     */
-    @SuppressWarnings("unchecked")
-    public IgniteInternalFuture<?> flush() throws IgniteCheckedException {
-        if (log.isDebugEnabled())
-            log.debug("Flushing job " + job.id() + " on address " + 
locReduceAddr);
-
-        flushed = true;
-
-        if (maps.length() == 0)
-            return new GridFinishedFuture<>();
-
-        U.await(ioInitLatch);
-
-        GridWorker snd0 = snd;
-
-        if (snd0 != null) {
-            if (log.isDebugEnabled())
-                log.debug("Cancelling sender thread.");
-
-            snd0.cancel();
-
-            try {
-                snd0.join();
-
-                if (log.isDebugEnabled())
-                    log.debug("Finished waiting for sending thread to complete 
on shuffle job flush: " + job.id());
-            }
-            catch (InterruptedException e) {
-                throw new IgniteInterruptedCheckedException(e);
-            }
-        }
-
-        collectUpdatesAndSend(true); // With flush.
-
-        if (log.isDebugEnabled())
-            log.debug("Finished sending collected updates to remote reducers: 
" + job.id());
-
-        GridCompoundFuture fut = new GridCompoundFuture<>();
-
-        for (IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> tup : 
sentMsgs.values())
-            fut.add(tup.get2());
-
-        fut.markInitialized();
-
-        if (log.isDebugEnabled())
-            log.debug("Collected futures to compound futures for flush: " + 
sentMsgs.size());
-
-        return fut;
-    }
-
-    /**
-     * @param taskCtx Task context.
-     * @return Output.
-     * @throws IgniteCheckedException If failed.
-     */
-    public HadoopTaskOutput output(HadoopTaskContext taskCtx) throws 
IgniteCheckedException {
-        switch (taskCtx.taskInfo().type()) {
-            case MAP:
-                assert !job.info().hasCombiner() : "The output creation is 
allowed if combiner has not been defined.";
-
-            case COMBINE:
-                return new PartitionedOutput(taskCtx);
-
-            default:
-                throw new IllegalStateException("Illegal type: " + 
taskCtx.taskInfo().type());
-        }
-    }
-
-    /**
-     * @param taskCtx Task context.
-     * @return Input.
-     * @throws IgniteCheckedException If failed.
-     */
-    @SuppressWarnings("unchecked")
-    public HadoopTaskInput input(HadoopTaskContext taskCtx) throws 
IgniteCheckedException {
-        switch (taskCtx.taskInfo().type()) {
-            case REDUCE:
-                int reducer = taskCtx.taskInfo().taskNumber();
-
-                HadoopMultimap m = maps.get(reducer);
-
-                if (m != null)
-                    return m.input(taskCtx);
-
-                return new HadoopTaskInput() { // Empty input.
-                    @Override public boolean next() {
-                        return false;
-                    }
-
-                    @Override public Object key() {
-                        throw new IllegalStateException();
-                    }
-
-                    @Override public Iterator<?> values() {
-                        throw new IllegalStateException();
-                    }
-
-                    @Override public void close() {
-                        // No-op.
-                    }
-                };
-
-            default:
-                throw new IllegalStateException("Illegal type: " + 
taskCtx.taskInfo().type());
-        }
-    }
-
-    /**
-     * Partitioned output.
-     */
-    private class PartitionedOutput implements HadoopTaskOutput {
-        /** */
-        private final HadoopTaskOutput[] adders = new 
HadoopTaskOutput[maps.length()];
-
-        /** */
-        private HadoopPartitioner partitioner;
-
-        /** */
-        private final HadoopTaskContext taskCtx;
-
-        /**
-         * Constructor.
-         * @param taskCtx Task context.
-         */
-        private PartitionedOutput(HadoopTaskContext taskCtx) throws 
IgniteCheckedException {
-            this.taskCtx = taskCtx;
-
-            if (needPartitioner)
-                partitioner = taskCtx.partitioner();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void write(Object key, Object val) throws 
IgniteCheckedException {
-            int part = 0;
-
-            if (partitioner != null) {
-                part = partitioner.partition(key, val, adders.length);
-
-                if (part < 0 || part >= adders.length)
-                    throw new IgniteCheckedException("Invalid partition: " + 
part);
-            }
-
-            HadoopTaskOutput out = adders[part];
-
-            if (out == null)
-                adders[part] = out = getOrCreateMap(maps, 
part).startAdding(taskCtx);
-
-            out.write(key, val);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() throws IgniteCheckedException {
-            for (HadoopTaskOutput adder : adders) {
-                if (adder != null)
-                    adder.close();
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java
deleted file mode 100644
index 69dfe64..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.util.GridUnsafe;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Shuffle message.
- */
-public class HadoopShuffleMessage implements HadoopMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private static final AtomicLong ids = new AtomicLong();
-
-    /** */
-    private static final byte MARKER_KEY = (byte)17;
-
-    /** */
-    private static final byte MARKER_VALUE = (byte)31;
-
-    /** */
-    @GridToStringInclude
-    private long msgId;
-
-    /** */
-    @GridToStringInclude
-    private HadoopJobId jobId;
-
-    /** */
-    @GridToStringInclude
-    private int reducer;
-
-    /** */
-    private byte[] buf;
-
-    /** */
-    @GridToStringInclude
-    private int off;
-
-    /**
-     *
-     */
-    public HadoopShuffleMessage() {
-        // No-op.
-    }
-
-    /**
-     * @param size Size.
-     */
-    public HadoopShuffleMessage(HadoopJobId jobId, int reducer, int size) {
-        assert jobId != null;
-
-        buf = new byte[size];
-
-        this.jobId = jobId;
-        this.reducer = reducer;
-
-        msgId = ids.incrementAndGet();
-    }
-
-    /**
-     * @return Message ID.
-     */
-    public long id() {
-        return msgId;
-    }
-
-    /**
-     * @return Job ID.
-     */
-    public HadoopJobId jobId() {
-        return jobId;
-    }
-
-    /**
-     * @return Reducer.
-     */
-    public int reducer() {
-        return reducer;
-    }
-
-    /**
-     * @return Buffer.
-     */
-    public byte[] buffer() {
-        return buf;
-    }
-
-    /**
-     * @return Offset.
-     */
-    public int offset() {
-        return off;
-    }
-
-    /**
-     * @param size Size.
-     * @param valOnly Only value wll be added.
-     * @return {@code true} If this message can fit additional data of this 
size
-     */
-    public boolean available(int size, boolean valOnly) {
-        size += valOnly ? 5 : 10;
-
-        if (off + size > buf.length) {
-            if (off == 0) { // Resize if requested size is too big.
-                buf = new byte[size];
-
-                return true;
-            }
-
-            return false;
-        }
-
-        return true;
-    }
-
-    /**
-     * @param keyPtr Key pointer.
-     * @param keySize Key size.
-     */
-    public void addKey(long keyPtr, int keySize) {
-        add(MARKER_KEY, keyPtr, keySize);
-    }
-
-    /**
-     * @param valPtr Value pointer.
-     * @param valSize Value size.
-     */
-    public void addValue(long valPtr, int valSize) {
-        add(MARKER_VALUE, valPtr, valSize);
-    }
-
-    /**
-     * @param marker Marker.
-     * @param ptr Pointer.
-     * @param size Size.
-     */
-    private void add(byte marker, long ptr, int size) {
-        buf[off++] = marker;
-
-        GridUnsafe.putInt(buf, GridUnsafe.BYTE_ARR_OFF + off, size);
-
-        off += 4;
-
-        GridUnsafe.copyMemory(null, ptr, buf, GridUnsafe.BYTE_ARR_OFF + off, 
size);
-
-        off += size;
-    }
-
-    /**
-     * @param v Visitor.
-     */
-    public void visit(Visitor v) throws IgniteCheckedException {
-        for (int i = 0; i < off;) {
-            byte marker = buf[i++];
-
-            int size = GridUnsafe.getInt(buf, GridUnsafe.BYTE_ARR_OFF + i);
-
-            i += 4;
-
-            if (marker == MARKER_VALUE)
-                v.onValue(buf, i, size);
-            else if (marker == MARKER_KEY)
-                v.onKey(buf, i, size);
-            else
-                throw new IllegalStateException();
-
-            i += size;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        jobId.writeExternal(out);
-        out.writeLong(msgId);
-        out.writeInt(reducer);
-        out.writeInt(off);
-        U.writeByteArray(out, buf);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-        jobId = new HadoopJobId();
-
-        jobId.readExternal(in);
-        msgId = in.readLong();
-        reducer = in.readInt();
-        off = in.readInt();
-        buf = U.readByteArray(in);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HadoopShuffleMessage.class, this);
-    }
-
-    /**
-     * Visitor.
-     */
-    public static interface Visitor {
-        /**
-         * @param buf Buffer.
-         * @param off Offset.
-         * @param len Length.
-         */
-        public void onKey(byte[] buf, int off, int len) throws 
IgniteCheckedException;
-
-        /**
-         * @param buf Buffer.
-         * @param off Offset.
-         * @param len Length.
-         */
-        public void onValue(byte[] buf, int off, int len) throws 
IgniteCheckedException;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java
deleted file mode 100644
index ffa7871..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java
+++ /dev/null
@@ -1,616 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
-
-import java.io.DataInput;
-import java.util.Random;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLongArray;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
-import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.internal.util.GridRandom;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Multimap for map reduce intermediate results.
- */
-public class HadoopConcurrentHashMultimap extends HadoopHashMultimapBase {
-    /** */
-    private final AtomicReference<State> state = new 
AtomicReference<>(State.READING_WRITING);
-
-    /** */
-    private volatile AtomicLongArray oldTbl;
-
-    /** */
-    private volatile AtomicLongArray newTbl;
-
-    /** */
-    private final AtomicInteger keys = new AtomicInteger();
-
-    /** */
-    private final CopyOnWriteArrayList<AdderImpl> adders = new 
CopyOnWriteArrayList<>();
-
-    /** */
-    private final AtomicInteger inputs = new AtomicInteger();
-
-    /**
-     * @param jobInfo Job info.
-     * @param mem Memory.
-     * @param cap Initial capacity.
-     */
-    public HadoopConcurrentHashMultimap(HadoopJobInfo jobInfo, 
GridUnsafeMemory mem, int cap) {
-        super(jobInfo, mem);
-
-        assert U.isPow2(cap);
-
-        newTbl = oldTbl = new AtomicLongArray(cap);
-    }
-
-    /**
-     * @return Number of keys.
-     */
-    public long keys() {
-        int res = keys.get();
-
-        for (AdderImpl adder : adders)
-            res += adder.locKeys.get();
-
-        return res;
-    }
-
-    /**
-     * @return Current table capacity.
-     */
-    @Override public int capacity() {
-        return oldTbl.length();
-    }
-
-    /**
-     * @return Adder object.
-     * @param ctx Task context.
-     */
-    @Override public Adder startAdding(HadoopTaskContext ctx) throws 
IgniteCheckedException {
-        if (inputs.get() != 0)
-            throw new IllegalStateException("Active inputs.");
-
-        if (state.get() == State.CLOSING)
-            throw new IllegalStateException("Closed.");
-
-        return new AdderImpl(ctx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() {
-        assert inputs.get() == 0 : inputs.get();
-        assert adders.isEmpty() : adders.size();
-
-        state(State.READING_WRITING, State.CLOSING);
-
-        if (keys() == 0)
-            return;
-
-        super.close();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected long meta(int idx) {
-        return oldTbl.get(idx);
-    }
-
-    /**
-     * Incrementally visits all the keys and values in the map.
-     *
-     * @param ignoreLastVisited Flag indicating that visiting must be started 
from the beginning.
-     * @param v Visitor.
-     * @return {@code false} If visiting was impossible due to rehashing.
-     */
-    @Override public boolean visit(boolean ignoreLastVisited, Visitor v) 
throws IgniteCheckedException {
-        if (!state.compareAndSet(State.READING_WRITING, State.VISITING)) {
-            assert state.get() != State.CLOSING;
-
-            return false; // Can not visit while rehashing happens.
-        }
-
-        AtomicLongArray tbl0 = oldTbl;
-
-        for (int i = 0; i < tbl0.length(); i++) {
-            long meta = tbl0.get(i);
-
-            while (meta != 0) {
-                long valPtr = value(meta);
-
-                long lastVisited = ignoreLastVisited ? 0 : 
lastVisitedValue(meta);
-
-                if (valPtr != lastVisited) {
-                    v.onKey(key(meta), keySize(meta));
-
-                    lastVisitedValue(meta, valPtr); // Set it to the first 
value in chain.
-
-                    do {
-                        v.onValue(valPtr + 12, valueSize(valPtr));
-
-                        valPtr = nextValue(valPtr);
-                    }
-                    while (valPtr != lastVisited);
-                }
-
-                meta = collision(meta);
-            }
-        }
-
-        state(State.VISITING, State.READING_WRITING);
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws 
IgniteCheckedException {
-        inputs.incrementAndGet();
-
-        if (!adders.isEmpty())
-            throw new IllegalStateException("Active adders.");
-
-        State s = state.get();
-
-        if (s == State.CLOSING)
-            throw new IllegalStateException("Closed.");
-
-        assert s != State.REHASHING;
-
-        return new Input(taskCtx) {
-            @Override public void close() throws IgniteCheckedException {
-                if (inputs.decrementAndGet() < 0)
-                    throw new IllegalStateException();
-
-                super.close();
-            }
-        };
-    }
-
-    /**
-     * @param fromTbl Table.
-     */
-    private void rehashIfNeeded(AtomicLongArray fromTbl) {
-        if (fromTbl.length() == Integer.MAX_VALUE)
-            return;
-
-        long keys0 = keys();
-
-        if (keys0 < 3 * (fromTbl.length() >>> 2)) // New size has to be >= 
than 3/4 of capacity to rehash.
-            return;
-
-        if (fromTbl != newTbl) // Check if someone else have done the job.
-            return;
-
-        if (!state.compareAndSet(State.READING_WRITING, State.REHASHING)) {
-            assert state.get() != State.CLOSING; // Visiting is allowed, but 
we will not rehash.
-
-            return;
-        }
-
-        if (fromTbl != newTbl) { // Double check.
-            state(State.REHASHING, State.READING_WRITING); // Switch back.
-
-            return;
-        }
-
-        // Calculate new table capacity.
-        int newLen = fromTbl.length();
-
-        do {
-            newLen <<= 1;
-        }
-        while (newLen < keys0);
-
-        if (keys0 >= 3 * (newLen >>> 2)) // Still more than 3/4.
-            newLen <<= 1;
-
-        // This is our target table for rehashing.
-        AtomicLongArray toTbl = new AtomicLongArray(newLen);
-
-        // Make the new table visible before rehashing.
-        newTbl = toTbl;
-
-        // Rehash.
-        int newMask = newLen - 1;
-
-        long failedMeta = 0;
-
-        GridLongList collisions = new GridLongList(16);
-
-        for (int i = 0; i < fromTbl.length(); i++) { // Scan source table.
-            long meta = fromTbl.get(i);
-
-            assert meta != -1;
-
-            if (meta == 0) { // No entry.
-                failedMeta = 0;
-
-                if (!fromTbl.compareAndSet(i, 0, -1)) // Mark as moved.
-                    i--; // Retry.
-
-                continue;
-            }
-
-            do { // Collect all the collisions before the last one failed to 
nullify or 0.
-                collisions.add(meta);
-
-                meta = collision(meta);
-            }
-            while (meta != failedMeta);
-
-            do { // Go from the last to the first to avoid 'in-flight' state 
for meta entries.
-                meta = collisions.remove();
-
-                int addr = keyHash(meta) & newMask;
-
-                for (;;) { // Move meta entry to the new table.
-                    long toCollision = toTbl.get(addr);
-
-                    collision(meta, toCollision);
-
-                    if (toTbl.compareAndSet(addr, toCollision, meta))
-                        break;
-                }
-            }
-            while (!collisions.isEmpty());
-
-            // Here 'meta' will be a root pointer in old table.
-            if (!fromTbl.compareAndSet(i, meta, -1)) { // Try to mark as moved.
-                failedMeta = meta;
-
-                i--; // Retry the same address in table because new keys were 
added.
-            }
-            else
-                failedMeta = 0;
-        }
-
-        // Now old and new tables will be the same again.
-        oldTbl = toTbl;
-
-        state(State.REHASHING, State.READING_WRITING);
-    }
-
-    /**
-     * Atomically switches the map from one state to another.
-     *
-     * @param oldState Expected state.
-     * @param newState New state.
-     * @throws IllegalStateException If the current state is not the expected one.
-     */
-    private void state(State oldState, State newState) {
-        boolean switched = state.compareAndSet(oldState, newState);
-
-        if (switched)
-            return;
-
-        throw new IllegalStateException();
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @return Value pointer.
-     */
-    @Override protected long value(long meta) {
-        return mem.readLongVolatile(meta + 16);
-    }
-
-    /**
-     * Compare-and-set of the value pointer stored at offset 16 of the meta page.
-     *
-     * @param meta Meta pointer.
-     * @param oldValPtr Old value.
-     * @param newValPtr New value.
-     * @return {@code true} If succeeded.
-     */
-    private boolean casValue(long meta, long oldValPtr, long newValPtr) {
-        long valAddr = meta + 16;
-
-        return mem.casLong(valAddr, oldValPtr, newValPtr);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @return Collision pointer.
-     */
-    @Override protected long collision(long meta) {
-        return mem.readLongVolatile(meta + 24);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @param collision Collision pointer.
-     */
-    @Override protected void collision(long meta, long collision) {
-        assert meta != collision : meta;
-
-        mem.writeLongVolatile(meta + 24, collision);
-    }
-
-    /**
-     * Reads the last visited value pointer stored at offset 32 of the meta page.
-     *
-     * @param meta Meta pointer.
-     * @return Last visited value pointer.
-     */
-    private long lastVisitedValue(long meta) {
-        long lastVisitedAddr = meta + 32;
-
-        return mem.readLong(lastVisitedAddr);
-    }
-
-    /**
-     * Writes the last visited value pointer to offset 32 of the meta page.
-     *
-     * @param meta Meta pointer.
-     * @param valPtr Last visited value pointer.
-     */
-    private void lastVisitedValue(long meta, long valPtr) {
-        long lastVisitedAddr = meta + 32;
-
-        mem.writeLong(lastVisitedAddr, valPtr);
-    }
-
-    /**
-     * Adder. Must not be shared between threads.
-     * <p>
-     * Several adders may work on the same map at once: new entries are published into the hash table
-     * with compare-and-set, and each adder counts the keys it created in its own counter.
-     */
-    private class AdderImpl extends AdderBase {
-        /** Reader used to deserialize stored keys when comparing them with the key being added. */
-        private final Reader keyReader;
-
-        /** Number of keys created by this adder; folded into the map-wide counter on close. */
-        private final AtomicInteger locKeys = new AtomicInteger();
-
-        /** Random used to decide how often a rehash check is attempted. */
-        private final Random rnd = new GridRandom();
-
-        /**
-         * Creates the adder, runs a rehash check on the current table and registers the adder in the map.
-         *
-         * @param ctx Task context.
-         * @throws IgniteCheckedException If failed.
-         */
-        private AdderImpl(HadoopTaskContext ctx) throws IgniteCheckedException {
-            super(ctx);
-
-            keyReader = new Reader(keySer);
-
-            rehashIfNeeded(oldTbl);
-
-            adders.add(this);
-        }
-
-        /**
-         * Reads a key from the given input and makes sure the map has an entry for it, without adding any value.
-         *
-         * @param in Data input.
-         * @param reuse Reusable key.
-         * @return Key.
-         * @throws IgniteCheckedException If failed.
-         */
-        @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException {
-            KeyImpl k = reuse == null ? new KeyImpl() : (KeyImpl)reuse;
-
-            k.tmpKey = keySer.read(in, k.tmpKey);
-
-            k.meta = add(k.tmpKey, null);
-
-            return k;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void write(Object key, Object val) throws IgniteCheckedException {
-            A.notNull(val, "val");
-
-            add(key, val);
-        }
-
-        /**
-         * Increments the local key counter and, with a probability that shrinks as the table grows,
-         * checks whether the table has to be rehashed.
-         *
-         * @param tbl Table.
-         */
-        private void incrementKeys(AtomicLongArray tbl) {
-            locKeys.lazySet(locKeys.get() + 1);
-
-            // The check always runs for tables of up to 512 slots and with probability 512 / length otherwise.
-            if (rnd.nextInt(tbl.length()) < 512)
-                rehashIfNeeded(tbl);
-        }
-
-        /**
-         * Allocates a 40 byte meta page and fills it: key hash (offset 0), key size (4), key pointer (8),
-         * value pointer (16), collision pointer (24) and last visited value pointer (32).
-         *
-         * @param keyHash Key hash.
-         * @param keySize Key size.
-         * @param keyPtr Key pointer.
-         * @param valPtr Value page pointer.
-         * @param collisionPtr Pointer to meta with hash collision.
-         * @param lastVisitedVal Last visited value pointer.
-         * @return Created meta page pointer.
-         */
-        private long createMeta(int keyHash, int keySize, long keyPtr, long valPtr, long collisionPtr, long lastVisitedVal) {
-            long meta = allocate(40);
-
-            mem.writeInt(meta, keyHash);
-            mem.writeInt(meta + 4, keySize);
-            mem.writeLong(meta + 8, keyPtr);
-            mem.writeLong(meta + 16, valPtr);
-            mem.writeLong(meta + 24, collisionPtr);
-            mem.writeLong(meta + 32, lastVisitedVal);
-
-            return meta;
-        }
-
-        /**
-         * Adds the value to the entry of the given key, creating the entry if the key is not in the map yet.
-         * With a {@code null} value only the entry is looked up or created.
-         *
-         * @param key Key.
-         * @param val Value.
-         * @return Updated or created meta page pointer.
-         * @throws IgniteCheckedException If failed.
-         */
-        private long add(Object key, @Nullable Object val) throws IgniteCheckedException {
-            AtomicLongArray tbl = oldTbl;
-
-            int keyHash = U.hash(key.hashCode());
-
-            long newMetaPtr = 0;
-
-            long valPtr = 0;
-
-            if (val != null) {
-                // The value is serialized once, up front; the retry loop below only links it in.
-                valPtr = write(12, val, valSer);
-                int valSize = writtenSize() - 12;
-
-                valueSize(valPtr, valSize);
-            }
-
-            for (AtomicLongArray old = null;;) {
-                int addr = keyHash & (tbl.length() - 1);
-
-                long metaPtrRoot = tbl.get(addr); // Read root meta pointer at this address.
-
-                if (metaPtrRoot == -1) { // The cell was already moved by rehashing.
-                    AtomicLongArray n = newTbl; // Need to read newTbl first here.
-                    AtomicLongArray o = oldTbl;
-
-                    tbl = tbl == o ? n : o; // Trying to get the oldest table but newer than ours.
-
-                    old = null;
-
-                    continue;
-                }
-
-                if (metaPtrRoot != 0) { // Not empty slot.
-                    long metaPtr = metaPtrRoot;
-
-                    do { // Scan all the collisions.
-                        if (keyHash(metaPtr) == keyHash && key.equals(keyReader.readKey(metaPtr))) { // Found key.
-                            if (newMetaPtr != 0)  // Deallocate new meta if one was allocated.
-                                localDeallocate(key(newMetaPtr)); // Key was allocated first, so rewind to its pointer.
-
-                            if (valPtr != 0) { // Add value if it exists.
-                                long nextValPtr;
-
-                                // Values are linked to each other to a stack like structure.
-                                // Replace the last value in meta with ours and link it as next.
-                                do {
-                                    nextValPtr = value(metaPtr);
-
-                                    nextValue(valPtr, nextValPtr);
-                                }
-                                while (!casValue(metaPtr, nextValPtr, valPtr));
-                            }
-
-                            return metaPtr;
-                        }
-
-                        metaPtr = collision(metaPtr);
-                    }
-                    while (metaPtr != 0);
-
-                    // Here we did not find our key, need to check if it was moved by rehashing to the new table.
-                    if (old == null) { // If the old table already set, then we will just try to update it.
-                        AtomicLongArray n = newTbl;
-
-                        if (n != tbl) { // Rehashing happens, try to find the key in new table but preserve the old one.
-                            old = tbl;
-                            tbl = n;
-
-                            continue;
-                        }
-                    }
-                }
-
-                if (old != null) { // We just checked new table but did not find our key as well as in the old one.
-                    tbl = old; // Try to add new key to the old table.
-
-                    addr = keyHash & (tbl.length() - 1);
-
-                    old = null;
-                }
-
-                if (newMetaPtr == 0) { // Allocate new meta page.
-                    long keyPtr = write(0, key, keySer);
-                    int keySize = writtenSize();
-
-                    if (valPtr != 0)
-                        nextValue(valPtr, 0);
-
-                    newMetaPtr = createMeta(keyHash, keySize, keyPtr, valPtr, metaPtrRoot, 0);
-                }
-                else // Update new meta with root pointer collision.
-                    collision(newMetaPtr, metaPtrRoot);
-
-                if (tbl.compareAndSet(addr, metaPtrRoot, newMetaPtr)) { // Try to replace root pointer with new one.
-                    incrementKeys(tbl);
-
-                    return newMetaPtr;
-                }
-
-                // The compare-and-set failed: go around the loop again with the already allocated meta page.
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() throws IgniteCheckedException {
-            if (!adders.remove(this))
-                throw new IllegalStateException();
-
-            keys.addAndGet(locKeys.get()); // Here we have race and #keys() method can return wrong result but it is ok.
-
-            super.close();
-        }
-
-        /**
-         * Key handle that remembers the meta page of its entry, so values can be added without a new lookup.
-         */
-        private class KeyImpl implements Key {
-            /** Meta page pointer of the entry this key refers to. */
-            private long meta;
-
-            /** Deserialized key object, passed back to the key serialization on the next read. */
-            private Object tmpKey;
-
-            /**
-             * @return Meta pointer for the key.
-             */
-            public long address() {
-                return meta;
-            }
-
-            /**
-             * Copies the value into newly allocated memory and pushes it on top of the value chain of this key.
-             *
-             * @param val Value.
-             */
-            @Override public void add(Value val) {
-                int size = val.size();
-
-                // The first 12 bytes of a value page are a header; the payload is copied right after it.
-                long valPtr = allocate(size + 12);
-
-                val.copyTo(valPtr + 12);
-
-                valueSize(valPtr, size);
-
-                long nextVal;
-
-                // Link the current head as next and retry until the head pointer is swapped to our value.
-                do {
-                    nextVal = value(meta);
-
-                    nextValue(valPtr, nextVal);
-                }
-                while(!casValue(meta, nextVal, valPtr));
-            }
-        }
-    }
-
-    /**
-     * Current map state.
-     */
-    private enum State {
-        /** Entries are being moved to a larger table. */
-        REHASHING,
-
-        /** A visitor is walking the map. */
-        VISITING,
-
-        /** Regular state; both visiting and rehashing can only be entered from it. */
-        READING_WRITING,
-
-        /** The map has been closed. */
-        CLOSING
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java
deleted file mode 100644
index c32e9af..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Hash multimap.
- */
-public class HadoopHashMultimap extends HadoopHashMultimapBase {
-    /** */
-    private long[] tbl;
-
-    /** */
-    private int keys;
-
-    /**
-     * @param jobInfo Job info.
-     * @param mem Memory.
-     * @param cap Initial capacity.
-     */
-    public HadoopHashMultimap(HadoopJobInfo jobInfo, GridUnsafeMemory mem, int 
cap) {
-        super(jobInfo, mem);
-
-        assert U.isPow2(cap) : cap;
-
-        tbl = new long[cap];
-    }
-
-    /** {@inheritDoc} */
-    @Override public Adder startAdding(HadoopTaskContext ctx) throws 
IgniteCheckedException {
-        return new AdderImpl(ctx);
-    }
-
-    /**
-     * Rehash.
-     */
-    private void rehash() {
-        long[] newTbl = new long[tbl.length << 1];
-
-        int newMask = newTbl.length - 1;
-
-        for (long meta : tbl) {
-            while (meta != 0) {
-                long collision = collision(meta);
-
-                int idx = keyHash(meta) & newMask;
-
-                collision(meta, newTbl[idx]);
-
-                newTbl[idx] = meta;
-
-                meta = collision;
-            }
-        }
-
-        tbl = newTbl;
-    }
-
-    /**
-     * @return Keys count.
-     */
-    public int keys() {
-        return keys;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int capacity() {
-        return tbl.length;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected long meta(int idx) {
-        return tbl[idx];
-    }
-
-    /**
-     * Adder.
-     */
-    private class AdderImpl extends AdderBase {
-        /** */
-        private final Reader keyReader;
-
-        /**
-         * @param ctx Task context.
-         * @throws IgniteCheckedException If failed.
-         */
-        protected AdderImpl(HadoopTaskContext ctx) throws 
IgniteCheckedException {
-            super(ctx);
-
-            keyReader = new Reader(keySer);
-        }
-
-        /**
-         * @param keyHash Key hash.
-         * @param keySize Key size.
-         * @param keyPtr Key pointer.
-         * @param valPtr Value page pointer.
-         * @param collisionPtr Pointer to meta with hash collision.
-         * @return Created meta page pointer.
-         */
-        private long createMeta(int keyHash, int keySize, long keyPtr, long 
valPtr, long collisionPtr) {
-            long meta = allocate(32);
-
-            mem.writeInt(meta, keyHash);
-            mem.writeInt(meta + 4, keySize);
-            mem.writeLong(meta + 8, keyPtr);
-            mem.writeLong(meta + 16, valPtr);
-            mem.writeLong(meta + 24, collisionPtr);
-
-            return meta;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void write(Object key, Object val) throws 
IgniteCheckedException {
-            A.notNull(val, "val");
-
-            int keyHash = U.hash(key.hashCode());
-
-            // Write value.
-            long valPtr = write(12, val, valSer);
-            int valSize = writtenSize() - 12;
-
-            valueSize(valPtr, valSize);
-
-            // Find position in table.
-            int idx = keyHash & (tbl.length - 1);
-
-            long meta = tbl[idx];
-
-            // Search for our key in collisions.
-            while (meta != 0) {
-                if (keyHash(meta) == keyHash && 
key.equals(keyReader.readKey(meta))) { // Found key.
-                    nextValue(valPtr, value(meta));
-
-                    value(meta, valPtr);
-
-                    return;
-                }
-
-                meta = collision(meta);
-            }
-
-            // Write key.
-            long keyPtr = write(0, key, keySer);
-            int keySize = writtenSize();
-
-            nextValue(valPtr, 0);
-
-            tbl[idx] = createMeta(keyHash, keySize, keyPtr, valPtr, tbl[idx]);
-
-            if (++keys > (tbl.length >>> 2) * 3)
-                rehash();
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java
deleted file mode 100644
index 8d9b3c3..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
-
-import java.util.Iterator;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-
-/**
- * Base class for hash multimaps. Provides accessors for the fields of a meta page (key hash at offset 0,
- * key size at 4, key pointer at 8, value pointer at 16, collision pointer at 24) and a task input
- * that scans the hash table.
- */
-public abstract class HadoopHashMultimapBase extends HadoopMultimapBase {
-    /**
-     * @param jobInfo Job info.
-     * @param mem Memory.
-     */
-    protected HadoopHashMultimapBase(HadoopJobInfo jobInfo, GridUnsafeMemory mem) {
-        super(jobInfo, mem);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException {
-        throw new UnsupportedOperationException("visit");
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
-        return new Input(taskCtx);
-    }
-
-    /**
-     * @return Hash table capacity.
-     */
-    public abstract int capacity();
-
-    /**
-     * @param idx Index in hash table.
-     * @return Meta page pointer.
-     */
-    protected abstract long meta(int idx);
-
-    /**
-     * @param meta Meta pointer.
-     * @return Key hash.
-     */
-    protected int keyHash(long meta) {
-        return mem.readInt(meta);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @return Key size.
-     */
-    protected int keySize(long meta) {
-        return mem.readInt(meta + 4);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @return Key pointer.
-     */
-    protected long key(long meta) {
-        return mem.readLong(meta + 8);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @return Value pointer.
-     */
-    protected long value(long meta) {
-        return mem.readLong(meta + 16);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @param val Value pointer.
-     */
-    protected void value(long meta, long val) {
-        mem.writeLong(meta + 16, val);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @return Collision pointer.
-     */
-    protected long collision(long meta) {
-        return mem.readLong(meta + 24);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @param collision Collision pointer, must differ from {@code meta}.
-     */
-    protected void collision(long meta, long collision) {
-        assert meta != collision : meta;
-
-        mem.writeLong(meta + 24, collision);
-    }
-
-    /**
-     * Reader for key and value.
-     */
-    protected class Reader extends ReaderBase {
-        /**
-         * @param ser Serialization.
-         */
-        protected Reader(HadoopSerialization ser) {
-            super(ser);
-        }
-
-        /**
-         * Reads the key referenced by the given meta page. A checked failure is rethrown
-         * as an unchecked {@link IgniteException}.
-         *
-         * @param meta Meta pointer.
-         * @return Key.
-         */
-        public Object readKey(long meta) {
-            assert meta > 0 : meta;
-
-            try {
-                return read(key(meta), keySize(meta));
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-        }
-    }
-
-    /**
-     * Task input that walks the hash table slot by slot and every collision chain entry by entry.
-     */
-    protected class Input implements HadoopTaskInput {
-        /** Index of the table slot being scanned; {@code -1} before the first call to {@link #next()}. */
-        private int idx = -1;
-
-        /** Meta page pointer of the current entry or {@code 0} if there is none. */
-        private long metaPtr;
-
-        /** Table capacity captured when the input was created. */
-        private final int cap;
-
-        /** Reader for keys. */
-        private final Reader keyReader;
-
-        /** Reader for values. */
-        private final Reader valReader;
-
-        /**
-         * @param taskCtx Task context.
-         * @throws IgniteCheckedException If failed.
-         */
-        public Input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
-            cap = capacity();
-
-            keyReader = new Reader(taskCtx.keySerialization());
-            valReader = new Reader(taskCtx.valueSerialization());
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean next() {
-            if (metaPtr != 0) {
-                // Continue along the collision chain of the current slot first.
-                metaPtr = collision(metaPtr);
-
-                if (metaPtr != 0)
-                    return true;
-            }
-
-            while (++idx < cap) { // Scan table.
-                metaPtr = meta(idx);
-
-                if (metaPtr != 0)
-                    return true;
-            }
-
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Object key() {
-            return keyReader.readKey(metaPtr);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Iterator<?> values() {
-            return new ValueIterator(value(metaPtr), valReader);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() throws IgniteCheckedException {
-            try {
-                keyReader.close();
-            }
-            finally {
-                // Close the value reader even if closing the key reader failed.
-                valReader.close();
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java
deleted file mode 100644
index 5b71c47..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
-
-import java.io.DataInput;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Multimap for hadoop intermediate results.
- */
-@SuppressWarnings("PublicInnerClass")
-public interface HadoopMultimap extends AutoCloseable {
-    /**
-     * Incrementally visits all the keys and values in the map.
-     *
-     * @param ignoreLastVisited Flag indicating that visiting must be started from the beginning.
-     * @param v Visitor.
-     * @return {@code false} If visiting was impossible.
-     */
-    public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException;
-
-    /**
-     * @param ctx Task context.
-     * @return Adder.
-     * @throws IgniteCheckedException If failed.
-     */
-    public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException;
-
-    /**
-     * @param taskCtx Task context.
-     * @return Task input.
-     * @throws IgniteCheckedException If failed.
-     */
-    public HadoopTaskInput input(HadoopTaskContext taskCtx)
-        throws IgniteCheckedException;
-
-    /** {@inheritDoc} */
-    @Override public void close();
-
-    /**
-     * Adder.
-     */
-    public interface Adder extends HadoopTaskOutput {
-        /**
-         * @param in Data input.
-         * @param reuse Reusable key.
-         * @return Key.
-         * @throws IgniteCheckedException If failed.
-         */
-        public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException;
-    }
-
-    /**
-     * Key add values to.
-     */
-    public interface Key {
-        /**
-         * @param val Value.
-         */
-        public void add(Value val);
-    }
-
-    /**
-     * Value.
-     */
-    public interface Value {
-        /**
-         * @return Size in bytes.
-         */
-        public int size();
-
-        /**
-         * @param ptr Pointer.
-         */
-        public void copyTo(long ptr);
-    }
-
-    /**
-     * Key and values visitor.
-     */
-    public interface Visitor {
-        /**
-         * @param keyPtr Key pointer.
-         * @param keySize Key size.
-         */
-        public void onKey(long keyPtr, int keySize) throws IgniteCheckedException;
-
-        /**
-         * @param valPtr Value pointer.
-         * @param valSize Value size.
-         */
-        public void onValue(long valPtr, int valSize) throws IgniteCheckedException;
-    }
-}
\ No newline at end of file

Reply via email to