ignite-2615 If swap is not enabled, the value evicted from offheap must be passed
to the query manager
(cherry picked from commit a6b3dedd381a71fec8dd9dcf944be2626336cc22)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/accb0e9e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/accb0e9e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/accb0e9e

Branch: refs/heads/ignite-testing-discovery
Commit: accb0e9efa9b630801912784b29eca64622152ef
Parents: d6a143c
Author: sboikov <[email protected]>
Authored: Mon Apr 18 12:09:53 2016 +0300
Committer: sboikov <[email protected]>
Committed: Mon Apr 18 12:10:46 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheEntryEx.java      |   7 +-
 .../processors/cache/GridCacheMapEntry.java     |  12 +-
 .../processors/cache/GridCacheProcessor.java    |   4 +-
 .../processors/cache/GridCacheSwapManager.java  |  43 +-
 .../processors/cache/GridCacheTestEntryEx.java  |   2 +-
 .../CacheQueryOffheapEvictDataLostTest.java     | 138 +++++
 .../CacheRandomOperationsMultithreadedTest.java | 507 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite2.java         |   3 +
 .../IgniteCacheWithIndexingTestSuite.java       |   2 +
 9 files changed, 695 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/accb0e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index a7880e3..dc8e08c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -931,7 +931,10 @@ public interface GridCacheEntryEx {
     public void updateTtl(@Nullable GridCacheVersion ver, long ttl) throws 
GridCacheEntryRemovedException;
 
     /**
-     * Tries to do offheap -> swap eviction.
+     * Called when entry should be evicted from offheap.
+     * <p>
+     * If swap is enabled, tries to do offheap -> swap eviction; otherwise the evicted value
+     * should be passed to the query manager.
      *
      * @param entry Serialized swap entry.
      * @param evictVer Version when entry was selected for eviction.
@@ -940,7 +943,7 @@ public interface GridCacheEntryEx {
      * @throws GridCacheEntryRemovedException If entry was removed.
      * @return {@code True} if entry was obsoleted and written to swap.
      */
-    public boolean offheapSwapEvict(byte[] entry, GridCacheVersion evictVer, 
GridCacheVersion obsoleteVer)
+    public boolean onOffheapEvict(byte[] entry, GridCacheVersion evictVer, 
GridCacheVersion obsoleteVer)
         throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/accb0e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index f31a992..435d337 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -431,9 +431,9 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
     }
 
     /** {@inheritDoc} */
-    @Override public boolean offheapSwapEvict(byte[] entry, GridCacheVersion 
evictVer, GridCacheVersion obsoleteVer)
+    @Override public boolean onOffheapEvict(byte[] entry, GridCacheVersion 
evictVer, GridCacheVersion obsoleteVer)
         throws IgniteCheckedException, GridCacheEntryRemovedException {
-        assert cctx.swap().swapEnabled() && cctx.swap().offHeapEnabled() : 
this;
+        assert cctx.swap().offHeapEnabled() && (cctx.swap().swapEnabled() || 
cctx.queries().enabled()) : this;
 
         boolean obsolete;
 
@@ -448,12 +448,18 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             if (mvcc != null && !mvcc.isEmpty(obsoleteVer))
                 return false;
 
-            if (cctx.swap().offheapSwapEvict(key, entry, partition(), 
evictVer)) {
+            if (cctx.swap().onOffheapEvict(key, entry, partition(), evictVer)) 
{
                 assert !hasValueUnlocked() : this;
 
                 obsolete = markObsolete0(obsoleteVer, false, null);
 
                 assert obsolete : this;
+
+                if (!cctx.swap().swapEnabled()) {
+                    CacheObject val = 
cctx.swap().unmarshalSwapEntryValue(entry);
+
+                    clearIndex(val);
+                }
             }
             else
                 obsolete = false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/accb0e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index a8f3a4a..7c456b7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2846,9 +2846,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                 try {
                     KeyCacheObject key = cctx.toCacheKeyObject(keyBytes);
 
-                    GridCacheSwapEntry swapEntry = 
GridCacheSwapEntryImpl.unmarshal(valBytes, true);
-
-                    CacheObject val = swapEntry.value();
+                    CacheObject val = 
cctx.swap().unmarshalSwapEntryValue(valBytes);
 
                     assert val != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/accb0e9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index a35bb3f..d50bf0b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -40,6 +40,7 @@ import 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionAware;
 import org.apache.ignite.internal.processors.offheap.GridOffHeapProcessor;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
@@ -106,8 +107,8 @@ public class GridCacheSwapManager extends 
GridCacheManagerAdapter {
     /** Soft iterator set. */
     private final Collection<GridWeakIterator<Map.Entry>> itSet = new 
GridConcurrentHashSet<>();
 
-    /** {@code True} if offheap to swap eviction is possible. */
-    private boolean offheapToSwapEvicts;
+    /** {@code True} if evictions from offheap need to be processed. */
+    private boolean unwindOffheapEvicts;
 
     /** Values to be evicted from offheap to swap. */
     private ThreadLocal<Collection<IgniteBiTuple<byte[], byte[]>>> 
offheapEvicts = new ThreadLocal<>();
@@ -141,7 +142,7 @@ public class GridCacheSwapManager extends 
GridCacheManagerAdapter {
      *
      */
     public void unwindOffheapEvicts() {
-        if (!offheapToSwapEvicts)
+        if (!unwindOffheapEvicts)
             return;
 
         Collection<IgniteBiTuple<byte[], byte[]>> evicts = offheapEvicts.get();
@@ -162,7 +163,7 @@ public class GridCacheSwapManager extends 
GridCacheManagerAdapter {
                         GridCacheEntryEx entry = cctx.cache().entryEx(key);
 
                         try {
-                            if (entry.offheapSwapEvict(vb, evictVer, 
obsoleteVer))
+                            if (entry.onOffheapEvict(vb, evictVer, 
obsoleteVer))
                                 cctx.cache().removeEntry(entry);
 
                             break;
@@ -199,12 +200,12 @@ public class GridCacheSwapManager extends 
GridCacheManagerAdapter {
 
         GridOffHeapEvictListener lsnr;
 
-        if (swapEnabled) {
-            offheapToSwapEvicts = true;
+        if (swapEnabled || GridQueryProcessor.isEnabled(cctx.config())) {
+            unwindOffheapEvicts = true;
 
             lsnr = new GridOffHeapEvictListener() {
                 @Override public void onEvict(int part, int hash, byte[] kb, 
byte[] vb) {
-                    assert offheapToSwapEvicts;
+                    assert unwindOffheapEvicts;
 
                     onOffheapEvict();
 
@@ -1100,7 +1101,7 @@ public class GridCacheSwapManager extends 
GridCacheManagerAdapter {
      * @return {@code True} if removed.
      * @throws IgniteCheckedException If failed.
      */
-    boolean offheapSwapEvict(final KeyCacheObject key, byte[] entry, int part, 
final GridCacheVersion ver)
+    boolean onOffheapEvict(final KeyCacheObject key, byte[] entry, int part, 
final GridCacheVersion ver)
         throws IgniteCheckedException {
         assert offheapEnabled;
 
@@ -1120,13 +1121,14 @@ public class GridCacheSwapManager extends 
GridCacheManagerAdapter {
             Collection<GridCacheSwapListener> lsnrs = offheapLsnrs.get(part);
 
             if (lsnrs != null) {
-                GridCacheSwapEntry e = 
swapEntry(GridCacheSwapEntryImpl.unmarshal(entry, false));
+                GridCacheSwapEntry e = swapEntry(unmarshalSwapEntry(entry, 
false));
 
                 for (GridCacheSwapListener lsnr : lsnrs)
                     lsnr.onEntryUnswapped(part, key, e);
             }
 
-            cctx.swap().writeToSwap(part, key, entry);
+            if (swapEnabled)
+                cctx.swap().writeToSwap(part, key, entry);
         }
 
         return rmv;
@@ -2166,6 +2168,19 @@ public class GridCacheSwapManager extends 
GridCacheManagerAdapter {
 
     /**
      * @param bytes Bytes to unmarshal.
+     * @return Unmarshalled value.
+     * @throws IgniteCheckedException If failed.
+     */
+    public CacheObject unmarshalSwapEntryValue(byte[] bytes) throws 
IgniteCheckedException {
+        GridCacheSwapEntry swapEntry = 
swapEntry(GridCacheSwapEntryImpl.unmarshal(bytes, true));
+
+        assert swapEntry != null && swapEntry.value() != null : swapEntry;
+
+        return swapEntry.value();
+    }
+
+    /**
+     * @param bytes Bytes to unmarshal.
      * @param valOnly If {@code true} unmarshalls only value.
      * @return Unmarshalled entry.
      */
@@ -2216,9 +2231,9 @@ public class GridCacheSwapManager extends 
GridCacheManagerAdapter {
         @Override protected Map.Entry<byte[], GridCacheSwapEntry> onNext() 
throws IgniteCheckedException {
             Map.Entry<byte[], byte[]> e = iter.nextX();
 
-            GridCacheSwapEntry unmarshalled = unmarshalSwapEntry(e.getValue(), 
false);
+            GridCacheSwapEntry unmarshalled = 
swapEntry(unmarshalSwapEntry(e.getValue(), false));
 
-            return F.t(e.getKey(), swapEntry(unmarshalled));
+            return F.t(e.getKey(), unmarshalled);
         }
 
         /** {@inheritDoc} */
@@ -2524,9 +2539,9 @@ public class GridCacheSwapManager extends 
GridCacheManagerAdapter {
         /** {@inheritDoc} */
         @Override public V getValue() {
             try {
-                GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue(), 
false);
+                GridCacheSwapEntry e = 
swapEntry(unmarshalSwapEntry(entry.getValue(), false));
 
-                swapEntry(e);
+                assert e != null;
 
                 return e.value().value(cctx.cacheObjectContext(), false);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/accb0e9e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 1bede92..4455b46 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -853,7 +853,7 @@ public class GridCacheTestEntryEx extends 
GridMetadataAwareAdapter implements Gr
     }
 
     /** {@inheritDoc} */
-    @Override public boolean offheapSwapEvict(byte[] vb, GridCacheVersion 
evictVer, GridCacheVersion obsoleteVer)
+    @Override public boolean onOffheapEvict(byte[] vb, GridCacheVersion 
evictVer, GridCacheVersion obsoleteVer)
         throws IgniteCheckedException, GridCacheEntryRemovedException {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/accb0e9e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryOffheapEvictDataLostTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryOffheapEvictDataLostTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryOffheapEvictDataLostTest.java
new file mode 100644
index 0000000..26e8300
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryOffheapEvictDataLostTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+
+/**
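+ * Runs SQL queries concurrently with puts/gets on indexed caches that use offheap memory with swap disabled.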
+ */
+public class CacheQueryOffheapEvictDataLostTest extends GridCommonAbstractTest 
{
+    /** */
+    private static final int KEYS = 100_000;
+
+    /**
+     *
+     */
+    public CacheQueryOffheapEvictDataLostTest() {
+        super(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration() throws 
Exception {
+        IgniteConfiguration cfg = super.getConfiguration();
+
+        CacheConfiguration<Object, Object> ccfg1 = new CacheConfiguration<>();
+
+        ccfg1.setName("cache-1");
+        ccfg1.setMemoryMode(OFFHEAP_TIERED);
+        ccfg1.setOffHeapMaxMemory(1024);
+        ccfg1.setIndexedTypes(Integer.class, TestData.class);
+        ccfg1.setSwapEnabled(false);
+
+        CacheConfiguration<Object, Object> ccfg2 = new CacheConfiguration<>();
+
+        ccfg2.setName("cache-2");
+        ccfg2.setMemoryMode(ONHEAP_TIERED);
+        ccfg2.setEvictionPolicy(new LruEvictionPolicy(10));
+        ccfg2.setOffHeapMaxMemory(1024);
+        ccfg2.setIndexedTypes(Integer.class, TestData.class);
+        ccfg2.setSwapEnabled(false);
+
+        cfg.setCacheConfiguration(ccfg1, ccfg2);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueryDataLost() throws Exception {
+        final long stopTime = U.currentTimeMillis() + 30_000;
+
+        GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
+            void putGet(IgniteCache<Object, Object> cache) {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                for (int i = 0; i < KEYS; i++) {
+                    cache.put(rnd.nextInt(KEYS), new TestData(i));
+
+                    cache.get(rnd.nextInt(KEYS));
+                }
+            }
+
+            void query(IgniteCache<Object, Object> cache) {
+                SqlQuery<Object, Object> qry1 = new SqlQuery<>(TestData.class, 
"_key > ?");
+                qry1.setArgs(KEYS / 2);
+
+                cache.query(qry1).getAll();
+
+                SqlQuery<Object, Object> qry2 = new SqlQuery<>(TestData.class, 
"idxVal > ?");
+                qry2.setArgs(KEYS / 2);
+
+                cache.query(qry2).getAll();
+            }
+
+            @Override public void apply(Integer idx) {
+                IgniteCache<Object, Object> cache1 = grid().cache("cache-1");
+                IgniteCache<Object, Object> cache2 = grid().cache("cache-2");
+
+                while (U.currentTimeMillis() < stopTime) {
+                    if (idx == 0) {
+                        putGet(cache1);
+                        putGet(cache2);
+                    }
+                    else {
+                        query(cache1);
+                        query(cache2);
+                    }
+                }
+            }
+        }, 10, "test-thread");
+    }
+
+    /**
+     *
+     */
+    static class TestData implements Serializable {
+        /** */
+        @QuerySqlField(index = true)
+        private int idxVal;
+
+        /**
+         * @param idxVal Value.
+         */
+        public TestData(int idxVal) {
+            this.idxVal = idxVal;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/accb0e9e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheRandomOperationsMultithreadedTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheRandomOperationsMultithreadedTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheRandomOperationsMultithreadedTest.java
new file mode 100644
index 0000000..dc0175d
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheRandomOperationsMultithreadedTest.java
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.Cache;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.eviction.EvictionPolicy;
+import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
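+ * Runs random cache operations (updates, reads, removes, scan and SQL queries) for various cache configurations.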
+ */
+public class CacheRandomOperationsMultithreadedTest extends 
GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int KEYS = 1000;
+
+    /** */
+    private static final int NODES = 4;
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGridsMultiThreaded(NODES - 1);
+
+        client = true;
+
+        startGrid(NODES - 1);
+
+        super.beforeTestsStarted();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapTiered() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = 
cacheConfiguration(PARTITIONED,
+            ATOMIC,
+            OFFHEAP_TIERED,
+            null,
+            false);
+
+        randomOperations(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapTieredIndexing() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = 
cacheConfiguration(PARTITIONED,
+            ATOMIC,
+            OFFHEAP_TIERED,
+            null,
+            true);
+
+        randomOperations(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapEviction() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = 
cacheConfiguration(PARTITIONED,
+            ATOMIC,
+            ONHEAP_TIERED,
+            new LruEvictionPolicy<>(10),
+            false);
+
+        randomOperations(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapEvictionIndexing() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = 
cacheConfiguration(PARTITIONED,
+            ATOMIC,
+            ONHEAP_TIERED,
+            new LruEvictionPolicy<>(10),
+            true);
+
+        randomOperations(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapTiered() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = 
cacheConfiguration(PARTITIONED,
+            TRANSACTIONAL,
+            OFFHEAP_TIERED,
+            null,
+            false);
+
+        randomOperations(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapTieredIndexing() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = 
cacheConfiguration(PARTITIONED,
+            TRANSACTIONAL,
+            OFFHEAP_TIERED,
+            null,
+            true);
+
+        randomOperations(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapEviction() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = 
cacheConfiguration(PARTITIONED,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            new LruEvictionPolicy<>(10),
+            false);
+
+        randomOperations(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapEvictionIndexing() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = 
cacheConfiguration(PARTITIONED,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            new LruEvictionPolicy<>(10),
+            true);
+
+        randomOperations(ccfg);
+    }
+
+    /**
+     * @param ccfg CacheConfiguration.
+     * @throws Exception If failed.
+     */
+    private void randomOperations(final CacheConfiguration<Object, Object> 
ccfg) throws Exception {
+        ignite(0).createCache(ccfg);
+
+        try {
+            final long stopTime = U.currentTimeMillis() + 30_000;
+
+            final boolean indexing = !F.isEmpty(ccfg.getIndexedTypes()) ||
+                !F.isEmpty(ccfg.getQueryEntities());
+
+            GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
+                @Override public void apply(Integer idx) {
+                    Ignite ignite = ignite(idx % NODES);
+
+                    IgniteCache<Object, Object> cache = 
ignite.cache(ccfg.getName());
+
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    while (U.currentTimeMillis() < stopTime)
+                        randomOperation(rnd, ignite, cache, indexing);
+                }
+            }, 1, "test-thread");
+        }
+        finally {
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @param rnd Random generator.
+     * @param ignite Node.
+     * @param cache Cache.
+     * @param indexing Indexing flag.
+     */
+    private void randomOperation(ThreadLocalRandom rnd,
+        Ignite ignite,
+        IgniteCache<Object, Object> cache,
+        boolean indexing) {
+        int r0 = rnd.nextInt(100);
+
+        if (r0 == 0)
+            cache.clear();
+        else if (r0 == 1)
+            cache.size();
+
+        switch (rnd.nextInt(14)) {
+            case 0: {
+                cache.put(key(rnd), value(rnd));
+
+                break;
+            }
+
+            case 1: {
+                cache.getAndPut(key(rnd), value(rnd));
+
+                break;
+            }
+
+            case 2: {
+                cache.get(key(rnd));
+
+                break;
+            }
+
+            case 3: {
+                cache.remove(key(rnd));
+
+                break;
+            }
+
+            case 4: {
+                cache.getAndRemove(key(rnd));
+
+                break;
+            }
+
+            case 5: {
+                Map<Object, Object> map = new TreeMap<>();
+
+                for (int i = 0; i < 50; i++)
+                    map.put(key(rnd), value(rnd));
+
+                cache.putAll(map);
+
+                break;
+            }
+
+            case 6: {
+                cache.getAll(keys(50, rnd));
+
+                break;
+            }
+
+            case 7: {
+                cache.removeAll(keys(50, rnd));
+
+                break;
+            }
+
+            case 8: {
+                cache.putIfAbsent(key(rnd), value(rnd));
+
+                break;
+            }
+
+            case 9: {
+                cache.getAndPutIfAbsent(key(rnd), value(rnd));
+
+                break;
+            }
+
+            case 10: {
+                cache.replace(key(rnd), value(rnd));
+
+                break;
+            }
+
+            case 11: {
+                cache.getAndReplace(key(rnd), value(rnd));
+
+                break;
+            }
+
+            case 12: {
+                ScanQuery<Object, Object> qry = new ScanQuery<>();
+                qry.setFilter(new TestFilter());
+
+                List<Cache.Entry<Object, Object>> res = 
cache.query(qry).getAll();
+
+                assertTrue(res.size() >= 0);
+
+                break;
+            }
+
+            case 13: {
+                if (indexing) {
+                    SqlQuery<Object, Object> qry = new 
SqlQuery<>(TestData.class, "where val1 > ?");
+                    qry.setArgs(KEYS / 2);
+
+                    List<Cache.Entry<Object, Object>> res = 
cache.query(qry).getAll();
+
+                    assertTrue(res.size() >= 0);
+                }
+
+                break;
+            }
+
+            default:
+                fail();
+        }
+    }
+
+    /**
+     * @param cnt Number of keys.
+     * @param rnd Random generator.
+     * @return Keys.
+     */
+    private Set<Object> keys(int cnt, ThreadLocalRandom rnd) {
+        TreeSet<Object> keys = new TreeSet<>();
+
+        for (int i = 0; i < cnt; i++)
+            keys.add(key(rnd));
+
+        return keys;
+    }
+
+    /**
+     * @param rnd Random generator.
+     * @return Key.
+     */
+    private Object key(ThreadLocalRandom rnd) {
+        return new TestKey(rnd.nextInt(KEYS), rnd);
+    }
+
+    /**
+     * @param rnd Random generator.
+     * @return Value.
+     */
+    private Object value(ThreadLocalRandom rnd) {
+        return new TestData(rnd);
+    }
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param atomicityMode Cache atomicity mode.
+     * @param memoryMode Cache memory mode.
+     * @param evictionPlc Eviction policy.
+     * @param indexing Indexing flag.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Object, Object> cacheConfiguration(
+        CacheMode cacheMode,
+        CacheAtomicityMode atomicityMode,
+        CacheMemoryMode memoryMode,
+        @Nullable  EvictionPolicy<Object, Object> evictionPlc,
+        boolean indexing) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setMemoryMode(memoryMode);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+        ccfg.setEvictionPolicy(evictionPlc);
+        ccfg.setOffHeapMaxMemory(0);
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(1);
+
+        if (indexing)
+            ccfg.setIndexedTypes(TestKey.class, TestData.class);
+
+        return ccfg;
+    }
+
+    /**
+     *
+     */
+    static class TestFilter implements IgniteBiPredicate<Object, Object> {
+        /** {@inheritDoc} */
+        @Override public boolean apply(Object key, Object val) {
+            return ThreadLocalRandom.current().nextInt(10) == 0;
+        }
+    }
+
+    /**
+     *
+     */
+    static class TestKey implements Serializable, Comparable<TestKey> {
+        /** */
+        private int key;
+
+        /** */
+        private byte[] byteVal;
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(TestKey o) {
+            return Integer.compare(key, o.key);
+        }
+
+        /**
+         * @param key Key.
+         * @param rnd Random generator.
+         */
+        public TestKey(int key, ThreadLocalRandom rnd) {
+            this.key = key;
+            byteVal = new byte[rnd.nextInt(100)];
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestKey testKey = (TestKey)o;
+
+            return key == testKey.key;
+       }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key;
+        }
+    }
+
+    /**
+     *
+     */
+    static class TestData implements Serializable {
+        /** */
+        @QuerySqlField(index = true)
+        private int val1;
+
+        /** */
+        private long val2;
+
+        /** */
+        @QuerySqlField(index = true)
+        private String val3;
+
+        /** */
+        private byte[] val4;
+
+        /**
+         * @param rnd Random generator.
+         */
+        public TestData(ThreadLocalRandom rnd) {
+            val1 = rnd.nextInt();
+            val2 = val1;
+            val3 = String.valueOf(val1);
+            val4 = new byte[rnd.nextInt(1024)];
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/accb0e9e/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
index 64a182b..4b83f69 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
@@ -21,6 +21,7 @@ import junit.framework.TestSuite;
 import org.apache.ignite.internal.processors.cache.CacheLocalQueryMetricsSelfTest;
 import org.apache.ignite.internal.processors.cache.CachePartitionedQueryMetricsDistributedSelfTest;
 import org.apache.ignite.internal.processors.cache.CachePartitionedQueryMetricsLocalSelfTest;
+import org.apache.ignite.internal.processors.cache.CacheQueryOffheapEvictDataLostTest;
 import org.apache.ignite.internal.processors.cache.CacheReplicatedQueryMetricsDistributedSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheReplicatedQueryMetricsLocalSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheScanPartitionQueryFallbackSelfTest;
@@ -82,6 +83,8 @@ public class IgniteCacheQuerySelfTestSuite2 extends TestSuite {
 
         suite.addTestSuite(GridOrderedMessageCancelSelfTest.class);
 
+        suite.addTestSuite(CacheQueryOffheapEvictDataLostTest.class);
+
         // Ignite cache and H2 comparison.
         suite.addTestSuite(BaseH2CompareQueryTest.class);
         suite.addTestSuite(H2CompareBigQueryTest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/accb0e9e/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
index 550c69f..9ff7520 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
 import junit.framework.TestSuite;
 import org.apache.ignite.internal.processors.cache.CacheConfigurationP2PTest;
 import org.apache.ignite.internal.processors.cache.CacheIndexStreamerTest;
+import org.apache.ignite.internal.processors.cache.CacheRandomOperationsMultithreadedTest;
 import org.apache.ignite.internal.processors.cache.GridCacheOffHeapAndSwapSelfTest;
 import org.apache.ignite.internal.processors.cache.GridCacheOffHeapSelfTest;
 import org.apache.ignite.internal.processors.cache.GridCacheOffheapIndexEntryEvictTest;
@@ -70,6 +71,7 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite {
 
         suite.addTestSuite(IgniteCacheConfigurationPrimitiveTypesSelfTest.class);
         suite.addTestSuite(IgniteClientReconnectQueriesTest.class);
+        suite.addTestSuite(CacheRandomOperationsMultithreadedTest.class);
 
         return suite;
     }

Reply via email to