http://git-wip-us.apache.org/repos/asf/ignite/blob/953b575f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java new file mode 100644 index 0000000..2ba7bb9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java @@ -0,0 +1,5851 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import javax.cache.Cache; +import javax.cache.CacheException; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import javax.cache.expiry.Duration; +import javax.cache.expiry.ExpiryPolicy; +import javax.cache.expiry.TouchedExpiryPolicy; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.EntryProcessorResult; +import javax.cache.processor.MutableEntry; +import junit.framework.AssertionFailedError; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.events.Event; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.lang.GridAbsPredicateX; +import org.apache.ignite.internal.util.typedef.CIX1; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.PA; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.IgniteCacheConfigVariationsAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.jetbrains.annotations.Nullable; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; +import static org.apache.ignite.cache.CacheMode.LOCAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CachePeekMode.ALL; +import static org.apache.ignite.cache.CachePeekMode.BACKUP; +import static org.apache.ignite.cache.CachePeekMode.OFFHEAP; +import static org.apache.ignite.cache.CachePeekMode.ONHEAP; +import static org.apache.ignite.cache.CachePeekMode.PRIMARY; +import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_LOCKED; +import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_SWAPPED; +import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_UNLOCKED; +import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_UNSWAPPED; +import static org.apache.ignite.testframework.GridTestUtils.assertThrows; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; +import static org.apache.ignite.transactions.TransactionState.COMMITTED; + +/** + * Full API cache test. + */ +@SuppressWarnings({"TransientFieldInNonSerializableClass", "unchecked"}) +public class IgniteCacheConfigVariationsFullApiTest extends IgniteCacheConfigVariationsAbstractTest { + /** Test timeout */ + private static final long TEST_TIMEOUT = 60 * 1000; + + /** */ + public static final CacheEntryProcessor<String, Integer, String> ERR_PROCESSOR = + new CacheEntryProcessor<String, Integer, String>() { + /** */ + private static final long serialVersionUID = 0L; + + @Override public String process(MutableEntry<String, Integer> e, Object... args) { + throw new RuntimeException("Failed!"); + } + }; + + /** Increment processor for invoke operations. */ + public static final EntryProcessor<Object, Object, Object> INCR_PROCESSOR = new IncrementEntryProcessor(); + + /** Increment processor for invoke operations with IgniteEntryProcessor. */ + public static final CacheEntryProcessor<Object, Object, Object> INCR_IGNITE_PROCESSOR = + new CacheEntryProcessor<Object, Object, Object>() { + /** */ + private static final long serialVersionUID = 0L; + + @Override public Object process(MutableEntry<Object, Object> e, Object... args) { + return INCR_PROCESSOR.process(e, args); + } + }; + + /** Increment processor for invoke operations. */ + public static final EntryProcessor<Object, Object, Object> RMV_PROCESSOR = new RemoveEntryProcessor(); + + /** Increment processor for invoke operations with IgniteEntryProcessor. */ + public static final CacheEntryProcessor<Object, Object, Object> RMV_IGNITE_PROCESSOR = + new CacheEntryProcessor<Object, Object, Object>() { + /** */ + private static final long serialVersionUID = 0L; + + @Override public Object process(MutableEntry<Object, Object> e, Object... args) { + return RMV_PROCESSOR.process(e, args); + } + }; + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TEST_TIMEOUT; + } + + /** + * @throws Exception In case of error. + */ + public void testSize() throws Exception { + assert jcache().localSize() == 0; + + int size = 10; + + final Map<String, Integer> map = new HashMap<>(); + + for (int i = 0; i < size; i++) + map.put("key" + i, i); + + // Put in primary nodes to avoid near readers which will prevent entry from being cleared. + Map<ClusterNode, Collection<String>> mapped = grid(0).<String>affinity(cacheName()).mapKeysToNodes(map.keySet()); + + for (int i = 0; i < gridCount(); i++) { + Collection<String> keys = mapped.get(grid(i).localNode()); + + if (!F.isEmpty(keys)) { + for (String key : keys) + jcache(i).put(key, map.get(key)); + } + } + + map.remove("key0"); + + mapped = grid(0).<String>affinity(cacheName()).mapKeysToNodes(map.keySet()); + + for (int i = 0; i < gridCount(); i++) { + // Will actually delete entry from map. + CU.invalidate(jcache(i), "key0"); + + assertNull("Failed check for grid: " + i, jcache(i).localPeek("key0", ONHEAP)); + + Collection<String> keysCol = mapped.get(grid(i).localNode()); + + assert jcache(i).localSize() != 0 || F.isEmpty(keysCol); + } + + for (int i = 0; i < gridCount(); i++) + executeOnLocalOrRemoteJvm(i, new CheckCacheSizeTask(map, cacheName())); + + for (int i = 0; i < gridCount(); i++) { + Collection<String> keysCol = mapped.get(grid(i).localNode()); + + assertEquals("Failed check for grid: " + i, !F.isEmpty(keysCol) ? keysCol.size() : 0, + jcache(i).localSize(PRIMARY)); + } + + int globalPrimarySize = map.size(); + + for (int i = 0; i < gridCount(); i++) + assertEquals(globalPrimarySize, jcache(i).size(PRIMARY)); + + int times = 1; + + if (cacheMode() == REPLICATED) + times = gridCount() - clientsCount(); + else if (cacheMode() == PARTITIONED) + times = Math.min(gridCount(), jcache().getConfiguration(CacheConfiguration.class).getBackups() + 1); + + int globalSize = globalPrimarySize * times; + + for (int i = 0; i < gridCount(); i++) + assertEquals(globalSize, jcache(i).size(ALL)); + } + + /** + * @throws Exception In case of error. + */ + public void testContainsKey() throws Exception { + jcache().put("testContainsKey", 1); + + checkContainsKey(true, "testContainsKey"); + checkContainsKey(false, "testContainsKeyWrongKey"); + } + + /** + * @throws Exception If failed. + */ + public void testContainsKeyTx() throws Exception { + if (!txEnabled()) + return; + + IgniteCache<String, Integer> cache = jcache(); + + IgniteTransactions txs = ignite(0).transactions(); + + for (int i = 0; i < 10; i++) { + String key = String.valueOf(i); + + try (Transaction tx = txs.txStart()) { + assertNull(key, cache.get(key)); + + assertFalse(cache.containsKey(key)); + + tx.commit(); + } + + try (Transaction tx = txs.txStart()) { + assertNull(key, cache.get(key)); + + cache.put(key, i); + + assertTrue(cache.containsKey(key)); + + tx.commit(); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testContainsKeysTx() throws Exception { + if (!txEnabled()) + return; + + IgniteCache<String, Integer> cache = jcache(); + + IgniteTransactions txs = ignite(0).transactions(); + + Set<String> keys = new HashSet<>(); + + for (int i = 0; i < 10; i++) { + String key = String.valueOf(i); + + keys.add(key); + } + + try (Transaction tx = txs.txStart()) { + for (String key : keys) + assertNull(key, cache.get(key)); + + assertFalse(cache.containsKeys(keys)); + + tx.commit(); + } + + try (Transaction tx = txs.txStart()) { + for (String key : keys) + assertNull(key, cache.get(key)); + + for (String key : keys) + cache.put(key, 0); + + assertTrue(cache.containsKeys(keys)); + + tx.commit(); + } + } + + /** + * @throws Exception If failed. + */ + public void testRemoveInExplicitLocks() throws Exception { + if (lockingEnabled()) { + IgniteCache<String, Integer> cache = jcache(); + + cache.put("a", 1); + + Lock lock = cache.lockAll(ImmutableSet.of("a", "b", "c", "d")); + + lock.lock(); + + try { + cache.remove("a"); + + // Make sure single-key operation did not remove lock. + cache.putAll(F.asMap("b", 2, "c", 3, "d", 4)); + } + finally { + lock.unlock(); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testRemoveAllSkipStore() throws Exception { + if (isMultiJvm()) + fail("https://issues.apache.org/jira/browse/IGNITE-1088"); + + if (!storeEnabled()) + return; + + IgniteCache<String, Integer> jcache = jcache(); + + jcache.putAll(F.asMap("1", 1, "2", 2, "3", 3)); + + jcache.withSkipStore().removeAll(); + + assertEquals((Integer)1, jcache.get("1")); + assertEquals((Integer)2, jcache.get("2")); + assertEquals((Integer)3, jcache.get("3")); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testAtomicOps() throws IgniteCheckedException { + IgniteCache<String, Integer> c = jcache(); + + final int cnt = 10; + + for (int i = 0; i < cnt; i++) + assertNull(c.getAndPutIfAbsent("k" + i, i)); + + for (int i = 0; i < cnt; i++) { + boolean wrong = i % 2 == 0; + + String key = "k" + i; + + boolean res = c.replace(key, wrong ? i + 1 : i, -1); + + assertEquals(wrong, !res); + } + + for (int i = 0; i < cnt; i++) { + boolean success = i % 2 != 0; + + String key = "k" + i; + + boolean res = c.remove(key, -1); + + assertTrue(success == res); + } + } + + /** + * @throws Exception In case of error. + */ + public void testGet() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() { + IgniteCache cache = jcache(); + + cache.put(key(1), value(1)); + cache.put(key(2), value(2)); + + assertEquals(value(1), cache.get(key(1))); + assertEquals(value(2), cache.get(key(2))); + // Wrong key. + assertNull(cache.get(key(3))); + } + }); + } + + /** + * @throws Exception In case of error. + */ + public void testGetAsync() throws Exception { + IgniteCache<String, Integer> cache = jcache(); + + cache.put("key1", 1); + cache.put("key2", 2); + + IgniteCache<String, Integer> cacheAsync = cache.withAsync(); + + cacheAsync.get("key1"); + + IgniteFuture<Integer> fut1 = cacheAsync.future(); + + cacheAsync.get("key2"); + + IgniteFuture<Integer> fut2 = cacheAsync.future(); + + cacheAsync.get("wrongKey"); + + IgniteFuture<Integer> fut3 = cacheAsync.future(); + + assert fut1.get() == 1; + assert fut2.get() == 2; + assert fut3.get() == null; + } + + /** + * @throws Exception In case of error. + */ + public void testGetAll() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() { + final Object key1 = key(1); + final Object key2 = key(2); + final Object key9999 = key(9999); + + final Object val1 = value(1); + final Object val2 = value(2); + + Transaction tx = txShouldBeUsed() ? transactions().txStart() : null; + + final IgniteCache<Object, Object> cache = jcache(); + + try { + cache.put(key1, val1); + cache.put(key2, val2); + + if (tx != null) + tx.commit(); + } + finally { + if (tx != null) + tx.close(); + } + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + cache.getAll(null).isEmpty(); + + return null; + } + }, NullPointerException.class, null); + + assert cache.getAll(Collections.<Object>emptySet()).isEmpty(); + + Map<Object, Object> map1 = cache.getAll(ImmutableSet.of(key1, key2, key9999)); + + info("Retrieved map1: " + map1); + + assert 2 == map1.size() : "Invalid map: " + map1; + + assertEquals(val1, map1.get(key1)); + assertEquals(val2, map1.get(key2)); + assertNull(map1.get(key9999)); + + Map<Object, Object> map2 = cache.getAll(ImmutableSet.of(key1, key2, key9999)); + + info("Retrieved map2: " + map2); + + assert 2 == map2.size() : "Invalid map: " + map2; + + assertEquals(val1, map2.get(key1)); + assertEquals(val2, map2.get(key2)); + assertNull(map2.get(key9999)); + + // Now do the same checks but within transaction. + if (txShouldBeUsed()) { + try (Transaction tx0 = transactions().txStart()) { + assert cache.getAll(Collections.<Object>emptySet()).isEmpty(); + + map1 = cache.getAll(ImmutableSet.of(key1, key2, key9999)); + + info("Retrieved map1: " + map1); + + assert 2 == map1.size() : "Invalid map: " + map1; + + assertEquals(val1, map2.get(key1)); + assertEquals(val2, map2.get(key2)); + assertNull(map2.get(key9999)); + + map2 = cache.getAll(ImmutableSet.of(key1, key2, key9999)); + + info("Retrieved map2: " + map2); + + assert 2 == map2.size() : "Invalid map: " + map2; + + assertEquals(val1, map2.get(key1)); + assertEquals(val2, map2.get(key2)); + assertNull(map2.get(key9999)); + + tx0.commit(); + } + } + } + }); + } + + /** + * @throws Exception In case of error. + */ + public void testGetAllWithNulls() throws Exception { + final IgniteCache<String, Integer> cache = jcache(); + + final Set<String> c = new HashSet<>(); + + c.add("key1"); + c.add(null); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + cache.getAll(c); + + return null; + } + }, NullPointerException.class, null); + } + + /** + * @throws Exception If failed. + */ + public void testGetTxNonExistingKey() throws Exception { + if (txShouldBeUsed()) { + try (Transaction ignored = transactions().txStart()) { + assert jcache().get("key999123") == null; + } + } + } + + /** + * @throws Exception In case of error. + */ + public void testGetAllAsync() throws Exception { + final IgniteCache<String, Integer> cache = jcache(); + + final IgniteCache<String, Integer> cacheAsync = cache.withAsync(); + + cache.put("key1", 1); + cache.put("key2", 2); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + cacheAsync.getAll(null); + + return null; + } + }, NullPointerException.class, null); + + cacheAsync.getAll(Collections.<String>emptySet()); + IgniteFuture<Map<String, Integer>> fut2 = cacheAsync.future(); + + cacheAsync.getAll(ImmutableSet.of("key1", "key2")); + IgniteFuture<Map<String, Integer>> fut3 = cacheAsync.future(); + + assert fut2.get().isEmpty(); + assert fut3.get().size() == 2 : "Invalid map: " + fut3.get(); + assert fut3.get().get("key1") == 1; + assert fut3.get().get("key2") == 2; + } + + /** + * @throws Exception In case of error. + */ + public void testPut() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + IgniteCache cache = jcache(); + + final Object key1 = key(1); + final Object val1 = value(1); + final Object key2 = key(2); + final Object val2 = value(2); + + assert cache.getAndPut(key1, val1) == null; + assert cache.getAndPut(key2, val2) == null; + + // Check inside transaction. + assertEquals(val1, cache.get(key1)); + assertEquals(val2, cache.get(key2)); + + // Put again to check returned values. + assertEquals(val1, cache.getAndPut(key1, val1)); + assertEquals(val2, cache.getAndPut(key2, val2)); + + checkContainsKey(true, key1); + checkContainsKey(true, key2); + + assert cache.get(key1) != null; + assert cache.get(key2) != null; + assert cache.get(key(100500)) == null; + + // Check outside transaction. + checkContainsKey(true, key1); + checkContainsKey(true, key2); + + assertEquals(val1, cache.get(key1)); + assertEquals(val2, cache.get(key2)); + assert cache.get(key(100500)) == null; + + assertEquals(val1, cache.getAndPut(key1, value(10))); + assertEquals(val2, cache.getAndPut(key2, value(11))); + } + }); + } + + /** + * @throws Exception In case of error. + */ + public void testPutTx() throws Exception { + if (txShouldBeUsed()) { + IgniteCache<String, Integer> cache = jcache(); + + try (Transaction tx = transactions().txStart()) { + assert cache.getAndPut("key1", 1) == null; + assert cache.getAndPut("key2", 2) == null; + + // Check inside transaction. + assert cache.get("key1") == 1; + assert cache.get("key2") == 2; + + // Put again to check returned values. + assert cache.getAndPut("key1", 1) == 1; + assert cache.getAndPut("key2", 2) == 2; + + assert cache.get("key1") != null; + assert cache.get("key2") != null; + assert cache.get("wrong") == null; + + tx.commit(); + } + + // Check outside transaction. + checkContainsKey(true, "key1"); + checkContainsKey(true, "key2"); + + assert cache.get("key1") == 1; + assert cache.get("key2") == 2; + assert cache.get("wrong") == null; + + assertEquals((Integer)1, cache.getAndPut("key1", 10)); + assertEquals((Integer)2, cache.getAndPut("key2", 11)); + } + } + + /** + * @throws Exception If failed. + */ + public void testInvokeOptimisticReadCommitted() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + checkInvoke(OPTIMISTIC, READ_COMMITTED); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testInvokeOptimisticRepeatableRead() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + checkInvoke(OPTIMISTIC, REPEATABLE_READ); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testInvokePessimisticReadCommitted() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + checkInvoke(PESSIMISTIC, READ_COMMITTED); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testInvokePessimisticRepeatableRead() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + checkInvoke(PESSIMISTIC, REPEATABLE_READ); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testIgniteInvokeOptimisticReadCommitted1() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + checkIgniteInvoke(OPTIMISTIC, READ_COMMITTED); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testIgniteInvokeOptimisticRepeatableRead() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + checkIgniteInvoke(OPTIMISTIC, REPEATABLE_READ); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testIgniteInvokePessimisticReadCommitted() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + checkIgniteInvoke(PESSIMISTIC, READ_COMMITTED); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testIgniteInvokePessimisticRepeatableRead() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + checkIgniteInvoke(PESSIMISTIC, REPEATABLE_READ); + } + }); + } + + /** + * @param concurrency Concurrency. + * @param isolation Isolation. + * @throws Exception If failed. + */ + private void checkIgniteInvoke(TransactionConcurrency concurrency, TransactionIsolation isolation) + throws Exception { + checkInvoke(concurrency, isolation, INCR_IGNITE_PROCESSOR, RMV_IGNITE_PROCESSOR); + } + + /** + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. + * @param incrProcessor Increment processor. + * @param rmvProseccor Remove processor. + */ + private void checkInvoke(TransactionConcurrency concurrency, TransactionIsolation isolation, + EntryProcessor<Object, Object, Object> incrProcessor, + EntryProcessor<Object, Object, Object> rmvProseccor) { + IgniteCache cache = jcache(); + + final Object key1 = key(1); + final Object key2 = key(2); + final Object key3 = key(3); + + final Object val1 = value(1); + final Object val2 = value(2); + final Object val3 = value(3); + + cache.put(key2, val1); + cache.put(key3, val3); + + Transaction tx = txShouldBeUsed() ? ignite(0).transactions().txStart(concurrency, isolation) : null; + + try { + assertNull(cache.invoke(key1, incrProcessor, dataMode)); + assertEquals(val1, cache.invoke(key2, incrProcessor, dataMode)); + assertEquals(val3, cache.invoke(key3, rmvProseccor)); + + if (tx != null) + tx.commit(); + } + catch (Exception e) { + e.printStackTrace(); + + throw e; + } + finally { + if (tx != null) + tx.close(); + } + + assertEquals(val1, cache.get(key1)); + assertEquals(val2, cache.get(key2)); + assertNull(cache.get(key3)); + + for (int i = 0; i < gridCount(); i++) + assertNull("Failed for cache: " + i, jcache(i).localPeek(key3, ONHEAP)); + + cache.remove(key1); + cache.put(key2, val1); + cache.put(key3, val3); + + assertNull(cache.invoke(key1, incrProcessor, dataMode)); + assertEquals(val1, cache.invoke(key2, incrProcessor, dataMode)); + assertEquals(val3, cache.invoke(key3, rmvProseccor)); + + assertEquals(val1, cache.get(key1)); + assertEquals(val2, cache.get(key2)); + assertNull(cache.get(key3)); + + for (int i = 0; i < gridCount(); i++) + assertNull(jcache(i).localPeek(key3, ONHEAP)); + } + + /** + * @param concurrency Concurrency. + * @param isolation Isolation. + * @throws Exception If failed. + */ + private void checkInvoke(TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception { + checkInvoke(concurrency, isolation, INCR_PROCESSOR, RMV_PROCESSOR); + } + + /** + * @throws Exception If failed. + */ + public void testInvokeAllOptimisticReadCommitted() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + checkInvokeAll(OPTIMISTIC, READ_COMMITTED); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testInvokeAllOptimisticRepeatableRead() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + checkInvokeAll(OPTIMISTIC, REPEATABLE_READ); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testInvokeAllPessimisticReadCommitted() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + checkInvokeAll(PESSIMISTIC, READ_COMMITTED); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testInvokeAllPessimisticRepeatableRead() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + checkInvokeAll(PESSIMISTIC, REPEATABLE_READ); + } + }); + } + + /** + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. + * @throws Exception If failed. + */ + private void checkInvokeAll(TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception { + // TODO IGNITE-2664: enable tests for all modes when IGNITE-2664 will be fixed. + if (dataMode != DataMode.EXTERNALIZABLE && gridCount() > 1) + return; + + final Object key1 = key(1); + final Object key2 = key(2); + final Object key3 = key(3); + + final Object val1 = value(1); + final Object val2 = value(2); + final Object val3 = value(3); + final Object val4 = value(4); + + final IgniteCache<Object, Object> cache = jcache(); + + cache.put(key2, val1); + cache.put(key3, val3); + + if (txShouldBeUsed()) { + Map<Object, EntryProcessorResult<Object>> res; + + try (Transaction tx = ignite(0).transactions().txStart(concurrency, isolation)) { + res = cache.invokeAll(F.asSet(key1, key2, key3), INCR_PROCESSOR, dataMode); + + tx.commit(); + } + + assertEquals(val1, cache.get(key1)); + assertEquals(val2, cache.get(key2)); + assertEquals(val4, cache.get(key3)); + + assertNull(res.get(key1)); + assertEquals(val1, res.get(key2).get()); + assertEquals(val3, res.get(key3).get()); + + assertEquals(2, res.size()); + + cache.remove(key1); + cache.put(key2, val1); + cache.put(key3, val3); + } + + Map<Object, EntryProcessorResult<Object>> res = cache.invokeAll(F.asSet(key1, key2, key3), RMV_PROCESSOR); + + for (int i = 0; i < gridCount(); i++) { + assertNull(jcache(i).localPeek(key1, ONHEAP)); + assertNull(jcache(i).localPeek(key2, ONHEAP)); + assertNull(jcache(i).localPeek(key3, ONHEAP)); + } + + assertNull(res.get(key1)); + assertEquals(val1, res.get(key2).get()); + assertEquals(val3, res.get(key3).get()); + + assertEquals(2, res.size()); + + cache.remove(key1); + cache.put(key2, val1); + cache.put(key3, val3); + + res = cache.invokeAll(F.asSet(key1, key2, key3), INCR_PROCESSOR, dataMode); + + assertEquals(val1, cache.get(key1)); + assertEquals(val2, cache.get(key2)); + assertEquals(val4, cache.get(key3)); + + assertNull(res.get(key1)); + assertEquals(val1, res.get(key2).get()); + assertEquals(val3, res.get(key3).get()); + + assertEquals(2, res.size()); + + cache.remove(key1); + cache.put(key2, val1); + cache.put(key3, val3); + + res = cache.invokeAll(F.asMap(key1, INCR_PROCESSOR, key2, INCR_PROCESSOR, key3, INCR_PROCESSOR), dataMode); + + assertEquals(val1, cache.get(key1)); + assertEquals(val2, cache.get(key2)); + assertEquals(val4, cache.get(key3)); + + assertNull(res.get(key1)); + assertEquals(val1, res.get(key2).get()); + assertEquals(val3, res.get(key3).get()); + + assertEquals(2, res.size()); + } + + /** + * @throws Exception If failed. + */ + public void testInvokeAllWithNulls() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + final Object key1 = key(1); + + final IgniteCache<Object, Object> cache = jcache(); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + cache.invokeAll((Set<Object>)null, INCR_PROCESSOR, dataMode); + + return null; + } + }, NullPointerException.class, null); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + cache.invokeAll(F.asSet(key1), null); + + return null; + } + }, NullPointerException.class, null); + + { + final Set<Object> keys = new LinkedHashSet<>(2); + + keys.add(key1); + keys.add(null); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + cache.invokeAll(keys, INCR_PROCESSOR, dataMode); + + return null; + } + }, NullPointerException.class, null); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + cache.invokeAll(F.asSet(key1), null); + + return null; + } + }, NullPointerException.class, null); + } + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testInvokeSequentialOptimisticNoStart() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + checkInvokeSequential0(false, OPTIMISTIC); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testInvokeSequentialPessimisticNoStart() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + checkInvokeSequential0(false, PESSIMISTIC); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testInvokeSequentialOptimisticWithStart() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + checkInvokeSequential0(true, OPTIMISTIC); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testInvokeSequentialPessimisticWithStart() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + checkInvokeSequential0(true, PESSIMISTIC); + } + }); + } + + /** + * @param startVal Whether to put value. + * @param concurrency Concurrency. + * @throws Exception If failed. + */ + private void checkInvokeSequential0(boolean startVal, TransactionConcurrency concurrency) + throws Exception { + final Object val1 = value(1); + final Object val2 = value(2); + final Object val3 = value(3); + + IgniteCache<Object, Object> cache = jcache(); + + final Object key = primaryTestObjectKeysForCache(cache, 1).get(0); + + Transaction tx = txShouldBeUsed() ? ignite(0).transactions().txStart(concurrency, READ_COMMITTED) : null; + + try { + if (startVal) + cache.put(key, val2); + else + assertEquals(null, cache.get(key)); + + Object expRes = startVal ? val2 : null; + + assertEquals(expRes, cache.invoke(key, INCR_PROCESSOR, dataMode)); + + expRes = startVal ? val3 : val1; + + assertEquals(expRes, cache.invoke(key, INCR_PROCESSOR, dataMode)); + + expRes = value(valueOf(expRes) + 1); + + assertEquals(expRes, cache.invoke(key, INCR_PROCESSOR, dataMode)); + + if (tx != null) + tx.commit(); + } + finally { + if (tx != null) + tx.close(); + } + + Object exp = value((startVal ? 2 : 0) + 3); + + assertEquals(exp, cache.get(key)); + + for (int i = 0; i < gridCount(); i++) { + if (ignite(i).affinity(cacheName()).isPrimaryOrBackup(grid(i).localNode(), key)) + assertEquals(exp, peek(jcache(i), key)); + } + } + + /** + * @throws Exception If failed. + */ + public void testInvokeAfterRemoveOptimistic() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + checkInvokeAfterRemove(OPTIMISTIC); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testInvokeAfterRemovePessimistic() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + checkInvokeAfterRemove(PESSIMISTIC); + } + }); + } + + /** + * @param concurrency Concurrency. + * @throws Exception If failed. + */ + private void checkInvokeAfterRemove(TransactionConcurrency concurrency) throws Exception { + IgniteCache<Object, Object> cache = jcache(); + + Object key = key(1); + + cache.put(key, value(4)); + + Transaction tx = txShouldBeUsed() ? ignite(0).transactions().txStart(concurrency, READ_COMMITTED) : null; + + try { + cache.remove(key); + + cache.invoke(key, INCR_PROCESSOR, dataMode); + cache.invoke(key, INCR_PROCESSOR, dataMode); + cache.invoke(key, INCR_PROCESSOR, dataMode); + + if (tx != null) + tx.commit(); + } + finally { + if (tx != null) + tx.close(); + } + + assertEquals(value(3), cache.get(key)); + } + + /** + * @throws Exception If failed. + */ + public void testInvokeReturnValueGetOptimisticReadCommitted() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + checkInvokeReturnValue(false, OPTIMISTIC, READ_COMMITTED); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testInvokeReturnValueGetOptimisticRepeatableRead() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + checkInvokeReturnValue(false, OPTIMISTIC, REPEATABLE_READ); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testInvokeReturnValueGetPessimisticReadCommitted() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + checkInvokeReturnValue(false, PESSIMISTIC, READ_COMMITTED); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testInvokeReturnValueGetPessimisticRepeatableRead() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + checkInvokeReturnValue(false, PESSIMISTIC, REPEATABLE_READ); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testInvokeReturnValuePutInTx() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + checkInvokeReturnValue(true, OPTIMISTIC, READ_COMMITTED); + } + }); + } + + /** + * @param put Whether to put value. + * @param concurrency Concurrency. + * @param isolation Isolation. + * @throws Exception If failed. + */ + private void checkInvokeReturnValue(boolean put, + TransactionConcurrency concurrency, + TransactionIsolation isolation) + throws Exception { + IgniteCache<Object, Object> cache = jcache(); + + Object key = key(1); + Object val1 = value(1); + Object val2 = value(2); + + if (!put) + cache.put(key, val1); + + Transaction tx = txShouldBeUsed() ? ignite(0).transactions().txStart(concurrency, isolation) : null; + + try { + if (put) + cache.put(key, val1); + + cache.invoke(key, INCR_PROCESSOR, dataMode); + + assertEquals(val2, cache.get(key)); + + if (tx != null) { + // Second get inside tx. Make sure read value is not transformed twice. + assertEquals(val2, cache.get(key)); + + tx.commit(); + } + } + finally { + if (tx != null) + tx.close(); + } + } + + /** + * @throws Exception In case of error. + */ + public void testGetAndPutAsync() throws Exception { + IgniteCache<String, Integer> cache = jcache(); + + IgniteCache<String, Integer> cacheAsync = cache.withAsync(); + + cache.put("key1", 1); + cache.put("key2", 2); + + cacheAsync.getAndPut("key1", 10); + + IgniteFuture<Integer> fut1 = cacheAsync.future(); + + cacheAsync.getAndPut("key2", 11); + + IgniteFuture<Integer> fut2 = cacheAsync.future(); + + assertEquals((Integer)1, fut1.get(5000)); + assertEquals((Integer)2, fut2.get(5000)); + + assertEquals((Integer)10, cache.get("key1")); + assertEquals((Integer)11, cache.get("key2")); + } + + /** + * @throws Exception In case of error. + */ + public void testPutAsync0() throws Exception { + IgniteCache cacheAsync = jcache().withAsync(); + + cacheAsync.getAndPut("key1", 0); + + IgniteFuture<Integer> fut1 = cacheAsync.future(); + + cacheAsync.getAndPut("key2", 1); + + IgniteFuture<Integer> fut2 = cacheAsync.future(); + + assert fut1.get(5000) == null; + assert fut2.get(5000) == null; + } + + /** + * @throws Exception If failed. + */ + public void testInvokeAsync() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + final Object key1 = key(1); + final Object key2 = key(2); + final Object key3 = key(3); + + final Object val1 = value(1); + final Object val2 = value(2); + final Object val3 = value(3); + + IgniteCache<Object, Object> cache = jcache(); + + cache.put(key2, val1); + cache.put(key3, val3); + + IgniteCache<Object, Object> cacheAsync = cache.withAsync(); + + assertNull(cacheAsync.invoke(key1, INCR_PROCESSOR, dataMode)); + + IgniteFuture<?> fut0 = cacheAsync.future(); + + assertNull(cacheAsync.invoke(key2, INCR_PROCESSOR, dataMode)); + + IgniteFuture<?> fut1 = cacheAsync.future(); + + assertNull(cacheAsync.invoke(key3, RMV_PROCESSOR)); + + IgniteFuture<?> fut2 = cacheAsync.future(); + + fut0.get(); + fut1.get(); + fut2.get(); + + assertEquals(val1, cache.get(key1)); + assertEquals(val2, cache.get(key2)); + assertNull(cache.get(key3)); + + for (int i = 0; i < gridCount(); i++) + assertNull(jcache(i).localPeek(key3, ONHEAP)); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testInvoke() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + final Object k0 = key(0); + final Object k1 = key(1); + + final Object val1 = value(1); + final Object val2 = value(2); + final Object val3 = value(3); + + final IgniteCache<Object, Object> cache = jcache(); + + assertNull(cache.invoke(k0, INCR_PROCESSOR, dataMode)); + + assertEquals(k1, cache.get(k0)); + + assertEquals(val1, cache.invoke(k0, INCR_PROCESSOR, dataMode)); + + assertEquals(val2, cache.get(k0)); + + cache.put(k1, val1); + + assertEquals(val1, cache.invoke(k1, INCR_PROCESSOR, dataMode)); + + assertEquals(val2, cache.get(k1)); + + assertEquals(val2, cache.invoke(k1, INCR_PROCESSOR, dataMode)); + + assertEquals(val3, cache.get(k1)); + + RemoveAndReturnNullEntryProcessor c = new RemoveAndReturnNullEntryProcessor(); + + assertNull(cache.invoke(k1, c)); + assertNull(cache.get(k1)); + + for (int i = 0; i < gridCount(); i++) + assertNull(jcache(i).localPeek(k1, ONHEAP)); + + final EntryProcessor<Object, Object, Object> errProcessor = new FailedEntryProcessor(); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + cache.invoke(k1, errProcessor); + + return null; + } + }, EntryProcessorException.class, "Test entry processor exception."); + } + }); + } + + /** + * @throws Exception In case of error. + */ + public void testPutx() throws Exception { + if (txShouldBeUsed()) + checkPut(true); + } + + /** + * @throws Exception In case of error. + */ + public void testPutxNoTx() throws Exception { + checkPut(false); + } + + /** + * @param inTx Whether to start transaction. + * @throws Exception If failed. + */ + private void checkPut(boolean inTx) throws Exception { + Transaction tx = inTx ? transactions().txStart() : null; + + IgniteCache<String, Integer> cache = jcache(); + + try { + cache.put("key1", 1); + cache.put("key2", 2); + + // Check inside transaction. + assert cache.get("key1") == 1; + assert cache.get("key2") == 2; + + if (tx != null) + tx.commit(); + } + finally { + if (tx != null) + tx.close(); + } + + checkSize(F.asSet("key1", "key2")); + + // Check outside transaction. + checkContainsKey(true, "key1"); + checkContainsKey(true, "key2"); + checkContainsKey(false, "wrong"); + + assert cache.get("key1") == 1; + assert cache.get("key2") == 2; + assert cache.get("wrong") == null; + } + + /** + * @throws Exception If failed. + */ + public void testPutAsync() throws Exception { + Transaction tx = txShouldBeUsed() ? transactions().txStart() : null; + + IgniteCache cacheAsync = jcache().withAsync(); + + try { + jcache().put("key2", 1); + + cacheAsync.put("key1", 10); + + IgniteFuture<?> fut1 = cacheAsync.future(); + + cacheAsync.put("key2", 11); + + IgniteFuture<?> fut2 = cacheAsync.future(); + + IgniteFuture<Transaction> f = null; + + if (tx != null) { + tx = (Transaction)tx.withAsync(); + + tx.commit(); + + f = tx.future(); + } + + assertNull(fut1.get()); + assertNull(fut2.get()); + + assert f == null || f.get().state() == COMMITTED; + } + finally { + if (tx != null) + tx.close(); + } + + checkSize(F.asSet("key1", "key2")); + + assert jcache().get("key1") == 10; + assert jcache().get("key2") == 11; + } + + /** + * @throws Exception In case of error. + */ + public void testPutAll() throws Exception { + Map<String, Integer> map = F.asMap("key1", 1, "key2", 2); + + IgniteCache<String, Integer> cache = jcache(); + + cache.putAll(map); + + checkSize(F.asSet("key1", "key2")); + + assert cache.get("key1") == 1; + assert cache.get("key2") == 2; + + map.put("key1", 10); + map.put("key2", 20); + + cache.putAll(map); + + checkSize(F.asSet("key1", "key2")); + + assert cache.get("key1") == 10; + assert cache.get("key2") == 20; + } + + /** + * @throws Exception In case of error. + */ + public void testNullInTx() throws Exception { + if (!txShouldBeUsed()) + return; + + final IgniteCache<String, Integer> cache = jcache(); + + for (int i = 0; i < 100; i++) { + final String key = "key-" + i; + + assertNull(cache.get(key)); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + IgniteTransactions txs = transactions(); + + try (Transaction tx = txs.txStart()) { + cache.put(key, 1); + + cache.put(null, 2); + + tx.commit(); + } + + return null; + } + }, NullPointerException.class, null); + + assertNull(cache.get(key)); + + cache.put(key, 1); + + assertEquals(1, (int)cache.get(key)); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + IgniteTransactions txs = transactions(); + + try (Transaction tx = txs.txStart()) { + cache.put(key, 2); + + cache.remove(null); + + tx.commit(); + } + + return null; + } + }, NullPointerException.class, null); + + assertEquals(1, (int)cache.get(key)); + + cache.put(key, 2); + + assertEquals(2, (int)cache.get(key)); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + IgniteTransactions txs = transactions(); + + Map<String, Integer> map = new LinkedHashMap<>(); + + map.put("k1", 1); + map.put("k2", 2); + map.put(null, 3); + + try (Transaction tx = txs.txStart()) { + cache.put(key, 1); + + cache.putAll(map); + + tx.commit(); + } + + return null; + } + }, NullPointerException.class, null); + + assertNull(cache.get("k1")); + assertNull(cache.get("k2")); + + assertEquals(2, (int)cache.get(key)); + + cache.put(key, 3); + + assertEquals(3, (int)cache.get(key)); + } + } + + /** + * @throws Exception In case of error. + */ + public void testPutAllWithNulls() throws Exception { + final IgniteCache<String, Integer> cache = jcache(); + + { + final Map<String, Integer> m = new LinkedHashMap<>(2); + + m.put("key1", 1); + m.put(null, 2); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + cache.putAll(m); + + return null; + } + }, NullPointerException.class, null); + + cache.put("key1", 1); + + assertEquals(1, (int)cache.get("key1")); + } + + { + final Map<String, Integer> m = new LinkedHashMap<>(2); + + m.put("key3", 3); + m.put("key4", null); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + cache.putAll(m); + + return null; + } + }, NullPointerException.class, null); + + m.put("key4", 4); + + cache.putAll(m); + + assertEquals(3, (int)cache.get("key3")); + assertEquals(4, (int)cache.get("key4")); + } + + assertThrows(log, new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + cache.put("key1", null); + + return null; + } + }, NullPointerException.class, A.NULL_MSG_PREFIX); + + assertThrows(log, new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + cache.getAndPut("key1", null); + + return null; + } + }, NullPointerException.class, A.NULL_MSG_PREFIX); + + assertThrows(log, new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + cache.put(null, 1); + + return null; + } + }, NullPointerException.class, A.NULL_MSG_PREFIX); + + assertThrows(log, new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + cache.replace(null, 1); + + return null; + } + }, NullPointerException.class, A.NULL_MSG_PREFIX); + + assertThrows(log, new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + cache.getAndReplace(null, 1); + + return null; + } + }, NullPointerException.class, A.NULL_MSG_PREFIX); + + assertThrows(log, new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + cache.replace("key", null); + + return null; + } + }, NullPointerException.class, A.NULL_MSG_PREFIX); + + assertThrows(log, new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + cache.getAndReplace("key", null); + + return null; + } + }, NullPointerException.class, A.NULL_MSG_PREFIX); + + assertThrows(log, new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + cache.replace(null, 1, 2); + + return null; + } + }, NullPointerException.class, A.NULL_MSG_PREFIX); + + assertThrows(log, new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + cache.replace("key", null, 2); + + return null; + } + }, NullPointerException.class, A.NULL_MSG_PREFIX); + + assertThrows(log, new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + cache.replace("key", 1, null); + + return null; + } + }, NullPointerException.class, A.NULL_MSG_PREFIX); + } + + /** + * @throws Exception In case of error. + */ + public void testPutAllAsync() throws Exception { + Map<String, Integer> map = F.asMap("key1", 1, "key2", 2); + + IgniteCache<String, Integer> cache = jcache(); + + IgniteCache<String, Integer> cacheAsync = cache.withAsync(); + + cacheAsync.putAll(map); + + IgniteFuture<?> f1 = cacheAsync.future(); + + map.put("key1", 10); + map.put("key2", 20); + + cacheAsync.putAll(map); + + IgniteFuture<?> f2 = cacheAsync.future(); + + assertNull(f2.get()); + assertNull(f1.get()); + + checkSize(F.asSet("key1", "key2")); + + assert cache.get("key1") == 10; + assert cache.get("key2") == 20; + } + + /** + * @throws Exception In case of error. + */ + public void testGetAndPutIfAbsent() throws Exception { + Transaction tx = txShouldBeUsed() ? transactions().txStart() : null; + + IgniteCache<String, Integer> cache = jcache(); + + try { + assert cache.getAndPutIfAbsent("key", 1) == null; + + assert cache.get("key") != null; + assert cache.get("key") == 1; + + assert cache.getAndPutIfAbsent("key", 2) != null; + assert cache.getAndPutIfAbsent("key", 2) == 1; + + assert cache.get("key") != null; + assert cache.get("key") == 1; + + if (tx != null) + tx.commit(); + } + finally { + if (tx != null) + tx.close(); + } + + assert cache.getAndPutIfAbsent("key", 2) != null; + + for (int i = 0; i < gridCount(); i++) { + info("Peek on node [i=" + i + ", id=" + grid(i).localNode().id() + ", val=" + + grid(i).cache(cacheName()).localPeek("key", ONHEAP) + ']'); + } + + assertEquals((Integer)1, cache.getAndPutIfAbsent("key", 2)); + + assert cache.get("key") != null; + assert cache.get("key") == 1; + + if (!storeEnabled()) + return; + + // Check swap. + cache.put("key2", 1); + + cache.localEvict(Collections.singleton("key2")); + + if (!isLoadPreviousValue()) + cache.get("key2"); + + assertEquals((Integer)1, cache.getAndPutIfAbsent("key2", 3)); + + // Check db. + if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) { + putToStore("key3", 3); + + assertEquals((Integer)3, cache.getAndPutIfAbsent("key3", 4)); + + assertEquals((Integer)3, cache.get("key3")); + } + + assertEquals((Integer)1, cache.get("key2")); + + cache.localEvict(Collections.singleton("key2")); + + if (!isLoadPreviousValue()) + cache.get("key2"); + + // Same checks inside tx. + tx = txShouldBeUsed() ? transactions().txStart() : null; + + try { + assertEquals((Integer)1, cache.getAndPutIfAbsent("key2", 3)); + + if (tx != null) + tx.commit(); + + assertEquals((Integer)1, cache.get("key2")); + } + finally { + if (tx != null) + tx.close(); + } + } + + /** + * @throws Exception If failed. + */ + public void testGetAndPutIfAbsentAsync() throws Exception { + Transaction tx = txShouldBeUsed() ? transactions().txStart() : null; + + IgniteCache<String, Integer> cache = jcache(); + + IgniteCache<String, Integer> cacheAsync = cache.withAsync(); + + try { + cacheAsync.getAndPutIfAbsent("key", 1); + + IgniteFuture<Integer> fut1 = cacheAsync.future(); + + assertNull(fut1.get()); + assertEquals((Integer)1, cache.get("key")); + + cacheAsync.getAndPutIfAbsent("key", 2); + + IgniteFuture<Integer> fut2 = cacheAsync.future(); + + assertEquals((Integer)1, fut2.get()); + assertEquals((Integer)1, cache.get("key")); + + if (tx != null) + tx.commit(); + } + finally { + if (tx != null) + tx.close(); + } + + if (!storeEnabled()) + return; + + // Check swap. + cache.put("key2", 1); + + cache.localEvict(Collections.singleton("key2")); + + if (!isLoadPreviousValue()) + cache.get("key2"); + + cacheAsync.getAndPutIfAbsent("key2", 3); + + assertEquals((Integer)1, cacheAsync.<Integer>future().get()); + + // Check db. + if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) { + putToStore("key3", 3); + + cacheAsync.getAndPutIfAbsent("key3", 4); + + assertEquals((Integer)3, cacheAsync.<Integer>future().get()); + } + + cache.localEvict(Collections.singleton("key2")); + + if (!isLoadPreviousValue()) + cache.get("key2"); + + // Same checks inside tx. + tx = txShouldBeUsed() ? transactions().txStart() : null; + + try { + cacheAsync.getAndPutIfAbsent("key2", 3); + + assertEquals(1, cacheAsync.future().get()); + + if (tx != null) + tx.commit(); + + assertEquals((Integer)1, cache.get("key2")); + } + finally { + if (tx != null) + tx.close(); + } + } + + /** + * @throws Exception If failed. + */ + public void testPutIfAbsent() throws Exception { + IgniteCache<String, Integer> cache = jcache(); + + assertNull(cache.get("key")); + assert cache.putIfAbsent("key", 1); + assert cache.get("key") != null && cache.get("key") == 1; + assert !cache.putIfAbsent("key", 2); + assert cache.get("key") != null && cache.get("key") == 1; + + if (!storeEnabled()) + return; + + // Check swap. + cache.put("key2", 1); + + cache.localEvict(Collections.singleton("key2")); + + if (!isLoadPreviousValue()) + cache.get("key2"); + + assertFalse(cache.putIfAbsent("key2", 3)); + + // Check db. + if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) { + putToStore("key3", 3); + + assertFalse(cache.putIfAbsent("key3", 4)); + } + + cache.localEvict(Collections.singleton("key2")); + + if (!isLoadPreviousValue()) + cache.get("key2"); + + // Same checks inside tx. + Transaction tx = txShouldBeUsed() ? transactions().txStart() : null; + + try { + assertFalse(cache.putIfAbsent("key2", 3)); + + if (tx != null) + tx.commit(); + + assertEquals((Integer)1, cache.get("key2")); + } + finally { + if (tx != null) + tx.close(); + } + } + + /** + * @throws Exception In case of error. + */ + public void testPutxIfAbsentAsync() throws Exception { + if (txShouldBeUsed()) + checkPutxIfAbsentAsync(true); + } + + /** + * @throws Exception In case of error. + */ + public void testPutxIfAbsentAsyncNoTx() throws Exception { + checkPutxIfAbsentAsync(false); + } + + /** + * @param inTx In tx flag. + * @throws Exception If failed. + */ + private void checkPutxIfAbsentAsync(boolean inTx) throws Exception { + IgniteCache<String, Integer> cache = jcache(); + + IgniteCache<String, Integer> cacheAsync = cache.withAsync(); + + cacheAsync.putIfAbsent("key", 1); + + IgniteFuture<Boolean> fut1 = cacheAsync.future(); + + assert fut1.get(); + assert cache.get("key") != null && cache.get("key") == 1; + + cacheAsync.putIfAbsent("key", 2); + + IgniteFuture<Boolean> fut2 = cacheAsync.future(); + + assert !fut2.get(); + assert cache.get("key") != null && cache.get("key") == 1; + + if (!storeEnabled()) + return; + + // Check swap. + cache.put("key2", 1); + + cache.localEvict(Collections.singleton("key2")); + + if (!isLoadPreviousValue()) + cache.get("key2"); + + cacheAsync.putIfAbsent("key2", 3); + + assertFalse(cacheAsync.<Boolean>future().get()); + + // Check db. + if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) { + putToStore("key3", 3); + + cacheAsync.putIfAbsent("key3", 4); + + assertFalse(cacheAsync.<Boolean>future().get()); + } + + cache.localEvict(Collections.singletonList("key2")); + + if (!isLoadPreviousValue()) + cache.get("key2"); + + // Same checks inside tx. + Transaction tx = inTx ? transactions().txStart() : null; + + try { + cacheAsync.putIfAbsent("key2", 3); + + assertFalse(cacheAsync.<Boolean>future().get()); + + if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) { + cacheAsync.putIfAbsent("key3", 4); + + assertFalse(cacheAsync.<Boolean>future().get()); + } + + if (tx != null) + tx.commit(); + } + finally { + if (tx != null) + tx.close(); + } + + assertEquals((Integer)1, cache.get("key2")); + + if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) + assertEquals((Integer)3, cache.get("key3")); + } + + /** + * @throws Exception In case of error. + */ + public void testPutIfAbsentAsyncConcurrent() throws Exception { + IgniteCache cacheAsync = jcache().withAsync(); + + cacheAsync.putIfAbsent("key1", 1); + + IgniteFuture<Boolean> fut1 = cacheAsync.future(); + + cacheAsync.putIfAbsent("key2", 2); + + IgniteFuture<Boolean> fut2 = cacheAsync.future(); + + assert fut1.get(); + assert fut2.get(); + } + + /** + * @throws Exception If failed. + */ + public void testGetAndReplace() throws Exception { + IgniteCache<String, Integer> cache = jcache(); + + cache.put("key", 1); + + assert cache.get("key") == 1; + + info("key 1 -> 2"); + + assert cache.getAndReplace("key", 2) == 1; + + assert cache.get("key") == 2; + + assert cache.getAndReplace("wrong", 0) == null; + + assert cache.get("wrong") == null; + + info("key 0 -> 3"); + + assert !cache.replace("key", 0, 3); + + assert cache.get("key") == 2; + + info("key 0 -> 3"); + + assert !cache.replace("key", 0, 3); + + assert cache.get("key") == 2; + + info("key 2 -> 3"); + + assert cache.replace("key", 2, 3); + + assert cache.get("key") == 3; + + if (!storeEnabled()) + return; + + info("evict key"); + + cache.localEvict(Collections.singleton("key")); + + info("key 3 -> 4"); + + if (!isLoadPreviousValue()) + cache.get("key"); + + assert cache.replace("key", 3, 4); + + assert cache.get("key") == 4; + + if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) { + putToStore("key2", 5); + + info("key2 5 -> 6"); + + assert cache.replace("key2", 5, 6); + } + + for (int i = 0; i < gridCount(); i++) { + info("Peek key on grid [i=" + i + ", nodeId=" + grid(i).localNode().id() + + ", peekVal=" + grid(i).cache(cacheName()).localPeek("key", ONHEAP) + ']'); + + info("Peek key2 on grid [i=" + i + ", nodeId=" + grid(i).localNode().id() + + ", peekVal=" + grid(i).cache(cacheName()).localPeek("key2", ONHEAP) + ']'); + } + + if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) + assertEquals((Integer)6, cache.get("key2")); + + cache.localEvict(Collections.singleton("key")); + + if (!isLoadPreviousValue()) + cache.get("key"); + + Transaction tx = txShouldBeUsed() ? transactions().txStart() : null; + + try { + assert cache.replace("key", 4, 5); + + if (tx != null) + tx.commit(); + + assert cache.get("key") == 5; + } + finally { + if (tx != null) + tx.close(); + } + } + + /** + * @throws Exception If failed. + */ + public void testReplace() throws Exception { + IgniteCache<String, Integer> cache = jcache(); + + cache.put("key", 1); + + assert cache.get("key") == 1; + + assert cache.replace("key", 2); + + assert cache.get("key") == 2; + + assert !cache.replace("wrong", 2); + + if (!storeEnabled()) + return; + + cache.localEvict(Collections.singleton("key")); + + if (!isLoadPreviousValue()) + assert cache.get("key") == 2; + + assert cache.replace("key", 4); + + assert cache.get("key") == 4; + + if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) { + putToStore("key2", 5); + + cache.replace("key2", 6); + + assertEquals((Integer)6, cache.get("key2")); + } + + cache.localEvict(Collections.singleton("key")); + + if (!isLoadPreviousValue()) + assert cache.get("key") == 4; + + Transaction tx = txShouldBeUsed() ? transactions().txStart() : null; + + try { + assert cache.replace("key", 5); + + if (tx != null) + tx.commit(); + } + finally { + if (tx != null) + tx.close(); + } + + assert cache.get("key") == 5; + } + + /** + * @throws Exception If failed. + */ + public void testGetAndReplaceAsync() throws Exception { + IgniteCache<String, Integer> cache = jcache(); + + IgniteCache<String, Integer> cacheAsync = cache.withAsync(); + + cache.put("key", 1); + + assert cache.get("key") == 1; + + cacheAsync.getAndReplace("key", 2); + + assert cacheAsync.<Integer>future().get() == 1; + + assert cache.get("key") == 2; + + cacheAsync.getAndReplace("wrong", 0); + + assert cacheAsync.future().get() == null; + + assert cache.get("wrong") == null; + + cacheAsync.replace("key", 0, 3); + + assert !cacheAsync.<Boolean>future().get(); + + assert cache.get("key") == 2; + + cacheAsync.replace("key", 0, 3); + + assert !cacheAsync.<Boolean>future().get(); + + assert cache.get("key") == 2; + + cacheAsync.replace("key", 2, 3); + + assert cacheAsync.<Boolean>future().get(); + + assert cache.get("key") == 3; + + if (!storeEnabled()) + return; + + cache.localEvict(Collections.singleton("key")); + + if (!isLoadPreviousValue()) + cache.get("key"); + + cacheAsync.replace("key", 3, 4); + + assert cacheAsync.<Boolean>future().get(); + + assert cache.get("key") == 4; + + if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) { + putToStore("key2", 5); + + cacheAsync.replace("key2", 5, 6); + + assert cacheAsync.<Boolean>future().get(); + + assertEquals((Integer)6, cache.get("key2")); + } + + cache.localEvict(Collections.singleton("key")); + + if (!isLoadPreviousValue()) + cache.get("key"); + + Transaction tx = txShouldBeUsed() ? transactions().txStart() : null; + + try { + cacheAsync.replace("key", 4, 5); + + assert cacheAsync.<Boolean>future().get(); + + if (tx != null) + tx.commit(); + } + finally { + if (tx != null) + tx.close(); + } + + assert cache.get("key") == 5; + } + + /** + * @throws Exception If failed. + */ + public void testReplacexAsync() throws Exception { + IgniteCache<String, Integer> cache = jcache(); + + IgniteCache<String, Integer> cacheAsync = cache.withAsync(); + + cache.put("key", 1); + + assert cache.get("key") == 1; + + cacheAsync.replace("key", 2); + + assert cacheAsync.<Boolean>future().get(); + + info("Finished replace."); + + assertEquals((Integer)2, cache.get("key")); + + cacheAsync.replace("wrond", 2); + + assert !cacheAsync.<Boolean>future().get(); + + if (!storeEnabled()) + return; + + cache.localEvict(Collections.singleton("key")); + + if (!isLoadPreviousValue()) + cache.get("key"); + + cacheAsync.replace("key", 4); + + assert cacheAsync.<Boolean>future().get(); + + assert cache.get("key") == 4; + + if (storeEnabled() && isLoadPreviousValue() && !isMultiJvm()) { + putToStore("key2", 5); + + cacheAsync.replace("key2", 6); + + assert cacheAsync.<Boolean>future().get(); + + assert cache.get("key2") == 6; + } + + cache.localEvict(Collections.singleton("key")); + + if (!isLoadPreviousValue()) + cache.get("key"); + + Transaction tx = txShouldBeUsed() ? transactions().txStart() : null; + + try { + cacheAsync.replace("key", 5); + + assert cacheAsync.<Boolean>future().get(); + + if (tx != null) + tx.commit(); + } + finally { + if (tx != null) + tx.close(); + } + + assert cache.get("key") == 5; + } + + /** + * @throws Exception In case of error. + */ + public void testGetAndRemove() throws Exception { + IgniteCache<String, Integer> cache = jcache(); + + cache.put("key1", 1); + cache.put("key2", 2); + + assert !cache.remove("key1", 0); + assert cache.get("key1") != null && cache.get("key1") == 1; + assert cache.remove("key1", 1); + assert cache.get("key1") == null; + assert cache.getAndRemove("key2") == 2; + assert cache.get("key2") == null; + assert cache.getAndRemove("key2") == null; + } + + /** + * @throws Exception If failed. + */ + public void testGetAndRemoveObject() throws Exception { + IgniteCache<String, SerializableObject> cache = ignite(0).cache(cacheName()); + + SerializableObject val1 = new SerializableObject(1); + SerializableObject val2 = new SerializableObject(2); + + cache.put("key1", val1); + cache.put("key2", val2); + + assert !cache.remove("key1", new SerializableObject(0)); + + SerializableObject oldVal = cache.get("key1"); + + assert oldVal != null && F.eq(val1, oldVal); + + assert cache.remove("key1"); + + assert cache.get("key1") == null; + + SerializableObject oldVal2 = cache.getAndRemove("key2"); + + assert F.eq(val2, oldVal2); + + assert cache.get("key2") == null; + assert cache.getAndRemove("key2") == null; + } + + /** + * @throws Exception If failed. + */ + public void testGetAndPutSerializableObject() throws Exception { + IgniteCache<String, SerializableObject> cache = ignite(0).cache(cacheName()); + + SerializableObject val1 = new SerializableObject(1); + SerializableObject val2 = new SerializableObject(2); + + cache.put("key1", val1); + + SerializableObject oldVal = cache.get("key1"); + + assertEquals(val1, oldVal); + + oldVal = cache.getAndPut("key1", val2); + + assertEquals(val1, oldVal); + + SerializableObject updVal = cache.get("key1"); + + assertEquals(val2, updVal); + } + + /** + * @throws Exception If failed. + */ + public void testDeletedEntriesFlag() throws Exception { + if (cacheMode() != LOCAL && cacheMode() != REPLICATED && memoryMode() != OFFHEAP_TIERED) { + final int cnt = 3; + + IgniteCache<String, Integer> cache = jcache(); + + for (int i = 0; i < cnt; i++) + cache.put(String.valueOf(i), i); + + for (int i = 0; i < cnt; i++) + cache.remove(String.valueOf(i)); + + for (int g = 0; g < gridCount(); g++) + executeOnLocalOrRemoteJvm(g, new CheckEntriesDeletedTask(cnt, cacheName())); + } + } + + /** + * @throws Exception If failed. + */ + public void testRemoveLoad() throws Exception { + if (isMultiJvm()) + fail("https://issues.apache.org/jira/browse/IGNITE-1088"); + + if (!storeEnabled()) + return; + + int cnt = 10; + + Set<String> keys = new HashSet<>(); + + for (int i = 0; i < cnt; i++) + keys.add(String.valueOf(i)); + + jcache().removeAll(keys); + + for (String key : keys) + putToStore(key, Integer.parseInt(key)); + + for (int g = 0; g < gridCount(); g++) + grid(g).cache(cacheName()).localLoadCache(null); + + for (int g = 0; g < gridCount(); g++) { + for (int i = 0; i < cnt; i++) { + String key = String.valueOf(i); + + if (grid(0).affinity(cacheName()).mapKeyToPrimaryAndBackups(key).contains(grid(g).localNode())) + assertEquals((Integer)i, peek(jcache(g), key)); + else + assertNull(peek(jcache(g), key)); + } + } + } + + /** + * @throws Exception In case of error. + */ + public void testRemoveAsync() throws Exception { + IgniteCache<String, Integer> cache = jcache(); + + IgniteCache<String, Integer> cacheAsync = cache.withAsync(); + + cache.put("key1", 1); + cache.put("key2", 2); + + cacheAsync.remove("key1", 0); + + assert !cacheAsync.<Boolean>future().get(); + + assert cache.get("key1") != null && cache.get("key1") == 1; + + cacheAsync.remove("key1", 1); + + assert cacheAsync.<Boolean>future().get(); + + assert cache.get("key1") == null; + + cacheAsync.getAndRemove("key2"); + + assert cacheAsync.<Integer>future().get() == 2; + + assert cache.get("key2") == null; + + cacheAsync.getAndRemove("key2"); + + assert cacheAsync.future().get() == null; + } + + /** + * @throws Exception In case of error. + */ + public void testRemove() throws Exception { + IgniteCache<String, Integer> cache = jcache(); + + cache.put("key1", 1); + + assert cache.remove("key1"); + assert cache.get("key1") == null; + assert !cache.remove("key1"); + } + + /** + * @throws Exception In case of error. + */ + public void testRemovexAsync() throws Exception { + IgniteCache<String, Integer> cache = jcache(); + + IgniteCache<String, Integer> cacheAsync = cache.withAsync(); + + cache.put("key1", 1); + + cacheAsync.remove("key1"); + + assert cacheAsync.<Boolean>future().get(); + + assert cache.get("key1") == null; + + cacheAsync.remove("key1"); + + assert !cacheAsync.<Boolean>future().get(); + } + + /** + * @throws Exception In case of error. + */ + public void testGlobalRemoveAll() throws Exception { + globalRemoveAll(false); + } + + /** + * @throws Exception In case of error. + */ + public void testGlobalRemoveAllAsync() throws Exception { + globalRemoveAll(true); + } + + /** + * @param async If {@code true} uses asynchronous operation. + * @throws Exception In case of error. + */ + private void globalRemoveAll(boolean async) throws Exception { + IgniteCache<String, Integer> cache = jcache(); + + cache.put("key1", 1); + cache.put("key2", 2); + cache.put("key3", 3); + + checkSize(F.asSet("key1", "key2", "key3")); + + atomicClockModeDelay(cache); + + IgniteCache<String, Integer> asyncCache = cache.withAsync(); + + if (async) { + asyncCache.removeAll(F.asSet("key1", "key2")); + + asyncCache.future().get(); + } + else + cache.removeAll(F.asSet("key1", "key2")); + + checkSize(F.asSet("key3")); + + checkContainsKey(false, "key1"); + checkContainsKey(false, "key2"); + checkContainsKey(true, "key3"); + + // Put values again. + cache.put("key1", 1); + cache.put("key2", 2); + cache.put("key3", 3); + + atomicClockModeDelay(cache); + + if (async) { + IgniteCache asyncCache0 = jcache(gridCount() > 1 ? 1 : 0).withAsync(); + + asyncCache0.removeAll(); + + asyncCache0.future().get(); + } + else + jcache(gridCount() > 1 ? 1 : 0).removeAll(); + + assertEquals(0, cache.localSize()); + long entryCnt = hugeRemoveAllEntryCount(); + + for (int i = 0; i < entryCnt; i++) + cache.put(String.valueOf(i), i); + + for (int i = 0; i < entryCnt; i++) + assertEquals(Integer.valueOf(i), cache.get(String.valueOf(i))); + + atomicClockModeDelay(cache); + + if (async) { + asyncCache.removeAll(); + + asyncCache.future().get(); + } + else + cache.removeAll(); + + for (int i = 0; i < entryCnt; i++) + assertNull(cache.get(String.valueOf(i))); + } + + /** + * @return Count of entries to be removed in removeAll() test. + */ + protected long hugeRemoveAllEntryCount() { + return 1000L; + } + + /** + * @throws Exception In case of error. + */ + public void testRemoveAllWithNulls() throws Exception { + final IgniteCache<String, Integer> cache = jcache(); + + final Set<String> c = new LinkedHashSet<>(); + + c.add("key1"); + c.add(null); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + cache.removeAll(c); + + return null; + } + }, NullPointerException.class, null); + + assertEquals(0, jcache().localSize()); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + cache.removeAll(null); + + return null; + } + }, NullPointerException.class, null); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + cache.remove(null); + + return null; + } + }, NullPointerException.class, null); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + cache.getAndRemove(null); + + return null; + } + }, NullPointerException.class, null); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + cache.remove("key1", null); + + return null; + } + }, NullPointerException.class, null); + } + + /** + * @throws Exception In case of error. + */ + public void testRemoveAllDuplicates() throws Exception { + jcache().removeAll(ImmutableSet.of("key1", "key1", "key1")); + } + + /** + * @throws Exception In case of error. + */ + public void testRemoveAllDuplicatesTx() throws Exception { + if (txShouldBeUsed()) { + try (Transaction tx = transactions().txStart()) { + jcache().removeAll(ImmutableSet.of("key1", "key1", "key1")); + + tx.commit(); + } + } + } + + /** + * @throws Exception In case of error. + */ + public void testRemoveAllEmpty() throws Exception { + jcache().removeAll(); + } + + /** + * @throws Exception In case of error. + */ + public void testRemoveAllAsync() throws Exception { + IgniteCache<String, Integer> cache = jcache(); + + IgniteCache<String, Integer> cacheAsync = cache.withAsync(); + + cache.put("key1", 1); + cache.put("key2", 2); + cache.put("key3", 3); + + checkSize(F.asSet("key1", "key2", "key3")); + + cacheAsync.removeAll(F.asSet("key1", "key2")); + + assertNull(cacheAsync.future().get()); + + checkSize(F.asSet("key3")); + + checkContainsKey(false, "key1"); + checkContainsKey(false, "key2"); + checkContainsKey(true, "key3"); + } + + /** + * @throws Exception In case of error. + */ + public void testLoadAll() throws Exception { + if (!storeEnabled()) + return; + + IgniteCache<String, Integer> cache = jcache(); + + Set<String> keys = new HashSet<>(primaryKeysForCache(2)); + + for (String key : keys) + assertNull(cache.localPeek(key, ONHEAP)); + + Map<String, Integer> vals = new HashMap<>(); + + int i = 0; + + for (String key : keys) { + cache.put(key, i); + + vals.put(key, i); + + i++; + } + + for (String key : keys) + assertEquals(vals.get(key), peek(cache, key)); + + cache.clear(); + + for (String key : keys) + assertNull(peek(cache, key)); + + loadAll(cache, keys, true); + + for (String key : keys) + assertEquals(vals.get(key), peek(cache, key)); + } + + /** + * @throws Exception If failed. + */ + public void testRemoveAfterClear() throws Exception { + IgniteEx ignite = grid(0); + + boolean affNode = ignite.context().cache().internalCache(cacheName()).context().affinityNode(); + + if (!affNode) { + if (gridCount() < 2) + return; + + ignite = grid(1); + } + + IgniteCache<Integer, Integer> cache = ignite.cache(cacheName()); + + int key = 0; + + Collection<Integer> keys = new ArrayList<>(); + + for (int k = 0; k < 2; k++) { + while (!ignite.affinity(cacheName()).isPrimary(ignite.localNode(), key)) + key++; + + keys.add(key); + + key++; + } + + info("Keys: " + keys); + + for (Integer k : keys) + cache.put(k, k); + + cache.clear(); + + for (int g = 0; g < gridCount(); g++) { + Ignite
<TRUNCATED>
