diff --git 
new file mode 100644
index 0000000..39b7c51
--- /dev/null
@@ -0,0 +1,435 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.jetbrains.annotations.Nullable;
+import static 
+import static 
+ * Base class for all multimaps.
+ */
+public abstract class HadoopMultimapBase implements HadoopMultimap {
+    /** */
+    protected final GridUnsafeMemory mem;
+    /** */
+    protected final int pageSize;
+    /** */
+    private final Collection<Page> allPages = new ConcurrentLinkedQueue<>();
+    /**
+     * @param jobInfo Job info.
+     * @param mem Memory.
+     */
+    protected HadoopMultimapBase(HadoopJobInfo jobInfo, GridUnsafeMemory mem) {
+        assert jobInfo != null;
+        assert mem != null;
+        this.mem = mem;
+        pageSize = get(jobInfo, SHUFFLE_OFFHEAP_PAGE_SIZE, 32 * 1024);
+    }
+    /**
+     * @param page Page.
+     */
+    private void deallocate(Page page) {
+        assert page != null;
+        mem.release(page.ptr, page.size);
+    }
+    /**
+     * @param valPtr Value page pointer.
+     * @param nextValPtr Next value page pointer.
+     */
+    protected void nextValue(long valPtr, long nextValPtr) {
+        mem.writeLong(valPtr, nextValPtr);
+    }
+    /**
+     * @param valPtr Value page pointer.
+     * @return Next value page pointer.
+     */
+    protected long nextValue(long valPtr) {
+        return mem.readLong(valPtr);
+    }
+    /**
+     * @param valPtr Value page pointer.
+     * @param size Size.
+     */
+    protected void valueSize(long valPtr, int size) {
+        mem.writeInt(valPtr + 8, size);
+    }
+    /**
+     * @param valPtr Value page pointer.
+     * @return Value size.
+     */
+    protected int valueSize(long valPtr) {
+        return mem.readInt(valPtr + 8);
+    }
+    /** {@inheritDoc} */
+    @Override public void close() {
+        for (Page page : allPages)
+            deallocate(page);
+    }
+    /**
+     * Reader for key and value.
+     */
+    protected class ReaderBase implements AutoCloseable {
+        /** */
+        private Object tmp;
+        /** */
+        private final HadoopSerialization ser;
+        /** */
+        private final HadoopDataInStream in = new HadoopDataInStream(mem);
+        /**
+         * @param ser Serialization.
+         */
+        protected ReaderBase(HadoopSerialization ser) {
+            assert ser != null;
+            this.ser = ser;
+        }
+        /**
+         * @param valPtr Value page pointer.
+         * @return Value.
+         */
+        public Object readValue(long valPtr) {
+            assert valPtr > 0 : valPtr;
+            try {
+                return read(valPtr + 12, valueSize(valPtr));
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+        /**
+         * Resets temporary object to the given one.
+         *
+         * @param tmp Temporary object for reuse.
+         */
+        public void resetReusedObject(Object tmp) {
+            this.tmp = tmp;
+        }
+        /**
+         * @param ptr Pointer.
+         * @param size Object size.
+         * @return Object.
+         */
+        protected Object read(long ptr, long size) throws 
IgniteCheckedException {
+            in.buffer().set(ptr, size);
+            tmp =, tmp);
+            return tmp;
+        }
+        /** {@inheritDoc} */
+        @Override public void close() throws IgniteCheckedException {
+            ser.close();
+        }
+    }
+    /**
+     * Base class for adders.
+     */
+    protected abstract class AdderBase implements Adder {
+        /** */
+        protected final HadoopSerialization keySer;
+        /** */
+        protected final HadoopSerialization valSer;
+        /** */
+        private final HadoopDataOutStream out;
+        /** */
+        private long writeStart;
+        /** Current page. */
+        private Page curPage;
+        /**
+         * @param ctx Task context.
+         * @throws IgniteCheckedException If failed.
+         */
+        protected AdderBase(HadoopTaskContext ctx) throws 
IgniteCheckedException {
+            valSer = ctx.valueSerialization();
+            keySer = ctx.keySerialization();
+            out = new HadoopDataOutStream(mem) {
+                @Override public long move(long size) {
+                    long ptr = super.move(size);
+                    if (ptr == 0) // Was not able to move - not enough free 
+                        ptr = allocateNextPage(size);
+                    assert ptr != 0;
+                    return ptr;
+                }
+            };
+        }
+        /**
+         * @param requestedSize Requested size.
+         * @return Next write pointer.
+         */
+        private long allocateNextPage(long requestedSize) {
+            int writtenSize = writtenSize();
+            long newPageSize = nextPageSize(writtenSize + requestedSize);
+            long newPagePtr = mem.allocate(newPageSize);
+            HadoopOffheapBuffer b = out.buffer();
+            b.set(newPagePtr, newPageSize);
+            if (writtenSize != 0) {
+                mem.copyMemory(writeStart, newPagePtr, writtenSize);
+                b.move(writtenSize);
+            }
+            writeStart = newPagePtr;
+            // At this point old page is not needed, so we release it.
+            Page oldPage = curPage;
+            curPage = new Page(newPagePtr, newPageSize);
+            if (oldPage != null)
+                allPages.add(oldPage);
+            return b.move(requestedSize);
+        }
+        /**
+         * Get next page size.
+         *
+         * @param required Required amount of data.
+         * @return Next page size.
+         */
+        private long nextPageSize(long required) {
+            long pages = (required / pageSize) + 1;
+            long pagesPow2 = nextPowerOfTwo(pages);
+            return pagesPow2 * pageSize;
+        }
+        /**
+         * Get next power of two which greater or equal to the given number. 
Naive implementation.
+         *
+         * @param val Number
+         * @return Nearest pow2.
+         */
+        private long nextPowerOfTwo(long val) {
+            long res = 1;
+            while (res < val)
+                res = res << 1;
+            if (res < 0)
+                throw new IllegalArgumentException("Value is too big to find 
positive pow2: " + val);
+            return res;
+        }
+        /**
+         * @return Fixed pointer.
+         */
+        private long fixAlignment() {
+            HadoopOffheapBuffer b = out.buffer();
+            long ptr = b.pointer();
+            if ((ptr & 7L) != 0) { // Address is not aligned by octet.
+                ptr = (ptr + 8L) & ~7L;
+                b.pointer(ptr);
+            }
+            return ptr;
+        }
+        /**
+         * @param off Offset.
+         * @param o Object.
+         * @return Page pointer.
+         * @throws IgniteCheckedException If failed.
+         */
+        protected long write(int off, Object o, HadoopSerialization ser) 
throws IgniteCheckedException {
+            writeStart = fixAlignment();
+            if (off != 0)
+                out.move(off);
+            ser.write(out, o);
+            return writeStart;
+        }
+        /**
+         * @param size Size.
+         * @return Pointer.
+         */
+        protected long allocate(int size) {
+            writeStart = fixAlignment();
+            out.move(size);
+            return writeStart;
+        }
+        /**
+         * Rewinds local allocation pointer to the given pointer if possible.
+         *
+         * @param ptr Pointer.
+         */
+        protected void localDeallocate(long ptr) {
+            HadoopOffheapBuffer b = out.buffer();
+            if (b.isInside(ptr))
+                b.pointer(ptr);
+            else
+                b.reset();
+        }
+        /**
+         * @return Written size.
+         */
+        protected int writtenSize() {
+            return (int)(out.buffer().pointer() - writeStart);
+        }
+        /** {@inheritDoc} */
+        @Override public Key addKey(DataInput in, @Nullable Key reuse) throws 
IgniteCheckedException {
+            throw new UnsupportedOperationException();
+        }
+        /** {@inheritDoc} */
+        @Override public void close() throws IgniteCheckedException {
+            if (curPage != null)
+                allPages.add(curPage);
+            keySer.close();
+            valSer.close();
+        }
+    }
+    /**
+     * Iterator over values.
+     */
+    protected class ValueIterator implements Iterator<Object> {
+        /** */
+        private long valPtr;
+        /** */
+        private final ReaderBase valReader;
+        /**
+         * @param valPtr Value page pointer.
+         * @param valReader Value reader.
+         */
+        protected ValueIterator(long valPtr, ReaderBase valReader) {
+            this.valPtr = valPtr;
+            this.valReader = valReader;
+        }
+        /**
+         * @param valPtr Head value pointer.
+         */
+        public void head(long valPtr) {
+            this.valPtr = valPtr;
+        }
+        /** {@inheritDoc} */
+        @Override public boolean hasNext() {
+            return valPtr != 0;
+        }
+        /** {@inheritDoc} */
+        @Override public Object next() {
+            if (!hasNext())
+                throw new NoSuchElementException();
+            Object res = valReader.readValue(valPtr);
+            valPtr = nextValue(valPtr);
+            return res;
+        }
+        /** {@inheritDoc} */
+        @Override public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
+    /**
+     * Page.
+     */
+    private static class Page {
+        /** Pointer. */
+        private final long ptr;
+        /** Size. */
+        private final long size;
+        /**
+         * Constructor.
+         *
+         * @param ptr Pointer.
+         * @param size Size.
+         */
+        public Page(long ptr, long size) {
+            this.ptr = ptr;
+            this.size = size;
+        }
+    }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..7db88bc
--- /dev/null
@@ -0,0 +1,733 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.GridRandom;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.jetbrains.annotations.Nullable;
+ * Skip list.
+ */
+public class HadoopSkipList extends HadoopMultimapBase {
+    /** */
+    private static final int HEADS_SIZE = 24 + 33 * 8; // Offset + max level 
is from 0 to 32 inclusive.
+    /** Top level. */
+    private final AtomicInteger topLevel = new AtomicInteger(-1);
+    /** Heads for all the lists. */
+    private final long heads;
+    /** */
+    private final AtomicBoolean visitGuard = new AtomicBoolean();
+    /**
+     * @param jobInfo Job info.
+     * @param mem Memory.
+     */
+    public HadoopSkipList(HadoopJobInfo jobInfo, GridUnsafeMemory mem) {
+        super(jobInfo, mem);
+        heads = mem.allocate(HEADS_SIZE, true);
+    }
+    /** {@inheritDoc} */
+    @Override public void close() {
+        super.close();
+        mem.release(heads, HEADS_SIZE);
+    }
+    /** {@inheritDoc} */
+    @Override public boolean visit(boolean ignoreLastVisited, Visitor v) 
throws IgniteCheckedException {
+        if (!visitGuard.compareAndSet(false, true))
+            return false;
+        for (long meta = nextMeta(heads, 0); meta != 0L; meta = nextMeta(meta, 
0)) {
+            long valPtr = value(meta);
+            long lastVisited = ignoreLastVisited ? 0 : lastVisitedValue(meta);
+            if (valPtr != lastVisited) {
+                long k = key(meta);
+                v.onKey(k + 4, keySize(k));
+                lastVisitedValue(meta, valPtr); // Set it to the first value 
in chain.
+                do {
+                    v.onValue(valPtr + 12, valueSize(valPtr));
+                    valPtr = nextValue(valPtr);
+                }
+                while (valPtr != lastVisited);
+            }
+        }
+        visitGuard.lazySet(false);
+        return true;
+    }
+    /** {@inheritDoc} */
+    @Override public Adder startAdding(HadoopTaskContext ctx) throws 
IgniteCheckedException {
+        return new AdderImpl(ctx);
+    }
+    /** {@inheritDoc} */
+    @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws 
IgniteCheckedException {
+        Input in = new Input(taskCtx);
+        Comparator<Object> grpCmp = taskCtx.groupComparator();
+        if (grpCmp != null)
+            return new GroupedInput(grpCmp, in);
+        return in;
+    }
+    /**
+     * @param meta Meta pointer.
+     * @return Key pointer.
+     */
+    private long key(long meta) {
+        return mem.readLong(meta);
+    }
+    /**
+     * @param meta Meta pointer.
+     * @param key Key pointer.
+     */
+    private void key(long meta, long key) {
+        mem.writeLong(meta, key);
+    }
+    /**
+     * @param meta Meta pointer.
+     * @return Value pointer.
+     */
+    private long value(long meta) {
+        return mem.readLongVolatile(meta + 8);
+    }
+    /**
+     * @param meta Meta pointer.
+     * @param valPtr Value pointer.
+     */
+    private void value(long meta, long valPtr) {
+        mem.writeLongVolatile(meta + 8, valPtr);
+    }
+    /**
+     * @param meta Meta pointer.
+     * @param oldValPtr Old first value pointer.
+     * @param newValPtr New first value pointer.
+     * @return {@code true} If operation succeeded.
+     */
+    private boolean casValue(long meta, long oldValPtr, long newValPtr) {
+        return mem.casLong(meta + 8, oldValPtr, newValPtr);
+    }
+    /**
+     * @param meta Meta pointer.
+     * @return Last visited value pointer.
+     */
+    private long lastVisitedValue(long meta) {
+        return mem.readLong(meta + 16);
+    }
+    /**
+     * @param meta Meta pointer.
+     * @param valPtr Last visited value pointer.
+     */
+    private void lastVisitedValue(long meta, long valPtr) {
+        mem.writeLong(meta + 16, valPtr);
+    }
+    /**
+     * @param meta Meta pointer.
+     * @param level Level.
+     * @return Next meta pointer.
+     */
+    private long nextMeta(long meta, int level) {
+        assert meta > 0 : meta;
+        return mem.readLongVolatile(meta + 24 + 8 * level);
+    }
+    /**
+     * @param meta Meta pointer.
+     * @param level Level.
+     * @param oldNext Old next meta pointer.
+     * @param newNext New next meta pointer.
+     * @return {@code true} If operation succeeded.
+     */
+    private boolean casNextMeta(long meta, int level, long oldNext, long 
newNext) {
+        assert meta > 0 : meta;
+        return mem.casLong(meta + 24 + 8 * level, oldNext, newNext);
+    }
+    /**
+     * @param meta Meta pointer.
+     * @param level Level.
+     * @param nextMeta Next meta.
+     */
+    private void nextMeta(long meta, int level, long nextMeta) {
+        assert meta != 0;
+        mem.writeLong(meta + 24 + 8 * level, nextMeta);
+    }
+    /**
+     * @param keyPtr Key pointer.
+     * @return Key size.
+     */
+    private int keySize(long keyPtr) {
+        return mem.readInt(keyPtr);
+    }
+    /**
+     * @param keyPtr Key pointer.
+     * @param keySize Key size.
+     */
+    private void keySize(long keyPtr, int keySize) {
+        mem.writeInt(keyPtr, keySize);
+    }
+    /**
+     * @param rnd Random.
+     * @return Next level.
+     */
+    public static int randomLevel(Random rnd) {
+        int x = rnd.nextInt();
+        int level = 0;
+        while ((x & 1) != 0) { // Count sequential 1 bits.
+            level++;
+            x >>>= 1;
+        }
+        return level;
+    }
+    /**
+     * Reader.
+     */
+    private class Reader extends ReaderBase {
+        /**
+         * @param ser Serialization.
+         */
+        protected Reader(HadoopSerialization ser) {
+            super(ser);
+        }
+        /**
+         * @param meta Meta pointer.
+         * @return Key.
+         */
+        public Object readKey(long meta) {
+            assert meta > 0 : meta;
+            long k = key(meta);
+            try {
+                return read(k + 4, keySize(k));
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+    }
+    /**
+     * Adder.
+     */
+    private class AdderImpl extends AdderBase {
+        /** */
+        private final Comparator<Object> cmp;
+        /** */
+        private final Random rnd = new GridRandom();
+        /** */
+        private final GridLongList stack = new GridLongList(16);
+        /** */
+        private final Reader keyReader;
+        /**
+         * @param ctx Task context.
+         * @throws IgniteCheckedException If failed.
+         */
+        protected AdderImpl(HadoopTaskContext ctx) throws 
IgniteCheckedException {
+            super(ctx);
+            keyReader = new Reader(keySer);
+            cmp = ctx.sortComparator();
+        }
+        /** {@inheritDoc} */
+        @Override public void write(Object key, Object val) throws 
IgniteCheckedException {
+            A.notNull(val, "val");
+            add(key, val);
+        }
+        /** {@inheritDoc} */
+        @Override public Key addKey(DataInput in, @Nullable Key reuse) throws 
IgniteCheckedException {
+            KeyImpl k = reuse == null ? new KeyImpl() : (KeyImpl)reuse;
+            k.tmpKey =, k.tmpKey);
+            k.meta = add(k.tmpKey, null);
+            return k;
+        }
+        /**
+         * @param key Key.
+         * @param val Value.
+         * @param level Level.
+         * @return Meta pointer.
+         */
+        private long createMeta(long key, long val, int level) {
+            int size = 32 + 8 * level;
+            long meta = allocate(size);
+            key(meta, key);
+            value(meta, val);
+            lastVisitedValue(meta, 0L);
+            for (int i = 32; i < size; i += 8) // Fill with 0.
+                mem.writeLong(meta + i, 0L);
+            return meta;
+        }
+        /**
+         * @param key Key.
+         * @return Pointer.
+         * @throws IgniteCheckedException If failed.
+         */
+        private long writeKey(Object key) throws IgniteCheckedException {
+            long keyPtr = write(4, key, keySer);
+            int keySize = writtenSize() - 4;
+            keySize(keyPtr, keySize);
+            return keyPtr;
+        }
+        /**
+         * @param prevMeta Previous meta.
+         * @param meta Next meta.
+         */
+        private void stackPush(long prevMeta, long meta) {
+            stack.add(prevMeta);
+            stack.add(meta);
+        }
+        /**
+         * Drops last remembered frame from the stack.
+         */
+        private void stackPop() {
+            stack.pop(2);
+        }
+        /**
+         * @param key Key.
+         * @param val Value.
+         * @return Meta pointer.
+         * @throws IgniteCheckedException If failed.
+         */
+        private long add(Object key, @Nullable Object val) throws 
IgniteCheckedException {
+            assert key != null;
+            stack.clear();
+            long valPtr = 0;
+            if (val != null) { // Write value.
+                valPtr = write(12, val, valSer);
+                int valSize = writtenSize() - 12;
+                nextValue(valPtr, 0);
+                valueSize(valPtr, valSize);
+            }
+            long keyPtr = 0;
+            long newMeta = 0;
+            int newMetaLevel = -1;
+            long prevMeta = heads;
+            int level = topLevel.get();
+            long meta = level < 0 ? 0 : nextMeta(heads, level);
+            for (;;) {
+                if (level < 0) { // We did not find our key, trying to add new 
+                    if (keyPtr == 0) { // Write key and create meta only once.
+                        keyPtr = writeKey(key);
+                        newMetaLevel = randomLevel(rnd);
+                        newMeta = createMeta(keyPtr, valPtr, newMetaLevel);
+                    }
+                    nextMeta(newMeta, 0, meta); // Set next to new meta before 
+                    if (casNextMeta(prevMeta, 0, meta, newMeta)) { // New key 
was added successfully.
+                        laceUp(key, newMeta, newMetaLevel);
+                        return newMeta;
+                    }
+                    else { // Add failed, need to check out what was added by 
another thread.
+                        meta = nextMeta(prevMeta, level = 0);
+                        stackPop();
+                    }
+                }
+                int cmpRes = cmp(key, meta);
+                if (cmpRes == 0) { // Key found.
+                    if (newMeta != 0)  // Deallocate if we've allocated 
+                        localDeallocate(keyPtr);
+                    if (valPtr == 0) // Only key needs to be added.
+                        return meta;
+                    for (;;) { // Add value for the key found.
+                        long nextVal = value(meta);
+                        nextValue(valPtr, nextVal);
+                        if (casValue(meta, nextVal, valPtr))
+                            return meta;
+                    }
+                }
+                assert cmpRes != 0;
+                if (cmpRes > 0) { // Go right.
+                    prevMeta = meta;
+                    meta = nextMeta(meta, level);
+                    if (meta != 0) // If nothing to the right then go down.
+                        continue;
+                }
+                while (--level >= 0) { // Go down.
+                    stackPush(prevMeta, meta); // Remember the path.
+                    long nextMeta = nextMeta(prevMeta, level);
+                    if (nextMeta != meta) { // If the meta is the same as on 
upper level go deeper.
+                        meta = nextMeta;
+                        assert meta != 0;
+                        break;
+                    }
+                }
+            }
+        }
+        /**
+         * @param key Key.
+         * @param meta Meta pointer.
+         * @return Comparison result.
+         */
+        @SuppressWarnings("unchecked")
+        private int cmp(Object key, long meta) {
+            assert meta != 0;
+            return, keyReader.readKey(meta));
+        }
+        /**
+         * Adds appropriate index links between metas.
+         *
+         * @param newMeta Just added meta.
+         * @param newMetaLevel New level.
+         */
+        private void laceUp(Object key, long newMeta, int newMetaLevel) {
+            for (int level = 1; level <= newMetaLevel; level++) { // Go from 
the bottom up.
+                long prevMeta = heads;
+                long meta = 0;
+                if (!stack.isEmpty()) { // Get the path back.
+                    meta = stack.remove();
+                    prevMeta = stack.remove();
+                }
+                for (;;) {
+                    nextMeta(newMeta, level, meta);
+                    if (casNextMeta(prevMeta, level, meta, newMeta))
+                        break;
+                    long oldMeta = meta;
+                    meta = nextMeta(prevMeta, level); // Reread meta.
+                    for (;;) {
+                        int cmpRes = cmp(key, meta);
+                        if (cmpRes > 0) { // Go right.
+                            prevMeta = meta;
+                            meta = nextMeta(prevMeta, level);
+                            if (meta != oldMeta) // Old meta already known to 
be greater than ours or is 0.
+                                continue;
+                        }
+                        assert cmpRes != 0; // Two different metas with equal 
keys must be impossible.
+                        break; // Retry cas.
+                    }
+                }
+            }
+            if (!stack.isEmpty())
+                return; // Our level already lower than top.
+            for (;;) { // Raise top level.
+                int top = topLevel.get();
+                if (newMetaLevel <= top || topLevel.compareAndSet(top, 
+                    break;
+            }
+        }
+        /**
+         * Key.
+         */
+        private class KeyImpl implements Key {
+            /** */
+            private long meta;
+            /** */
+            private Object tmpKey;
+            /**
+             * @return Meta pointer for the key.
+             */
+            public long address() {
+                return meta;
+            }
+            /**
+             * @param val Value.
+             */
+            @Override public void add(Value val) {
+                int size = val.size();
+                long valPtr = allocate(size + 12);
+                val.copyTo(valPtr + 12);
+                valueSize(valPtr, size);
+                long nextVal;
+                do {
+                    nextVal = value(meta);
+                    nextValue(valPtr, nextVal);
+                }
+                while(!casValue(meta, nextVal, valPtr));
+            }
+        }
+    }
+    /**
+     * Task input.
+     */
+    private class Input implements HadoopTaskInput {
+        /** */
+        private long metaPtr = heads;
+        /** */
+        private final Reader keyReader;
+        /** */
+        private final Reader valReader;
+        /**
+         * @param taskCtx Task context.
+         * @throws IgniteCheckedException If failed.
+         */
+        private Input(HadoopTaskContext taskCtx) throws IgniteCheckedException 
+            keyReader = new Reader(taskCtx.keySerialization());
+            valReader = new Reader(taskCtx.valueSerialization());
+        }
+        /** {@inheritDoc} */
+        @Override public boolean next() {
+            metaPtr = nextMeta(metaPtr, 0);
+            return metaPtr != 0;
+        }
+        /** {@inheritDoc} */
+        @Override public Object key() {
+            return keyReader.readKey(metaPtr);
+        }
+        /** {@inheritDoc} */
+        @Override public Iterator<?> values() {
+            return new ValueIterator(value(metaPtr), valReader);
+        }
+        /** {@inheritDoc} */
+        @Override public void close() throws IgniteCheckedException {
+            keyReader.close();
+            valReader.close();
+        }
+    }
+    /**
+     * Grouped input using grouping comparator.
+     */
+    private class GroupedInput implements HadoopTaskInput {
+        /** */
+        private final Comparator<Object> grpCmp;
+        /** */
+        private final Input in;
+        /** */
+        private Object prevKey;
+        /** */
+        private Object nextKey;
+        /** */
+        private final GridLongList vals = new GridLongList();
+        /**
+         * @param grpCmp Grouping comparator.
+         * @param in Input.
+         */
+        private GroupedInput(Comparator<Object> grpCmp, Input in) {
+            this.grpCmp = grpCmp;
+   = in;
+        }
+        /** {@inheritDoc} */
+        @Override public boolean next() {
+            if (prevKey == null) { // First call.
+                if (!
+                    return false;
+                prevKey = in.key();
+                assert prevKey != null;
+                in.keyReader.resetReusedObject(null); // We need 2 instances 
of key object for comparison.
+                vals.add(value(in.metaPtr));
+            }
+            else {
+                if (in.metaPtr == 0) // We reached the end of the input.
+                    return false;
+                vals.clear();
+                vals.add(value(in.metaPtr));
+                in.keyReader.resetReusedObject(prevKey); // Switch key 
+                prevKey = nextKey;
+            }
+            while ( { // Fill with head value pointers with equal 
+                if (, nextKey = in.key()) == 0)
+                    vals.add(value(in.metaPtr));
+                else
+                    break;
+            }
+            assert !vals.isEmpty();
+            return true;
+        }
+        /** {@inheritDoc} */
+        @Override public Object key() {
+            return prevKey;
+        }
+        /** {@inheritDoc} */
+        @Override public Iterator<?> values() {
+            assert !vals.isEmpty();
+            final ValueIterator valIter = new ValueIterator(vals.get(0), 
+            return new Iterator<Object>() {
+                /** */
+                private int idx;
+                @Override public boolean hasNext() {
+                    if (!valIter.hasNext()) {
+                        if (++idx == vals.size())
+                            return false;
+                        valIter.head(vals.get(idx));
+                        assert valIter.hasNext();
+                    }
+                    return true;
+                }
+                @Override public Object next() {
+                    return;
+                }
+                @Override public void remove() {
+                    valIter.remove();
+                }
+            };
+        }
+        /** {@inheritDoc} */
+        @Override public void close() throws IgniteCheckedException {
+            in.close();
+        }
+    }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..3b5fa15
--- /dev/null
@@ -0,0 +1,171 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop.shuffle.streams;
+import java.nio.charset.StandardCharsets;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+ * Data input stream.
+ */
+public class HadoopDataInStream extends InputStream implements DataInput {
+    /** */
+    private final HadoopOffheapBuffer buf = new HadoopOffheapBuffer(0, 0);
+    /** */
+    private final GridUnsafeMemory mem;
+    /**
+     * @param mem Memory.
+     */
+    public HadoopDataInStream(GridUnsafeMemory mem) {
+        assert mem != null;
+        this.mem = mem;
+    }
+    /**
+     * @return Buffer.
+     */
+    public HadoopOffheapBuffer buffer() {
+        return buf;
+    }
+    /**
+     * @param size Size.
+     * @return Old pointer.
+     */
+    protected long move(long size) throws IOException {
+        long ptr = buf.move(size);
+        assert ptr != 0;
+        return ptr;
+    }
+    /** {@inheritDoc} */
+    @Override public int read() throws IOException {
+        return readUnsignedByte();
+    }
+    /** {@inheritDoc} */
+    @Override public int read(byte[] b, int off, int len) throws IOException {
+        readFully(b, off, len);
+        return len;
+    }
+    /** {@inheritDoc} */
+    @Override public long skip(long n) throws IOException {
+        move(n);
+        return n;
+    }
+    /** {@inheritDoc} */
+    @Override public void readFully(byte[] b) throws IOException {
+        readFully(b, 0, b.length);
+    }
+    /** {@inheritDoc} */
+    @Override public void readFully(byte[] b, int off, int len) throws 
IOException {
+        mem.readBytes(move(len), b, off, len);
+    }
+    /** {@inheritDoc} */
+    @Override public int skipBytes(int n) throws IOException {
+        move(n);
+        return n;
+    }
+    /** {@inheritDoc} */
+    @Override public boolean readBoolean() throws IOException {
+        byte res = readByte();
+        if (res == 1)
+            return true;
+        assert res == 0 : res;
+        return false;
+    }
+    /** {@inheritDoc} */
+    @Override public byte readByte() throws IOException {
+        return mem.readByte(move(1));
+    }
+    /** {@inheritDoc} */
+    @Override public int readUnsignedByte() throws IOException {
+        return readByte() & 0xff;
+    }
+    /** {@inheritDoc} */
+    @Override public short readShort() throws IOException {
+        return mem.readShort(move(2));
+    }
+    /** {@inheritDoc} */
+    @Override public int readUnsignedShort() throws IOException {
+        return readShort() & 0xffff;
+    }
+    /** {@inheritDoc} */
+    @Override public char readChar() throws IOException {
+        return (char)readShort();
+    }
+    /** {@inheritDoc} */
+    @Override public int readInt() throws IOException {
+        return mem.readInt(move(4));
+    }
+    /** {@inheritDoc} */
+    @Override public long readLong() throws IOException {
+        return mem.readLong(move(8));
+    }
+    /** {@inheritDoc} */
+    @Override public float readFloat() throws IOException {
+        return mem.readFloat(move(4));
+    }
+    /** {@inheritDoc} */
+    @Override public double readDouble() throws IOException {
+        return mem.readDouble(move(8));
+    }
+    /** {@inheritDoc} */
+    @Override public String readLine() throws IOException {
+        throw new UnsupportedOperationException();
+    }
+    /** {@inheritDoc} */
+    @Override public String readUTF() throws IOException {
+        byte[] bytes = new byte[readInt()];
+        if (bytes.length != 0)
+            readFully(bytes);
+        return new String(bytes, StandardCharsets.UTF_8);
+    }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..f7b1a73
--- /dev/null
@@ -0,0 +1,130 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop.shuffle.streams;
+import java.nio.charset.StandardCharsets;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+ * Data output stream.
+ */
+public class HadoopDataOutStream extends OutputStream implements DataOutput {
+    /** */
+    private final HadoopOffheapBuffer buf = new HadoopOffheapBuffer(0, 0);
+    /** */
+    private final GridUnsafeMemory mem;
+    /**
+     * @param mem Memory.
+     */
+    public HadoopDataOutStream(GridUnsafeMemory mem) {
+        this.mem = mem;
+    }
+    /**
+     * @return Buffer.
+     */
+    public HadoopOffheapBuffer buffer() {
+        return buf;
+    }
+    /**
+     * @param size Size.
+     * @return Old pointer or {@code 0} if move was impossible.
+     */
+    public long move(long size) {
+        return buf.move(size);
+    }
+    /** {@inheritDoc} */
+    @Override public void write(int b) {
+        writeByte(b);
+    }
+    /** {@inheritDoc} */
+    @Override public void write(byte[] b) {
+        write(b, 0, b.length);
+    }
+    /** {@inheritDoc} */
+    @Override public void write(byte[] b, int off, int len) {
+        GridUnsafe.copyMemory(b, GridUnsafe.BYTE_ARR_OFF + off, null, 
move(len), len);
+    }
+    /** {@inheritDoc} */
+    @Override public void writeBoolean(boolean v) {
+        writeByte(v ? 1 : 0);
+    }
+    /** {@inheritDoc} */
+    @Override public void writeByte(int v) {
+        mem.writeByte(move(1), (byte)v);
+    }
+    /** {@inheritDoc} */
+    @Override public void writeShort(int v) {
+        mem.writeShort(move(2), (short)v);
+    }
+    /** {@inheritDoc} */
+    @Override public void writeChar(int v) {
+        writeShort(v);
+    }
+    /** {@inheritDoc} */
+    @Override public void writeInt(int v) {
+        mem.writeInt(move(4), v);
+    }
+    /** {@inheritDoc} */
+    @Override public void writeLong(long v) {
+        mem.writeLong(move(8), v);
+    }
+    /** {@inheritDoc} */
+    @Override public void writeFloat(float v) {
+        mem.writeFloat(move(4), v);
+    }
+    /** {@inheritDoc} */
+    @Override public void writeDouble(double v) {
+        mem.writeDouble(move(8), v);
+    }
+    /** {@inheritDoc} */
+    @Override public void writeBytes(String s) {
+        writeUTF(s);
+    }
+    /** {@inheritDoc} */
+    @Override public void writeChars(String s) {
+        writeUTF(s);
+    }
+    /** {@inheritDoc} */
+    @Override public void writeUTF(String s) {
+        byte[] b = s.getBytes(StandardCharsets.UTF_8);
+        writeInt(b.length);
+        write(b);
+    }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..acc9be6
--- /dev/null
@@ -0,0 +1,122 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop.shuffle.streams;
+ * Offheap buffer.
+ */
+public class HadoopOffheapBuffer {
+    /** Buffer begin address. */
+    private long bufPtr;
+    /** The first address we do not own. */
+    private long bufEnd;
+    /** Current read or write pointer. */
+    private long posPtr;
+    /**
+     * @param bufPtr Pointer to buffer begin.
+     * @param bufSize Size of the buffer.
+     */
+    public HadoopOffheapBuffer(long bufPtr, long bufSize) {
+        set(bufPtr, bufSize);
+    }
+    /**
+     * @param bufPtr Pointer to buffer begin.
+     * @param bufSize Size of the buffer.
+     */
+    public void set(long bufPtr, long bufSize) {
+        this.bufPtr = bufPtr;
+        posPtr = bufPtr;
+        bufEnd = bufPtr + bufSize;
+    }
+    /**
+     * @return Pointer to internal buffer begin.
+     */
+    public long begin() {
+        return bufPtr;
+    }
+    /**
+     * @return Buffer capacity.
+     */
+    public long capacity() {
+        return bufEnd - bufPtr;
+    }
+    /**
+     * @return Remaining capacity.
+     */
+    public long remaining() {
+        return bufEnd - posPtr;
+    }
+    /**
+     * @return Absolute pointer to the current position inside of the buffer.
+     */
+    public long pointer() {
+        return posPtr;
+    }
+    /**
+     * @param ptr Absolute pointer to the current position inside of the 
+     */
+    public void pointer(long ptr) {
+        assert ptr >= bufPtr : bufPtr + " <= " + ptr;
+        assert ptr <= bufEnd : bufEnd + " <= " + bufPtr;
+        posPtr = ptr;
+    }
+    /**
+     * @param size Size move on.
+     * @return Old position pointer or {@code 0} if move goes beyond the end 
of the buffer.
+     */
+    public long move(long size) {
+        assert size > 0 : size;
+        long oldPos = posPtr;
+        long newPos = oldPos + size;
+        if (newPos > bufEnd)
+            return 0;
+        posPtr = newPos;
+        return oldPos;
+    }
+    /**
+     * @param ptr Pointer.
+     * @return {@code true} If the given pointer is inside of this buffer.
+     */
+    public boolean isInside(long ptr) {
+        return ptr >= bufPtr && ptr <= bufEnd;
+    }
+    /**
+     * Resets position to the beginning of buffer.
+     */
+    public void reset() {
+        posPtr = bufPtr;
+    }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..5ede18e
--- /dev/null
@@ -0,0 +1,153 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.typedef.internal.U;
+ * Task executor.
+ */
+public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter {
+    /** Job tracker. */
+    private HadoopJobTracker jobTracker;
+    /** */
+    private final ConcurrentMap<HadoopJobId, Collection<HadoopRunnableTask>> 
jobs = new ConcurrentHashMap<>();
+    /** Executor service to run tasks. */
+    private HadoopExecutorService exec;
+    /** {@inheritDoc} */
+    @Override public void onKernalStart() throws IgniteCheckedException {
+        super.onKernalStart();
+        jobTracker = ctx.jobTracker();
+        exec = new HadoopExecutorService(log, ctx.kernalContext().gridName(),
+            ctx.configuration().getMaxParallelTasks(), 
+    }
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        if (exec != null) {
+            exec.shutdown(3000);
+            if (cancel) {
+                for (HadoopJobId jobId : jobs.keySet())
+                    cancelTasks(jobId);
+            }
+        }
+    }
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) {
+        if (exec != null && !exec.shutdown(30000))
+            U.warn(log, "Failed to finish running tasks in 30 sec.");
+    }
+    /** {@inheritDoc} */
+    @Override public void run(final HadoopJob job, Collection<HadoopTaskInfo> 
tasks) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Submitting tasks for local execution [locNodeId=" + 
ctx.localNodeId() +
+                ", tasksCnt=" + tasks.size() + ']');
+        Collection<HadoopRunnableTask> executedTasks = jobs.get(;
+        if (executedTasks == null) {
+            executedTasks = new GridConcurrentHashSet<>();
+            Collection<HadoopRunnableTask> extractedCol = jobs.put(, 
+            assert extractedCol == null;
+        }
+        final Collection<HadoopRunnableTask> finalExecutedTasks = 
+        for (final HadoopTaskInfo info : tasks) {
+            assert info != null;
+            HadoopRunnableTask task = new HadoopRunnableTask(log, job, 
ctx.shuffle().memory(), info,
+                ctx.localNodeId()) {
+                @Override protected void onTaskFinished(HadoopTaskStatus 
status) {
+                    if (log.isDebugEnabled())
+                        log.debug("Finished task execution [jobId=" + 
+ ", taskInfo=" + info + ", " +
+                            "waitTime=" + waitTime() + ", execTime=" + 
executionTime() + ']');
+                    finalExecutedTasks.remove(this);
+                    jobTracker.onTaskFinished(info, status);
+                }
+                @Override protected HadoopTaskInput 
createInput(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+                    return ctx.shuffle().input(taskCtx);
+                }
+                @Override protected HadoopTaskOutput 
createOutput(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+                    return ctx.shuffle().output(taskCtx);
+                }
+            };
+            executedTasks.add(task);
+            exec.submit(task);
+        }
+    }
+    /**
+     * Cancels all currently running tasks for given job ID and cancels 
scheduled execution of tasks
+     * for this job ID.
+     * <p>
+     * It is guaranteed that this method will not be called concurrently with
+     * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, 
Collection)} method. No more job submissions will be performed via
+     * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, 
Collection)} method for given job ID after this method is called.
+     *
+     * @param jobId Job ID to cancel.
+     */
+    @Override public void cancelTasks(HadoopJobId jobId) {
+        Collection<HadoopRunnableTask> executedTasks = jobs.get(jobId);
+        if (executedTasks != null) {
+            for (HadoopRunnableTask task : executedTasks)
+                task.cancel();
+        }
+    }
+    /** {@inheritDoc} */
+    @Override public void onJobStateChanged(HadoopJobMetadata meta) throws 
IgniteCheckedException {
+        if (meta.phase() == HadoopJobPhase.PHASE_COMPLETE) {
+            Collection<HadoopRunnableTask> executedTasks = 
+            assert executedTasks == null || executedTasks.isEmpty();
+        }
+    }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..993ecc9
--- /dev/null
@@ -0,0 +1,234 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.internal.util.worker.GridWorkerListener;
+import org.apache.ignite.internal.util.worker.GridWorkerListenerAdapter;
+import org.apache.ignite.thread.IgniteThread;
+import org.jsr166.ConcurrentHashMap8;
+import static java.util.Collections.newSetFromMap;
+ * Executor service without thread pooling.
+ */
+public class HadoopExecutorService {
+    /** */
+    private final LinkedBlockingQueue<Callable<?>> queue;
+    /** */
+    private final Collection<GridWorker> workers = newSetFromMap(new 
ConcurrentHashMap8<GridWorker, Boolean>());
+    /** */
+    private final AtomicInteger active = new AtomicInteger();
+    /** */
+    private final int maxTasks;
+    /** */
+    private final String gridName;
+    /** */
+    private final IgniteLogger log;
+    /** */
+    private volatile boolean shutdown;
+    /** */
+    private final GridWorkerListener lsnr = new GridWorkerListenerAdapter() {
+            @Override public void onStopped(GridWorker w) {
+                workers.remove(w);
+                if (shutdown) {
+                    active.decrementAndGet();
+                    return;
+                }
+                Callable<?> task = queue.poll();
+                if (task != null)
+                    startThread(task);
+                else {
+                    active.decrementAndGet();
+                    if (!queue.isEmpty())
+                        startFromQueue();
+                }
+            }
+        };
+    /**
+     * @param log Logger.
+     * @param gridName Grid name.
+     * @param maxTasks Max number of tasks.
+     * @param maxQueue Max queue length.
+     */
+    public HadoopExecutorService(IgniteLogger log, String gridName, int 
maxTasks, int maxQueue) {
+        assert maxTasks > 0 : maxTasks;
+        assert maxQueue > 0 : maxQueue;
+        this.maxTasks = maxTasks;
+        this.queue = new LinkedBlockingQueue<>(maxQueue);
+        this.gridName = gridName;
+        this.log = log.getLogger(HadoopExecutorService.class);
+    }
+    /**
+     * @return Number of active workers.
+     */
+    public int active() {
+        return workers.size();
+    }
+    /**
+     * Submit task.
+     *
+     * @param task Task.
+     */
+    public void submit(Callable<?> task) {
+        while (queue.isEmpty()) {
+            int active0 = active.get();
+            if (active0 == maxTasks)
+                break;
+            if (active.compareAndSet(active0, active0 + 1)) {
+                startThread(task);
+                return; // Started in new thread bypassing queue.
+            }
+        }
+        try {
+            while (!queue.offer(task, 100, TimeUnit.MILLISECONDS)) {
+                if (shutdown)
+                    return; // Rejected due to shutdown.
+            }
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return;
+        }
+        startFromQueue();
+    }
+    /**
+     * Attempts to start task from queue.
+     */
+    private void startFromQueue() {
+        do {
+            int active0 = active.get();
+            if (active0 == maxTasks)
+                break;
+            if (active.compareAndSet(active0, active0 + 1)) {
+                Callable<?> task = queue.poll();
+                if (task == null) {
+                    int res = active.decrementAndGet();
+                    assert res >= 0 : res;
+                    break;
+                }
+                startThread(task);
+            }
+        }
+        while (!queue.isEmpty());
+    }
+    /**
+     * @param task Task.
+     */
+    private void startThread(final Callable<?> task) {
+        String workerName;
+        if (task instanceof HadoopRunnableTask) {
+            final HadoopTaskInfo i = ((HadoopRunnableTask)task).taskInfo();
+            workerName = "Hadoop-task-" + i.jobId() + "-" + i.type() + "-" + 
i.taskNumber() + "-" + i.attempt();
+        }
+        else
+            workerName = task.toString();
+        GridWorker w = new GridWorker(gridName, workerName, log, lsnr) {
+            @Override protected void body() {
+                try {
+          ;
+                }
+                catch (Exception e) {
+                    log.error("Failed to execute task: " + task, e);
+                }
+            }
+        };
+        workers.add(w);
+        if (shutdown)
+            w.cancel();
+        new IgniteThread(w).start();
+    }
+    /**
+     * Shuts down this executor service.
+     *
+     * @param awaitTimeMillis Time in milliseconds to wait for tasks 
+     * @return {@code true} If all tasks completed.
+     */
+    public boolean shutdown(long awaitTimeMillis) {
+        shutdown = true;
+        for (GridWorker w : workers)
+            w.cancel();
+        while (awaitTimeMillis > 0 && !workers.isEmpty()) {
+            try {
+                Thread.sleep(100);
+                awaitTimeMillis -= 100;
+            }
+            catch (InterruptedException e) {
+                break;
+            }
+        }
+        return workers.isEmpty();
+    }
+    /**
+     * @return {@code true} If method {@linkplain #shutdown(long)} was already 
+     */
+    public boolean isShutdown() {
+        return shutdown;
+    }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..a57efe6
--- /dev/null
@@ -0,0 +1,293 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import static 
+import static 
+import static 
+import static 
+import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.MAP;
+ * Runnable task.
+ */
+public abstract class HadoopRunnableTask implements Callable<Void> {
+    /** */
+    private final GridUnsafeMemory mem;
+    /** */
+    private final IgniteLogger log;
+    /** */
+    private final HadoopJob job;
+    /** Task to run. */
+    private final HadoopTaskInfo info;
+    /** Submit time. */
+    private final long submitTs = U.currentTimeMillis();
+    /** Execution start timestamp. */
+    private long execStartTs;
+    /** Execution end timestamp. */
+    private long execEndTs;
+    /** */
+    private HadoopMultimap combinerInput;
+    /** */
+    private volatile HadoopTaskContext ctx;
+    /** Set if task is to cancelling. */
+    private volatile boolean cancelled;
+    /** Node id. */
+    private UUID nodeId;
+    /**
+     * @param log Log.
+     * @param job Job.
+     * @param mem Memory.
+     * @param info Task info.
+     * @param nodeId Node id.
+     */
+    protected HadoopRunnableTask(IgniteLogger log, HadoopJob job, 
GridUnsafeMemory mem, HadoopTaskInfo info,
+        UUID nodeId) {
+        this.nodeId = nodeId;
+        this.log = log.getLogger(HadoopRunnableTask.class);
+        this.job = job;
+        this.mem = mem;
+ = info;
+    }
+    /**
+     * @return Wait time.
+     */
+    public long waitTime() {
+        return execStartTs - submitTs;
+    }
+    /**
+     * @return Execution time.
+     */
+    public long executionTime() {
+        return execEndTs - execStartTs;
+    }
+    /** {@inheritDoc} */
+    @Override public Void call() throws IgniteCheckedException {
+        ctx = job.getTaskContext(info);
+        return ctx.runAsJobOwner(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                call0();
+                return null;
+            }
+        });
+    }
+    /**
+     * Implements actual task running.
+     * @throws IgniteCheckedException
+     */
+    void call0() throws IgniteCheckedException {
+        execStartTs = U.currentTimeMillis();
+        Throwable err = null;
+        HadoopTaskState state = HadoopTaskState.COMPLETED;
+        HadoopPerformanceCounter perfCntr = null;
+        try {
+            perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), 
+            perfCntr.onTaskSubmit(info, submitTs);
+            perfCntr.onTaskPrepare(info, execStartTs);
+            ctx.prepareTaskEnvironment();
+            runTask(perfCntr);
+            if (info.type() == MAP && {
+                ctx.taskInfo(new HadoopTaskInfo(COMBINE, info.jobId(), 
info.taskNumber(), info.attempt(), null));
+                try {
+                    runTask(perfCntr);
+                }
+                finally {
+                    ctx.taskInfo(info);
+                }
+            }
+        }
+        catch (HadoopTaskCancelledException ignored) {
+            state = HadoopTaskState.CANCELED;
+        }
+        catch (Throwable e) {
+            state = HadoopTaskState.FAILED;
+            err = e;
+            U.error(log, "Task execution failed.", e);
+            if (e instanceof Error)
+                throw e;
+        }
+        finally {
+            execEndTs = U.currentTimeMillis();
+            if (perfCntr != null)
+                perfCntr.onTaskFinish(info, execEndTs);
+            onTaskFinished(new HadoopTaskStatus(state, err, ctx==null ? null : 
+            if (combinerInput != null)
+                combinerInput.close();
+            if (ctx != null)
+                ctx.cleanupTaskEnvironment();
+        }
+    }
+    /**
+     * @param perfCntr Performance counter.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void runTask(HadoopPerformanceCounter perfCntr) throws 
IgniteCheckedException {
+        if (cancelled)
+            throw new HadoopTaskCancelledException("Task cancelled.");
+        try (HadoopTaskOutput out = createOutputInternal(ctx);
+             HadoopTaskInput in = createInputInternal(ctx)) {
+            ctx.input(in);
+            ctx.output(out);
+            perfCntr.onTaskStart(ctx.taskInfo(), U.currentTimeMillis());
+  ;
+        }
+    }
+    /**
+     * Cancel the executed task.
+     */
+    public void cancel() {
+        cancelled = true;
+        if (ctx != null)
+            ctx.cancel();
+    }
+    /**
+     * @param status Task status.
+     */
+    protected abstract void onTaskFinished(HadoopTaskStatus status);
+    /**
+     * @param ctx Task context.
+     * @return Task input.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private HadoopTaskInput createInputInternal(HadoopTaskContext ctx) throws 
IgniteCheckedException {
+        switch (ctx.taskInfo().type()) {
+            case SETUP:
+            case MAP:
+            case COMMIT:
+            case ABORT:
+                return null;
+            case COMBINE:
+                assert combinerInput != null;
+                return combinerInput.input(ctx);
+            default:
+                return createInput(ctx);
+        }
+    }
+    /**
+     * @param ctx Task context.
+     * @return Input.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected abstract HadoopTaskInput createInput(HadoopTaskContext ctx) 
throws IgniteCheckedException;
+    /**
+     * @param ctx Task info.
+     * @return Output.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected abstract HadoopTaskOutput createOutput(HadoopTaskContext ctx) 
throws IgniteCheckedException;
+    /**
+     * @param ctx Task info.
+     * @return Task output.
+     * @throws IgniteCheckedException If failed.
+     */
+    private HadoopTaskOutput createOutputInternal(HadoopTaskContext ctx) 
throws IgniteCheckedException {
+        switch (ctx.taskInfo().type()) {
+            case SETUP:
+            case REDUCE:
+            case COMMIT:
+            case ABORT:
+                return null;
+            case MAP:
+                if ( {
+                    assert combinerInput == null;
+                    combinerInput = get(, 
+                        new HadoopHashMultimap(, mem, 
get(, COMBINER_HASHMAP_SIZE, 8 * 1024)):
+                        new HadoopSkipList(, mem); // TODO replace 
with red-black tree
+                    return combinerInput.startAdding(ctx);
+                }
+            default:
+                return createOutput(ctx);
+        }
+    }
+    /**
+     * @return Task info.
+     */
+    public HadoopTaskInfo taskInfo() {
+        return info;
+    }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..f13c76a
--- /dev/null
@@ -0,0 +1,59 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+import java.util.Collection;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopComponent;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+ * Common superclass for task executor.
+ */
+public abstract class HadoopTaskExecutorAdapter extends HadoopComponent {
+    /**
+     * Runs tasks.
+     *
+     * @param job Job.
+     * @param tasks Tasks.
+     * @throws IgniteCheckedException If failed.
+     */
+    public abstract void run(final HadoopJob job, Collection<HadoopTaskInfo> 
tasks) throws IgniteCheckedException;
+    /**
+     * Cancels all currently running tasks for given job ID and cancels 
scheduled execution of tasks
+     * for this job ID.
+     * <p>
+     * It is guaranteed that this method will not be called concurrently with
+     * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, 
Collection)} method. No more job submissions will be performed via
+     * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, 
Collection)} method for given job ID after this method is called.
+     *
+     * @param jobId Job ID to cancel.
+     */
+    public abstract void cancelTasks(HadoopJobId jobId) throws 
+    /**
+     * On job state change callback;
+     *
+     * @param meta Job metadata.
+     */
+    public abstract void onJobStateChanged(HadoopJobMetadata meta) throws 
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..fa09ff7
--- /dev/null
@@ -0,0 +1,116 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+ * Task status.
+ */
+public class HadoopTaskStatus implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+    /** */
+    private HadoopTaskState state;
+    /** */
+    private Throwable failCause;
+    /** */
+    private HadoopCounters cntrs;
+    /**
+     * Default constructor required by {@link Externalizable}.
+     */
+    public HadoopTaskStatus() {
+        // No-op.
+    }
+    /**
+     * Creates new instance.
+     *
+     * @param state Task state.
+     * @param failCause Failure cause (if any).
+     */
+    public HadoopTaskStatus(HadoopTaskState state, @Nullable Throwable 
failCause) {
+        this(state, failCause, null);
+    }
+    /**
+     * Creates new instance.
+     *
+     * @param state Task state.
+     * @param failCause Failure cause (if any).
+     * @param cntrs Task counters.
+     */
+    public HadoopTaskStatus(HadoopTaskState state, @Nullable Throwable 
+        @Nullable HadoopCounters cntrs) {
+        assert state != null;
+        this.state = state;
+        this.failCause = failCause;
+        this.cntrs = cntrs;
+    }
+    /**
+     * @return State.
+     */
+    public HadoopTaskState state() {
+        return state;
+    }
+    /**
+     * @return Fail cause.
+     */
+    @Nullable public Throwable failCause() {
+        return failCause;
+    }
+    /**
+     * @return Counters.
+     */
+    @Nullable public HadoopCounters counters() {
+        return cntrs;
+    }
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopTaskStatus.class, this);
+    }
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(state);
+        out.writeObject(failCause);
+        out.writeObject(cntrs);
+    }
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        state = (HadoopTaskState)in.readObject();
+        failCause = (Throwable)in.readObject();
+        cntrs = (HadoopCounters)in.readObject();
+    }
\ No newline at end of file

Reply via email to