http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java
new file mode 100644
index 0000000..820f47a
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java
@@ -0,0 +1,883 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.dataload;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.eviction.fifo.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.gridgain.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
+import static org.apache.ignite.events.IgniteEventType.*;
+
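+/**
+ * Self-tests that exercise {@link IgniteDataLoader}: adding and removing data, flushing,
+ * closing/cancelling the loader and interaction with a cache store.
+ */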
+public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
+    /** Backing map read and written by {@link TestStore}; assigned and set back to {@code null} by {@link #testUpdateStore()}. */
+    private static ConcurrentHashMap<Object, Object> storeMap;
+
+    /** IP finder handed to the discovery SPI of every node configured by this test. */
+    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Cache mode applied to the cache configuration when {@link #useCache} is set. */
+    private GridCacheMode mode = PARTITIONED;
+
+    /** Selects {@code NEAR_PARTITIONED} ({@code true}) or {@code PARTITIONED_ONLY} ({@code false}) distribution mode. */
+    private boolean nearEnabled = true;
+
+    /** Whether nodes configured from now on get a cache configuration. */
+    private boolean useCache;
+
+    /** Whether loaders are configured with the group-locked updater. */
+    private boolean useGrpLock;
+
+    /** Optional cache store; when non-null it is installed with read-through and write-through enabled. */
+    private TestStore store;
+
+    /**
+     * Restores the per-test configuration fields to their initial values.
+     * <p>
+     * Besides {@code useCache}, individual tests also change {@code mode}, {@code nearEnabled},
+     * {@code useGrpLock} and {@code store}; all of them are reset here so that a value set by one
+     * test cannot influence a later one should the test instance be reused.
+     *
+     * @throws Exception If the parent tear-down fails.
+     */
+    @Override public void afterTest() throws Exception {
+        super.afterTest();
+
+        useCache = false;
+        useGrpLock = false;
+        nearEnabled = true;
+        mode = PARTITIONED;
+        store = null;
+    }
+
+    /**
+     * Builds the configuration of a test node.
+     * <p>
+     * Every node gets the shared IP finder, an empty list of included properties and the optimized
+     * marshaller. A transactional, fully synchronous cache (optionally backed by {@link #store})
+     * is added only while {@link #useCache} is set; otherwise the node is configured without caches.
+     *
+     * @param gridName Name of the grid being started.
+     * @return Node configuration.
+     * @throws Exception If the parent configuration cannot be created.
+     */
+    @SuppressWarnings({"IfMayBeConditional", "unchecked"})
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+
+        igniteCfg.setDiscoverySpi(discoSpi);
+
+        igniteCfg.setIncludeProperties();
+
+        igniteCfg.setMarshaller(new IgniteOptimizedMarshaller(false));
+
+        if (!useCache) {
+            // Node without any cache.
+            igniteCfg.setCacheConfiguration();
+
+            return igniteCfg;
+        }
+
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(mode);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+        if (nearEnabled)
+            cacheCfg.setDistributionMode(NEAR_PARTITIONED);
+        else
+            cacheCfg.setDistributionMode(PARTITIONED_ONLY);
+
+        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        cacheCfg.setEvictionPolicy(new GridCacheFifoEvictionPolicy(10000));
+
+        cacheCfg.setEvictSynchronized(false);
+        cacheCfg.setEvictNearSynchronized(false);
+
+        if (store != null) {
+            cacheCfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
+            cacheCfg.setReadThrough(true);
+            cacheCfg.setWriteThrough(true);
+        }
+
+        igniteCfg.setCacheConfiguration(cacheCfg);
+
+        return igniteCfg;
+    }
+
+    /**
+     * Runs the scenario of {@link #checkDataLoader()} with the cache mode set to {@code PARTITIONED}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPartitioned() throws Exception {
+        mode = PARTITIONED;
+
+        checkDataLoader();
+    }
+
+    /**
+     * Runs the scenario of {@link #checkDataLoader()} with a {@code PARTITIONED} cache and
+     * {@code nearEnabled} switched off, which makes the configuration use the
+     * {@code PARTITIONED_ONLY} distribution mode.
+     *
+     * @throws Exception If failed.
+     */
+    public void testColocated() throws Exception {
+        mode = PARTITIONED;
+        nearEnabled = false;
+
+        checkDataLoader();
+    }
+
+    /**
+     * Runs the scenario of {@link #checkDataLoader()} with a {@code PARTITIONED} cache and the
+     * group-locked updater selected.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPartitionedGroupLock() throws Exception {
+        mode = PARTITIONED;
+        useGrpLock = true;
+
+        checkDataLoader();
+    }
+
+    /**
+     * Runs the scenario of {@link #checkDataLoader()} with the cache mode set to {@code REPLICATED}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReplicated() throws Exception {
+        mode = REPLICATED;
+
+        checkDataLoader();
+    }
+
+    /**
+     * Runs the scenario of {@link #checkDataLoader()} with a {@code REPLICATED} cache and the
+     * group-locked updater selected.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReplicatedGroupLock() throws Exception {
+        mode = REPLICATED;
+        useGrpLock = true;
+
+        checkDataLoader();
+    }
+
+    /**
+     * Checks that the scenario of {@link #checkDataLoader()} is rejected with an
+     * {@link IgniteCheckedException} when the cache mode is {@code LOCAL}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testLocal() throws Exception {
+        mode = LOCAL;
+
+        try {
+            checkDataLoader();
+
+            // Unlike a bare 'assert false', this fails the test even when JVM assertions are disabled.
+            fail("Data loader was expected to fail for LOCAL cache mode.");
+        }
+        catch (IgniteCheckedException e) {
+            // Cannot load local cache configured remotely.
+            info("Caught expected exception: " + e);
+        }
+    }
+
+    /**
+     * Common add/remove scenario.
+     * <p>
+     * Starts node 1, then switches cache configuration on and starts nodes 2 and 3. Ten threads add
+     * 400 keys each through a loader obtained from node 1; node 1 is then stopped, and the number of
+     * primary keys on nodes 2 and 3 must add up to the number of loaded keys. The same keys are
+     * afterwards removed through a loader obtained from node 2, after which both nodes must hold
+     * no primary keys.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ErrorNotRethrown")
+    private void checkDataLoader() throws Exception {
+        try {
+            Ignite loaderNode = startGrid(1);
+
+            // Cache configuration is switched on only after the first node has been started.
+            useCache = true;
+
+            Ignite cacheNode2 = startGrid(2);
+            Ignite cacheNode3 = startGrid(3);
+
+            final IgniteDataLoader<Integer, Integer> addLdr = loaderNode.dataLoader(null);
+
+            addLdr.updater(useGrpLock ?
+                GridDataLoadCacheUpdaters.<Integer, Integer>groupLocked() :
+                GridDataLoadCacheUpdaters.<Integer, Integer>batchedSorted());
+
+            final AtomicInteger keyGen = new AtomicInteger();
+            final int keysPerThread = 400;
+            final int threadCnt = 10;
+
+            final CountDownLatch addedLatch = new CountDownLatch(threadCnt);
+
+            IgniteFuture<?> addFut = multithreadedAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    Collection<IgniteFuture<?>> pending = new ArrayList<>(keysPerThread);
+
+                    for (int n = 0; n < keysPerThread; n++) {
+                        int key = keyGen.getAndIncrement();
+
+                        pending.add(addLdr.addData(key, key));
+                    }
+
+                    // Signal that this thread has submitted all of its keys.
+                    addedLatch.countDown();
+
+                    for (IgniteFuture<?> f : pending)
+                        f.get();
+
+                    return null;
+                }
+            }, threadCnt);
+
+            addedLatch.await();
+
+            // This will wait until data loader finishes loading.
+            stopGrid(getTestGridName(1), false);
+
+            addFut.get();
+
+            int primary2 = cacheNode2.cache(null).primaryKeySet().size();
+            int primary3 = cacheNode3.cache(null).primaryKeySet().size();
+            int expTotal = threadCnt * keysPerThread;
+
+            assertEquals(expTotal, primary2 + primary3);
+
+            final IgniteDataLoader<Integer, Integer> rmvLdr = cacheNode2.dataLoader(null);
+
+            rmvLdr.updater(useGrpLock ?
+                GridDataLoadCacheUpdaters.<Integer, Integer>groupLocked() :
+                GridDataLoadCacheUpdaters.<Integer, Integer>batchedSorted());
+
+            final CountDownLatch removedLatch = new CountDownLatch(threadCnt);
+
+            IgniteFuture<?> rmvFut = multithreadedAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    Collection<IgniteFuture<?>> pending = new ArrayList<>(keysPerThread);
+
+                    for (int n = 0; n < keysPerThread; n++) {
+                        // Walk the key generator back down over the keys added above.
+                        final int key = keyGen.decrementAndGet();
+
+                        pending.add(rmvLdr.removeData(key));
+                    }
+
+                    removedLatch.countDown();
+
+                    for (IgniteFuture<?> f : pending)
+                        f.get();
+
+                    return null;
+                }
+            }, threadCnt);
+
+            removedLatch.await();
+
+            rmvLdr.close(false);
+
+            rmvFut.get();
+
+            primary2 = cacheNode2.cache(null).primaryKeySet().size();
+            primary3 = cacheNode3.cache(null).primaryKeySet().size();
+
+            assert primary2 == 0 && primary3 == 0 :
+                "Incorrect entries count [s2=" + primary2 + ", s3=" + primary3 + ']';
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Checks that arrays of every primitive type can be passed to the data loader as values,
+     * both directly and wrapped by {@link #fixedClosure(Object)}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPrimitiveArrays() throws Exception {
+        try {
+            useCache = true;
+            mode = PARTITIONED;
+
+            Ignite ignite = startGrid(1);
+
+            // Reproduced only for several nodes in topology (if marshalling is used).
+            startGrid(2);
+
+            List<Object> samples = Arrays.<Object>asList(
+                new byte[] {1},
+                new boolean[] {true, false},
+                new char[] {2, 3},
+                new short[] {3, 4},
+                new int[] {4, 5},
+                new long[] {5, 6},
+                new float[] {6, 7},
+                new double[] {7, 8});
+
+            IgniteDataLoader<Object, Object> loader = ignite.dataLoader(null);
+
+            int sampleCnt = samples.size();
+
+            for (int key = 0; key < 1000; key++) {
+                // Cycle through the sample arrays.
+                Object val = samples.get(key % sampleCnt);
+
+                loader.addData(key, val);
+                loader.addData(key, fixedClosure(val));
+            }
+
+            loader.close(false);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Runs {@link #checkLoaderMultithreaded(int, int)} for a {@code REPLICATED} cache with
+     * one node started without cache and two nodes started with cache.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReplicatedMultiThreaded() throws Exception {
+        mode = REPLICATED;
+
+        checkLoaderMultithreaded(1, 2);
+    }
+
+    /**
+     * Runs {@link #checkLoaderMultithreaded(int, int)} for a {@code REPLICATED} cache with the
+     * group-locked updater, one node started without cache and two nodes started with cache.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReplicatedMultiThreadedGroupLock() throws Exception {
+        mode = REPLICATED;
+        useGrpLock = true;
+
+        checkLoaderMultithreaded(1, 2);
+    }
+
+    /**
+     * Runs {@link #checkLoaderMultithreaded(int, int)} for a {@code PARTITIONED} cache with
+     * one node started without cache and three nodes started with cache.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPartitionedMultiThreaded() throws Exception {
+        mode = PARTITIONED;
+
+        checkLoaderMultithreaded(1, 3);
+    }
+
+    /**
+     * Runs {@link #checkLoaderMultithreaded(int, int)} for a {@code PARTITIONED} cache with the
+     * group-locked updater, one node started without cache and three nodes started with cache.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPartitionedMultiThreadedGroupLock() throws Exception {
+        mode = PARTITIONED;
+        useGrpLock = true;
+
+        checkLoaderMultithreaded(1, 3);
+    }
+
+    /**
+     * Tests the loader in a multithreaded environment while an extra node is repeatedly
+     * started and stopped.
+     * <p>
+     * Nodes are numbered from 1: first {@code nodesCntNoCache} nodes are started, then
+     * {@code useCache} is set and {@code nodesCntCache} more nodes are started. A loader obtained
+     * from node 1 (per-node buffer size 2) is shared by three groups of threads:
+     * <ul>
+     * <li>5 "producer" threads add keys until 50000 keys were generated or {@code done} is set,
+     *     then flush and wait for their futures;</li>
+     * <li>1 "flusher" thread flushes the loader every 100 ms until {@code done} is set;</li>
+     * <li>1 "start-stop-thread" starts and stops the node with the next free index 5 times,
+     *     sleeping 1 second in between, and finally sets {@code done}.</li>
+     * </ul>
+     * The loader is closed and all grids are stopped regardless of the outcome.
+     *
+     * @param nodesCntNoCache How many nodes should be started without cache.
+     * @param nodesCntCache How many nodes should be started with cache.
+     * @throws Exception If failed.
+     */
+    protected void checkLoaderMultithreaded(int nodesCntNoCache, int 
nodesCntCache)
+        throws Exception {
+        try {
+            // Start all required nodes.
+            int idx = 1;
+
+            for (int i = 0; i < nodesCntNoCache; i++)
+                startGrid(idx++);
+
+            useCache = true;
+
+            for (int i = 0; i < nodesCntCache; i++)
+                startGrid(idx++);
+
+            Ignite g1 = grid(1);
+
+            // Get and configure loader.
+            final IgniteDataLoader<Integer, Integer> ldr = g1.dataLoader(null);
+
+            ldr.updater(useGrpLock ? GridDataLoadCacheUpdaters.<Integer, 
Integer>groupLocked() :
+                GridDataLoadCacheUpdaters.<Integer, Integer>individual());
+            ldr.perNodeBufferSize(2);
+
+            // Define count of puts.
+            final AtomicInteger idxGen = new AtomicInteger();
+
+            final AtomicBoolean done = new AtomicBoolean();
+
+            try {
+                final int totalPutCnt = 50000;
+
+                IgniteFuture<?> fut1 = multithreadedAsync(new 
Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        Collection<IgniteFuture<?>> futs = new ArrayList<>();
+
+                        while (!done.get()) {
+                            int idx = idxGen.getAndIncrement();
+
+                            if (idx >= totalPutCnt) {
+                                info(">>> Stopping producer thread since 
maximum count of puts reached.");
+
+                                break;
+                            }
+
+                            futs.add(ldr.addData(idx, idx));
+                        }
+
+                        ldr.flush();
+
+                        for (IgniteFuture<?> fut : futs)
+                            fut.get();
+
+                        return null;
+                    }
+                }, 5, "producer");
+
+                IgniteFuture<?> fut2 = multithreadedAsync(new 
Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        while (!done.get()) {
+                            ldr.flush();
+
+                            U.sleep(100);
+                        }
+
+                        return null;
+                    }
+                }, 1, "flusher");
+
+                // Define index of node being restarted.
+                final int restartNodeIdx = nodesCntCache + nodesCntNoCache + 1;
+
+                IgniteFuture<?> fut3 = multithreadedAsync(new 
Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        try {
+                            for (int i = 0; i < 5; i++) {
+                                Ignite g = startGrid(restartNodeIdx);
+
+                                UUID id = g.cluster().localNode().id();
+
+                                info(">>>>>>> Started node: " + id);
+
+                                U.sleep(1000);
+
+                                stopGrid(getTestGridName(restartNodeIdx), 
true);
+
+                                info(">>>>>>> Stopped node: " + id);
+                            }
+                        }
+                        finally {
+                            done.set(true);
+
+                            info("Start stop thread finished.");
+                        }
+
+                        return null;
+                    }
+                }, 1, "start-stop-thread");
+
+                fut1.get();
+                fut2.get();
+                fut3.get();
+            }
+            finally {
+                ldr.close(false);
+            }
+
+            info("Cache size on second grid: " + grid(nodesCntNoCache + 
1).cache(null).primaryKeySet().size());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Checks the loader life cycle API: a loader that was closed, cancelled through its future or
+     * invalidated by stopping its node must reject new data, and its future must be done.
+     * Also checks that requesting a loader for an unknown cache fails.
+     * <p>
+     * Expected failures are reported with {@code fail(...)} / {@code assertTrue(...)} rather than
+     * the {@code assert} keyword, so the checks are effective even when JVM assertions are disabled.
+     *
+     * @throws Exception If failed.
+     */
+    public void testLoaderApi() throws Exception {
+        useCache = true;
+
+        try {
+            Ignite g1 = startGrid(1);
+
+            IgniteDataLoader<Object, Object> ldr = g1.dataLoader(null);
+
+            ldr.close(false);
+
+            checkAddDataRejected(ldr);
+
+            assertTrue(ldr.future().isDone());
+
+            ldr.future().get();
+
+            try {
+                // Create another loader.
+                ldr = g1.dataLoader("UNKNOWN_CACHE");
+
+                fail("Creating a loader for an unknown cache was expected to fail.");
+            }
+            catch (IllegalStateException e) {
+                info("Caught expected exception: " + e);
+            }
+
+            // 'ldr' still refers to the first, already closed loader here.
+            ldr.close(true);
+
+            assertTrue(ldr.future().isDone());
+
+            ldr.future().get();
+
+            // Create another loader.
+            ldr = g1.dataLoader(null);
+
+            // Cancel with future.
+            ldr.future().cancel();
+
+            checkAddDataRejected(ldr);
+
+            assertTrue(ldr.future().isDone());
+
+            try {
+                ldr.future().get();
+
+                fail("Getting the result of a cancelled loader future was expected to fail.");
+            }
+            catch (IgniteFutureCancelledException e) {
+                info("Caught expected exception: " + e);
+            }
+
+            // Create another loader.
+            ldr = g1.dataLoader(null);
+
+            // This will close loader.
+            stopGrid(getTestGridName(1), false);
+
+            checkAddDataRejected(ldr);
+
+            assertTrue(ldr.future().isDone());
+
+            ldr.future().get();
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Verifies that the given loader refuses new data with an {@link IllegalStateException}.
+     *
+     * @param ldr Loader that is expected to be closed or cancelled.
+     */
+    private void checkAddDataRejected(IgniteDataLoader<Object, Object> ldr) {
+        try {
+            ldr.addData(0, 0);
+
+            fail("Adding data to a closed or cancelled loader was expected to fail.");
+        }
+        catch (IllegalStateException e) {
+            info("Caught expected exception: " + e);
+        }
+    }
+
+    /**
+     * Wraps an integer into a callable that returns it.
+     *
+     * @param i Value to wrap, may be {@code null}.
+     * @return Callable whose {@code call()} returns {@code i}.
+     */
+    private static Callable<Integer> callable(@Nullable final Integer i) {
+        return new Callable<Integer>() {
+            @Override public Integer call() throws Exception {
+                return i;
+            }
+        };
+    }
+
+    /**
+     * Creates a closure that adds the given integer to its argument.
+     * <p>
+     * The closure returns {@code i} when its argument is {@code null}, the argument itself when
+     * {@code i} is {@code null}, and the sum otherwise. The branches are written as explicit
+     * statements on purpose: in a conditional expression of the form {@code e == null ? i : e + i}
+     * the operand types are {@code Integer} and {@code int}, so the whole expression has type
+     * {@code int} and {@code i} is unboxed, which throws {@link NullPointerException} for a
+     * {@code null} value of {@code i}.
+     *
+     * @param i Value to add, may be {@code null}.
+     * @return Closure.
+     */
+    private static IgniteClosure<Integer, Integer> closure(@Nullable final Integer i) {
+        return new IgniteClosure<Integer, Integer>() {
+            @Override public Integer apply(Integer e) {
+                if (e == null)
+                    return i;
+
+                if (i == null)
+                    return e;
+
+                return e + i;
+            }
+        };
+    }
+
+    /**
+     * Creates a closure that ignores the value of its argument and always returns {@code obj}.
+     * When both the argument and {@code obj} are non-null, the closure asserts that they are
+     * of exactly the same class.
+     *
+     * @param obj Value to return, may be {@code null}.
+     * @return Closure.
+     */
+    private static <T> IgniteClosure<T, T> fixedClosure(@Nullable final T obj) 
{
+        return new IgniteClosure<T, T>() {
+            @Override public T apply(T e) {
+                assert e == null || obj == null || e.getClass() == 
obj.getClass() :
+                    "Expects the same types [e=" + e + ", obj=" + obj + ']';
+
+                return obj;
+            }
+        };
+    }
+
+    /**
+     * Creates a closure that expects a given value as its argument and returns {@code null}.
+     * If the argument is not equal to the expected value (two {@code null}s count as equal),
+     * the closure throws an {@link AssertionError}.
+     *
+     * @param exp Expected closure argument, may be {@code null}.
+     * @return Remove expected cache value closure.
+     */
+    private static <T> IgniteClosure<T, T> removeClosure(@Nullable final T 
exp) {
+        return new IgniteClosure<T, T>() {
+            @Override public T apply(T act) {
+                if (exp == null ? act == null : exp.equals(act))
+                    return null;
+
+                throw new AssertionError("Unexpected value [exp=" + exp + ", 
act=" + act + ']');
+            }
+        };
+    }
+
+    /**
+     * Checks explicit flushing on a single node with a {@code LOCAL} cache.
+     * <p>
+     * Nine entries are added with a per-node buffer size of 10, so the cache must stay empty until
+     * {@code flush()} is called; five threads flush concurrently and each must observe all nine
+     * entries. Further entries become visible after another flush and after closing the loader.
+     *
+     * @throws Exception If failed.
+     */
+    public void testFlush() throws Exception {
+        mode = LOCAL;
+
+        useCache = true;
+
+        try {
+            Ignite ignite = startGrid();
+
+            final GridCache<Integer, Integer> cache = ignite.cache(null);
+
+            final IgniteDataLoader<Integer, Integer> loader = ignite.dataLoader(null);
+
+            loader.perNodeBufferSize(10);
+
+            // One entry fewer than the buffer size.
+            for (int key = 0; key < 9; key++)
+                loader.addData(key, key);
+
+            assertTrue(cache.isEmpty());
+
+            Callable<Void> flushAndCheck = new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    loader.flush();
+
+                    assertEquals(9, cache.size());
+
+                    return null;
+                }
+            };
+
+            multithreaded(flushAndCheck, 5, "flush-checker");
+
+            loader.addData(100, 100);
+
+            loader.flush();
+
+            assertEquals(10, cache.size());
+
+            loader.addData(200, 200);
+
+            loader.close(false);
+
+            loader.future().get();
+
+            assertEquals(11, cache.size());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Checks that {@code tryFlush()} eventually pushes buffered entries into a {@code LOCAL} cache.
+     * <p>
+     * Nine entries are added with a per-node buffer size of 10, so the cache must be empty before
+     * {@code tryFlush()} is called. The test then polls the cache size with a deadline instead of
+     * sleeping for one fixed interval, so that it neither depends on the flush completing within
+     * that interval nor waits longer than necessary.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTryFlush() throws Exception {
+        mode = LOCAL;
+
+        useCache = true;
+
+        try {
+            Ignite g = startGrid();
+
+            GridCache<Integer, Integer> c = g.cache(null);
+
+            IgniteDataLoader<Integer, Integer> ldr = g.dataLoader(null);
+
+            ldr.perNodeBufferSize(10);
+
+            for (int i = 0; i < 9; i++)
+                ldr.addData(i, i);
+
+            assertTrue(c.isEmpty());
+
+            ldr.tryFlush();
+
+            // Wait (bounded) for the flush to become visible in the cache.
+            long deadline = System.currentTimeMillis() + 5000;
+
+            while (c.size() != 9 && System.currentTimeMillis() < deadline)
+                Thread.sleep(10);
+
+            assertEquals(9, c.size());
+
+            ldr.close(false);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Checks flushing driven by {@code autoFlushFrequency} on a single node with a {@code LOCAL} cache.
+     * <p>
+     * A local listener counts down a latch of 9 on every {@code EVT_CACHE_OBJECT_PUT} event.
+     * Nine entries are added with a per-node buffer size of 10 and an auto-flush frequency of 3000.
+     * The test asserts that the latch is not released within the first 1000 ms (cache still empty)
+     * and that it is released within the following 3000 ms, after which the cache holds 9 entries.
+     *
+     * @throws Exception If failed.
+     */
+    public void testFlushTimeout() throws Exception {
+        mode = LOCAL;
+
+        useCache = true;
+
+        try {
+            Ignite g = startGrid();
+
+            final CountDownLatch latch = new CountDownLatch(9);
+
+            g.events().localListen(new IgnitePredicate<IgniteEvent>() {
+                @Override public boolean apply(IgniteEvent evt) {
+                    latch.countDown();
+
+                    return true;
+                }
+            }, EVT_CACHE_OBJECT_PUT);
+
+            GridCache<Integer, Integer> c = g.cache(null);
+
+            assertTrue(c.isEmpty());
+
+            IgniteDataLoader<Integer, Integer> ldr = g.dataLoader(null);
+
+            ldr.perNodeBufferSize(10);
+            ldr.autoFlushFrequency(3000);
+
+            for (int i = 0; i < 9; i++)
+                ldr.addData(i, i);
+
+            assertTrue(c.isEmpty());
+
+            assertFalse(latch.await(1000, MILLISECONDS));
+
+            assertTrue(c.isEmpty());
+
+            assertTrue(latch.await(3000, MILLISECONDS));
+
+            assertEquals(9, c.size());
+
+            ldr.close(false);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Checks how the loader interacts with the cache store.
+     * <p>
+     * With the default settings ({@code skipStore()} is {@code false}) removals and additions made
+     * through the loader must be reflected in the store map. With {@code skipStore(true)} the
+     * store map must stay untouched while the cache itself is updated.
+     * <p>
+     * All started nodes are stopped at the end, like in the other tests of this class, so that
+     * they do not stay in the topology of tests that run afterwards.
+     *
+     * @throws Exception If failed.
+     */
+    public void testUpdateStore() throws Exception {
+        storeMap = new ConcurrentHashMap<>();
+
+        try {
+            store = new TestStore();
+
+            useCache = true;
+
+            Ignite ignite = startGrid(1);
+
+            startGrid(2);
+            startGrid(3);
+
+            // Pre-populate the store only; the cache itself stays empty.
+            for (int i = 0; i < 1000; i++)
+                storeMap.put(i, i);
+
+            try (IgniteDataLoader<Object, Object> ldr = ignite.dataLoader(null)) {
+                assertFalse(ldr.skipStore());
+
+                for (int i = 0; i < 1000; i++)
+                    ldr.removeData(i);
+
+                for (int i = 1000; i < 2000; i++)
+                    ldr.addData(i, i);
+            }
+
+            for (int i = 0; i < 1000; i++)
+                assertNull(storeMap.get(i));
+
+            for (int i = 1000; i < 2000; i++)
+                assertEquals(i, storeMap.get(i));
+
+            try (IgniteDataLoader<Object, Object> ldr = ignite.dataLoader(null)) {
+                ldr.skipStore(true);
+
+                for (int i = 0; i < 1000; i++)
+                    ldr.addData(i, i);
+
+                for (int i = 1000; i < 2000; i++)
+                    ldr.removeData(i);
+            }
+
+            IgniteCache<Object, Object> cache = ignite.jcache(null);
+
+            for (int i = 0; i < 1000; i++) {
+                assertNull(storeMap.get(i));
+
+                assertEquals(i, cache.get(i));
+            }
+
+            for (int i = 1000; i < 2000; i++) {
+                assertEquals(i, storeMap.get(i));
+
+                assertNull(cache.localPeek(i));
+            }
+        }
+        finally {
+            try {
+                stopAllGrids();
+            }
+            finally {
+                // Release the store state even if stopping the nodes fails.
+                storeMap = null;
+                store = null;
+            }
+        }
+    }
+
+    /**
+     * Immutable holder of a single {@code int}; equality and hash code are based on that value.
+     */
+    private static class TestObject {
+        /** Wrapped value. */
+        private final int val;
+
+        /**
+         * @param val Value to wrap.
+         */
+        private TestObject(int val) {
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            // Same instance, or an instance of exactly this class wrapping the same value.
+            return this == o ||
+                (o != null && getClass() == o.getClass() && val == ((TestObject)o).val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return val;
+        }
+    }
+
+    /**
+     * Cache store backed by the static {@code storeMap} of the enclosing test class.
+     * <p>
+     * The class is a static nested class because it only touches static state; a non-static inner
+     * class would keep a hidden reference to the enclosing test instance for no benefit.
+     */
+    private static class TestStore extends CacheStoreAdapter<Object, Object> {
+        /** {@inheritDoc} */
+        @Nullable @Override public Object load(Object key) {
+            return storeMap.get(key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<?, ?> entry) {
+            storeMap.put(entry.getKey(), entry.getValue());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) {
+            storeMap.remove(key);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java
index 548f8e1..b09c1cf 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java
@@ -24,7 +24,7 @@ import org.apache.ignite.configuration.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.managed.*;
 import org.apache.ignite.resources.*;
-import org.gridgain.grid.kernal.processors.affinity.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
index 80e9a24..642d954 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.util.future;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.lang.*;
-import org.gridgain.grid.kernal.processors.closure.*;
+import org.apache.ignite.internal.processors.closure.*;
 import org.apache.ignite.internal.util.io.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.gridgain.testframework.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/cache/affinity/fair/GridCachePartitionFairAffinitySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/grid/cache/affinity/fair/GridCachePartitionFairAffinitySelfTest.java
 
b/modules/core/src/test/java/org/gridgain/grid/cache/affinity/fair/GridCachePartitionFairAffinitySelfTest.java
index d37f57f..eea4930 100644
--- 
a/modules/core/src/test/java/org/gridgain/grid/cache/affinity/fair/GridCachePartitionFairAffinitySelfTest.java
+++ 
b/modules/core/src/test/java/org/gridgain/grid/cache/affinity/fair/GridCachePartitionFairAffinitySelfTest.java
@@ -21,7 +21,7 @@ import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cache.affinity.fair.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
-import org.gridgain.grid.kernal.processors.affinity.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.gridgain.testframework.*;
 import org.gridgain.testframework.junits.common.*;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java
 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java
deleted file mode 100644
index ee156fa..0000000
--- 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.affinity;
-
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.affinity.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.gridgain.testframework.*;
-import org.gridgain.testframework.junits.common.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.cache.GridCacheMode.*;
-
-/**
- * Tests for {@link GridAffinityProcessor}.
- */
-@GridCommonTest(group = "Affinity Processor")
-public abstract class GridAffinityProcessorAbstractSelfTest extends 
GridCommonAbstractTest {
-    /** Number of grids started for tests. Should not be less than 2. */
-    private static final int NODES_CNT = 3;
-
-    /** Cache name. */
-    private static final String CACHE_NAME = "cache";
-
-    /** IP finder. */
-    private static final TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
-
-    /** Flag to start grid with cache. */
-    private boolean withCache;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-
-        discoSpi.setIpFinder(ipFinder);
-
-        cfg.setDiscoverySpi(discoSpi);
-
-        if (withCache) {
-            CacheConfiguration cacheCfg = defaultCacheConfiguration();
-
-            cacheCfg.setName(CACHE_NAME);
-            cacheCfg.setCacheMode(PARTITIONED);
-            cacheCfg.setBackups(1);
-            cacheCfg.setAffinity(affinityFunction());
-
-            cfg.setCacheConfiguration(cacheCfg);
-        }
-
-        return cfg;
-    }
-
-    /**
-     * Creates affinity function for test.
-     *
-     * @return Affinity function.
-     */
-    protected abstract GridCacheAffinityFunction affinityFunction();
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"ConstantConditions"})
-    @Override protected void beforeTestsStarted() throws Exception {
-        assert NODES_CNT >= 1;
-
-        withCache = false;
-
-        for (int i = 0; i < NODES_CNT; i++)
-            startGrid(i);
-
-        withCache = true;
-
-        for (int i = NODES_CNT; i < 2 * NODES_CNT; i++)
-            startGrid(i);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
-    }
-
-    /**
-     * Test affinity functions caching and clean up.
-     *
-     * @throws Exception In case of any exception.
-     */
-    @SuppressWarnings("AssertEqualsBetweenInconvertibleTypes")
-    public void testAffinityProcessor() throws Exception {
-        Random rnd = new Random();
-
-        final GridKernal grid1 = (GridKernal)grid(rnd.nextInt(NODES_CNT)); // 
With cache.
-        GridKernal grid2 = (GridKernal)grid(NODES_CNT + 
rnd.nextInt(NODES_CNT)); // Without cache.
-
-        assertEquals(NODES_CNT * 2, grid1.nodes().size());
-        assertEquals(NODES_CNT * 2, grid2.nodes().size());
-
-        GridTestUtils.assertThrows(log, new Callable<Void>() {
-            @Override public Void call() throws Exception {
-                grid1.cache(CACHE_NAME);
-
-                return null;
-            }
-        }, IllegalArgumentException.class, null);
-
-        GridCache<Integer, Integer> cache = grid2.cache(CACHE_NAME);
-
-        assertNotNull(cache);
-
-        GridAffinityProcessor affPrc1 = grid1.context().affinity();
-        GridAffinityProcessor affPrc2 = grid2.context().affinity();
-
-        // Create keys collection.
-        Collection<Integer> keys = new ArrayList<>(1000);
-
-        for (int i = 0; i < 1000; i++)
-            keys.add(i);
-
-        //
-        // Validate affinity functions collection updated on first call.
-        //
-
-        Map<ClusterNode, Collection<Integer>> node1Map = 
affPrc1.mapKeysToNodes(CACHE_NAME, keys);
-        Map<ClusterNode, Collection<Integer>> node2Map = 
affPrc2.mapKeysToNodes(CACHE_NAME, keys);
-        Map<ClusterNode, Collection<Integer>> cacheMap = 
cache.affinity().mapKeysToNodes(keys);
-
-        assertEquals(cacheMap.size(), node1Map.size());
-        assertEquals(cacheMap.size(), node2Map.size());
-
-        for (Map.Entry<ClusterNode, Collection<Integer>> entry : 
cacheMap.entrySet()) {
-            ClusterNode node = entry.getKey();
-
-            Collection<Integer> mappedKeys = entry.getValue();
-
-            Collection<Integer> mapped1 = node1Map.get(node);
-            Collection<Integer> mapped2 = node2Map.get(node);
-
-            assertTrue(mappedKeys.containsAll(mapped1) && 
mapped1.containsAll(mappedKeys));
-            assertTrue(mappedKeys.containsAll(mapped2) && 
mapped2.containsAll(mappedKeys));
-        }
-    }
-
-    /**
-     * Test performance of affinity processor.
-     *
-     * @throws Exception In case of any exception.
-     */
-    public void testPerformance() throws Exception {
-        GridKernal grid = (GridKernal)grid(0);
-        GridAffinityProcessor aff = grid.context().affinity();
-
-        int keysSize = 1000000;
-
-        Collection<Integer> keys = new ArrayList<>(keysSize);
-
-        for (int i = 0; i < keysSize; i++)
-            keys.add(i);
-
-        long start = System.currentTimeMillis();
-
-        int iterations = 10000000;
-
-        for (int i = 0; i < iterations; i++)
-            aff.mapKeyToNode(keys);
-
-        long diff = System.currentTimeMillis() - start;
-
-        info(">>> Map " + keysSize + " keys to " + grid.nodes().size() + " 
nodes " + iterations + " times in " + diff + "ms.");
-
-        assertTrue(diff < 25000);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessorConsistentHashSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessorConsistentHashSelfTest.java
 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessorConsistentHashSelfTest.java
deleted file mode 100644
index c44fc82..0000000
--- 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessorConsistentHashSelfTest.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.affinity;
-
-import org.apache.ignite.cache.affinity.*;
-import org.apache.ignite.cache.affinity.consistenthash.*;
-
-/**
- * Tests consistent hash affinity function.
- */
-public class GridAffinityProcessorConsistentHashSelfTest extends 
GridAffinityProcessorAbstractSelfTest {
-    /** {@inheritDoc} */
-    @Override protected GridCacheAffinityFunction affinityFunction() {
-        return new GridCacheConsistentHashAffinityFunction();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessorRendezvousSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessorRendezvousSelfTest.java
 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessorRendezvousSelfTest.java
deleted file mode 100644
index 625d27c..0000000
--- 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessorRendezvousSelfTest.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.affinity;
-
-import org.apache.ignite.cache.affinity.*;
-import org.apache.ignite.cache.affinity.rendezvous.*;
-
-/**
- * Tests affinity processor with rendezvous affinity function.
- */
-public class GridAffinityProcessorRendezvousSelfTest extends 
GridAffinityProcessorAbstractSelfTest {
-    /** {@inheritDoc} */
-    @Override protected GridCacheAffinityFunction affinityFunction() {
-        return new GridCacheRendezvousAffinityFunction();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAffinityApiSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAffinityApiSelfTest.java
 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAffinityApiSelfTest.java
index cd78fb4..5d79125 100644
--- 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAffinityApiSelfTest.java
+++ 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAffinityApiSelfTest.java
@@ -21,7 +21,7 @@ import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
-import org.gridgain.grid.kernal.processors.affinity.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.util.typedef.*;
 
 import java.util.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
index 4240649..8122226 100644
--- 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
+++ 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
@@ -24,7 +24,7 @@ import org.apache.ignite.cache.datastructures.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.lang.*;
-import org.gridgain.grid.kernal.processors.affinity.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index 4de364c..a3505be 100644
--- 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -31,7 +31,7 @@ import org.apache.ignite.marshaller.optimized.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.gridgain.grid.kernal.processors.continuous.*;
+import org.apache.ignite.internal.processors.continuous.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.gridgain.testframework.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessorRemoteTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessorRemoteTest.java
 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessorRemoteTest.java
deleted file mode 100644
index a30d7aa..0000000
--- 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessorRemoteTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.closure;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.gridgain.testframework.junits.common.*;
-import java.util.*;
-
-/**
- * Tests execution of anonymous closures on remote nodes.
- */
-@GridCommonTest(group = "Closure Processor")
-public class GridClosureProcessorRemoteTest extends GridCommonAbstractTest {
-    /**
-     *
-     */
-    public GridClosureProcessorRemoteTest() {
-        super(true); // Start grid.
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getTestGridName() {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration() throws 
Exception {
-        IgniteConfiguration cfg = new IgniteConfiguration();
-
-        cfg.setDiscoverySpi(new TcpDiscoverySpi());
-
-        return cfg;
-    }
-
-    /**
-     * @throws Exception Thrown in case of failure.
-     */
-    public void testAnonymousBroadcast() throws Exception {
-        Ignite g = grid();
-
-        assert g.cluster().nodes().size() >= 2;
-
-        g.compute().run(new CA() {
-            @Override public void apply() {
-                System.out.println("BROADCASTING....");
-            }
-        });
-
-        Thread.sleep(2000);
-    }
-
-    /**
-     * @throws Exception Thrown in case of failure.
-     */
-    public void testAnonymousUnicast() throws Exception {
-        Ignite g = grid();
-
-        assert g.cluster().nodes().size() >= 2;
-
-        ClusterNode rmt = F.first(g.cluster().forRemotes().nodes());
-
-        compute(g.cluster().forNode(rmt)).run(new CA() {
-            @Override public void apply() {
-                System.out.println("UNICASTING....");
-            }
-        });
-
-        Thread.sleep(2000);
-    }
-
-    /**
-     *
-     * @throws Exception Thrown in case of failure.
-     */
-    public void testAnonymousUnicastRequest() throws Exception {
-        Ignite g = grid();
-
-        assert g.cluster().nodes().size() >= 2;
-
-        ClusterNode rmt = F.first(g.cluster().forRemotes().nodes());
-        final ClusterNode loc = g.cluster().localNode();
-
-        compute(g.cluster().forNode(rmt)).run(new CA() {
-            @Override public void apply() {
-                message(grid().forNode(loc)).localListen(new 
IgniteBiPredicate<UUID, String>() {
-                    @Override public boolean apply(UUID uuid, String s) {
-                        System.out.println("Received test message [nodeId: " + 
uuid + ", s=" + s + ']');
-
-                        return false;
-                    }
-                }, null);
-            }
-        });
-
-        message(g.cluster().forNode(rmt)).send(null, "TESTING...");
-
-        Thread.sleep(2000);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessorSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessorSelfTest.java
 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessorSelfTest.java
deleted file mode 100644
index 8f7ebc7..0000000
--- 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessorSelfTest.java
+++ /dev/null
@@ -1,541 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.closure;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.compute.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.optimized.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.gridgain.testframework.*;
-import org.gridgain.testframework.junits.common.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Tests for {@link GridClosureProcessor}.
- */
-@GridCommonTest(group = "Closure Processor")
-public class GridClosureProcessorSelfTest extends GridCommonAbstractTest {
-    /** Number of grids started for tests. Should not be less than 2. */
-    private static final int NODES_CNT = 2;
-
-    /** Job sleep duration in order to initiate timeout exception. */
-    private static final long JOB_SLEEP = 200;
-
-    /** Timeout used in timed tests. */
-    private static final long JOB_TIMEOUT = 100;
-
-    /** IP finder. */
-    private final TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        cfg.setMarshaller(new IgniteOptimizedMarshaller(false));
-
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-
-        discoSpi.setIpFinder(ipFinder);
-
-        cfg.setDiscoverySpi(discoSpi);
-
-        cfg.setCacheConfiguration();
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"ConstantConditions"})
-    @Override protected void beforeTestsStarted() throws Exception {
-        assert NODES_CNT >= 2;
-
-        startGrids(NODES_CNT);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        execCntr.set(0);
-    }
-
-    /** Execution counter for runnable and callable jobs. */
-    private static AtomicInteger execCntr = new AtomicInteger(0);
-
-    /**
-     * Test runnable job.
-     */
-    private static class TestRunnable implements IgniteRunnable {
-        /** */
-        @IgniteInstanceResource
-        private Ignite ignite;
-
-        /** */
-        @IgniteLoggerResource
-        private IgniteLogger log;
-
-        /** @{inheritDoc} */
-        @Override public void run() {
-            log.info("Runnable job executed on node: " + 
ignite.cluster().localNode().id());
-
-            assert ignite != null;
-
-            execCntr.incrementAndGet();
-        }
-    }
-
-    /**
-     * Base class for test callables.
-     */
-    private abstract static class AbstractTestCallable implements 
IgniteCallable<Integer> {
-        /** */
-        @IgniteInstanceResource
-        protected Ignite ignite;
-
-        /** */
-        @IgniteLoggerResource
-        protected IgniteLogger log;
-    }
-
-    /**
-     * Test callable job.
-     */
-    private static class TestCallable extends AbstractTestCallable {
-        /** {@inheritDoc} */
-        @Override public Integer call() {
-            log.info("Callable job executed on node: " + 
ignite.cluster().localNode().id());
-
-            assert ignite != null;
-
-            return execCntr.incrementAndGet();
-        }
-    }
-
-    /**
-     * Test callable job which throws class not found exception.
-     */
-    private static class TestCallableError extends AbstractTestCallable 
implements Externalizable {
-        /**
-         *
-         */
-        public TestCallableError() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public Integer call() {
-            log.info("Callable job executed on node: " + 
ignite.cluster().localNode().id());
-
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            throw new ClassNotFoundException();
-        }
-    }
-
-    /**
-     * Test callable job which sleeps for some time. Is used in timeout tests.
-     */
-    private static class TestCallableTimeout extends AbstractTestCallable {
-        /** {@inheritDoc} */
-        @Override public Integer call() throws Exception {
-            Thread.sleep(JOB_SLEEP);
-
-            return null;
-        }
-    }
-
-    /**
-     * @param idx Node index.
-     * @param job Runnable job.
-     * @param p Optional node predicate.
-     * @return Future object.
-     * @throws IgniteCheckedException If failed.
-     */
-    private IgniteFuture<?> runAsync(int idx, Runnable job, @Nullable 
IgnitePredicate<ClusterNode> p)
-        throws IgniteCheckedException {
-        assert idx >= 0 && idx < NODES_CNT;
-        assert job != null;
-
-        execCntr.set(0);
-
-        IgniteCompute comp = p != null ? compute(grid(idx).forPredicate(p)) : 
grid(idx).compute();
-
-        comp = comp.enableAsync();
-
-        comp.run(job);
-
-        return comp.future();
-    }
-
-    /**
-     * @param idx Node index.
-     * @param job Runnable job.
-     * @param p Optional node predicate.
-     * @return Future object.
-     * @throws IgniteCheckedException If failed.
-     */
-    private IgniteFuture<?> broadcast(int idx, Runnable job, @Nullable 
IgnitePredicate<ClusterNode> p)
-        throws IgniteCheckedException {
-        assert idx >= 0 && idx < NODES_CNT;
-        assert job != null;
-
-        execCntr.set(0);
-
-        ClusterGroup prj = grid(idx);
-
-        if (p != null)
-            prj = prj.forPredicate(p);
-
-        IgniteCompute comp = compute(prj).enableAsync();
-
-        comp.broadcast(job);
-
-        return comp.future();
-    }
-
-    /**
-     * @param idx Node index.
-     * @param jobs Runnable jobs.
-     * @param p Optional node predicate.
-     * @return Future object.
-     * @throws IgniteCheckedException If failed.
-     */
-    private IgniteFuture<?> runAsync(int idx, Collection<TestRunnable> jobs, 
@Nullable IgnitePredicate<ClusterNode> p)
-        throws IgniteCheckedException {
-        assert idx >= 0 && idx < NODES_CNT;
-        assert !F.isEmpty(jobs);
-
-        execCntr.set(0);
-
-        IgniteCompute comp = p != null ? compute(grid(idx).forPredicate(p)) : 
grid(idx).compute();
-
-        comp = comp.enableAsync();
-
-        comp.run(jobs);
-
-        return comp.future();
-    }
-
-    /**
-     * @param idx Node index.
-     * @param job Callable job.
-     * @param p Optional node predicate.
-     * @return Future object.
-     * @throws IgniteCheckedException If failed.
-     */
-    private IgniteFuture<Integer> callAsync(int idx, Callable<Integer> job, @Nullable IgnitePredicate<ClusterNode> p)
-        throws IgniteCheckedException {
-        assert idx >= 0 && idx < NODES_CNT;
-        assert job != null;
-
-        execCntr.set(0);
-
-        IgniteCompute comp = p != null ? compute(grid(idx).forPredicate(p)) : grid(idx).compute();
-
-        comp = comp.enableAsync();
-
-        comp.call(job);
-
-        return comp.future();
-    }
-
-    /**
-     * @param idx Node index.
-     * @param job Callable job.
-     * @param p Optional node predicate.
-     * @return Future object.
-     * @throws IgniteCheckedException If failed.
-     */
-    private IgniteFuture<Collection<Integer>> broadcast(int idx, Callable<Integer> job,
-        @Nullable IgnitePredicate<ClusterNode> p) throws IgniteCheckedException {
-        assert idx >= 0 && idx < NODES_CNT;
-        assert job != null;
-
-        execCntr.set(0);
-
-        IgniteCompute comp = p != null ? compute(grid(idx).forPredicate(p)) : grid(idx).compute();
-
-        comp = comp.enableAsync();
-
-        comp.broadcast(job);
-
-        return comp.future();
-    }
-
-    /**
-     * @param idx Node index.
-     * @param jobs Callable job.
-     * @param p Optional node predicate.
-     * @return Future object.
-     * @throws IgniteCheckedException If failed.
-     */
-    private IgniteFuture<Collection<Integer>> callAsync(int idx, Collection<TestCallable> jobs,
-        @Nullable IgnitePredicate<ClusterNode> p) throws IgniteCheckedException {
-        assert idx >= 0 && idx < NODES_CNT;
-        assert !F.isEmpty(jobs);
-
-        execCntr.set(0);
-
-        IgniteCompute comp = p != null ? compute(grid(idx).forPredicate(p)) : grid(idx).compute();
-
-        comp = comp.enableAsync();
-
-        comp.call(jobs);
-
-        return comp.future();
-    }
-
-    /**
-     * @param idx Node index.
-     * @return Predicate.
-     */
-    private IgnitePredicate<ClusterNode> singleNodePredicate(final int idx) {
-        assert idx >= 0 && idx < NODES_CNT;
-
-        return new IgnitePredicate<ClusterNode>() {
-            @Override public boolean apply(ClusterNode e) { return grid(idx).localNode().id().equals(e.id()); }
-        };
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testRunAsyncSingle() throws Exception {
-        Runnable job = new TestRunnable();
-
-        IgniteFuture<?> fut = broadcast(0, job, null);
-
-        assert fut.get() == null;
-
-        assertEquals(NODES_CNT, execCntr.getAndSet(0));
-
-        fut = broadcast(0, job, singleNodePredicate(0));
-
-        assert fut.get() == null;
-
-        assertEquals(1, execCntr.get());
-
-        fut = runAsync(0, job, null);
-
-        assert fut.get() == null : "Execution result must be null.";
-
-        assert execCntr.get() == 1 :
-            "Execution counter must be equal to 1, actual: " + execCntr.get();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testRunAsyncMultiple() throws Exception {
-        Collection<TestRunnable> jobs = F.asList(new TestRunnable(), new TestRunnable());
-
-        IgniteFuture<?> fut = runAsync(0, jobs, null);
-
-        assert fut.get() == null : "Execution result must be null.";
-
-        assert execCntr.get() == jobs.size() :
-            "Execution counter must be equal to " + jobs.size() + ", actual: " + execCntr.get();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testCallAsyncSingle() throws Exception {
-        Callable<Integer> job = new TestCallable();
-
-        IgniteFuture<Collection<Integer>> fut1 = broadcast(0, job, null);
-
-        assert fut1.get() != null;
-
-        assertEquals(NODES_CNT, execCntr.getAndSet(0));
-
-        fut1 = broadcast(0, job, singleNodePredicate(0));
-
-        // We left one node so we can get definite result.
-        assertEquals(Integer.valueOf(1), F.first(fut1.get()));
-
-        assertEquals(1, execCntr.get());
-
-        IgniteFuture<Integer> fut2 = callAsync(0, job, null);
-
-        assert fut2.get() == 1 :
-            "Execution result must be equal to 1, actual: " + fut2.get();
-
-        assert execCntr.get() == 1 :
-            "Execution counter must be equal to 1, actual: " + execCntr.get();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testCallAsyncErrorNoFailover() throws Exception {
-        IgniteCompute comp = compute(grid(0).forPredicate(F.notEqualTo(grid(0).localNode()))).enableAsync();
-
-        comp.withNoFailover().call(new TestCallableError());
-
-        IgniteFuture<Integer> fut = comp.future();
-
-        try {
-            fut.get();
-
-            assert false : "Exception should have been thrown.";
-        }
-        catch (IgniteCheckedException e) {
-            info("Caught expected exception: " + e);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testWithName() throws Exception {
-        grid(0).compute().withName("TestTaskName").call(new TestCallable());
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testWithTimeout() throws Exception {
-        Collection<TestCallableTimeout> jobs = F.asList(new TestCallableTimeout());
-
-        boolean timedOut = false;
-
-        try {
-            // Ensure that we will get timeout exception.
-            grid(0).compute().withTimeout(JOB_TIMEOUT).call(jobs);
-        }
-        catch (ComputeTaskTimeoutException ignore) {
-            timedOut = true;
-        }
-
-        assert timedOut : "Task has not timed out.";
-
-        timedOut = false;
-
-        try {
-            // Previous task invocation cleared the timeout.
-            grid(0).compute().call(jobs);
-        }
-        catch (ComputeTaskTimeoutException ignore) {
-            timedOut = true;
-        }
-
-        assert !timedOut : "Subsequently called task has timed out.";
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testCallAsyncMultiple() throws Exception {
-        Collection<TestCallable> jobs = F.asList(new TestCallable(), new TestCallable());
-
-        IgniteFuture<Collection<Integer>> fut = callAsync(0, jobs, null);
-
-        Collection<Integer> results = fut.get();
-
-        assert !results.isEmpty() : "Collection of results is empty.";
-
-        assert results.size() == jobs.size() :
-            "Collection of results must be of size: " + jobs.size() + ".";
-
-        for (int i = 1; i <= jobs.size(); i++)
-            assert results.contains(i) : "Collection of results does not contain value: " + i;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testReduceAsync() throws Exception {
-        Collection<TestCallable> jobs = F.asList(new TestCallable(), new TestCallable());
-
-        IgniteCompute comp = grid(0).compute().enableAsync();
-
-        comp.call(jobs, F.sumIntReducer());
-
-        IgniteFuture<Integer> fut = comp.future();
-
-        // Sum of arithmetic progression.
-        int exp = (1 + jobs.size()) * jobs.size() / 2;
-
-        assert fut.get() == exp :
-            "Execution result must be equal to " + exp + ", actual: " + fut.get();
-
-        assert execCntr.get() == jobs.size() :
-            "Execution counter must be equal to " + jobs.size() + ", actual: " + execCntr.get();
-
-        execCntr.set(0);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testReducerError() throws Exception {
-        final Ignite g = grid(0);
-
-        final Collection<Callable<Integer>> jobs = new ArrayList<>();
-
-        for (int i = 0; i < g.cluster().nodes().size(); i++) {
-            jobs.add(new IgniteCallable<Integer>() {
-                @Override public Integer call() throws Exception {
-                    throw new RuntimeException("Test exception.");
-                }
-            });
-        }
-
-        GridTestUtils.assertThrows(log, new Callable<Void>() {
-            @Override public Void call() throws Exception {
-                g.compute().call(jobs, new IgniteReducer<Integer, Object>() {
-                    @Override public boolean collect(@Nullable Integer e) {
-                        fail("Expects failed jobs never call 'collect' method.");
-
-                        return true;
-                    }
-
-                    @Override public Object reduce() {
-                        return null;
-                    }
-                });
-
-                return null;
-            }
-        }, IgniteCheckedException.class, null);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/closure/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/closure/package.html b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/closure/package.html
deleted file mode 100644
index 1f85ff2..0000000
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/closure/package.html
+++ /dev/null
@@ -1,23 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
-<html>
-<body>
-    <!-- Package description. -->
-    Contains internal tests or test related classes and interfaces.
-</body>
-</html>

Reply via email to