http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java new file mode 100644 index 0000000..6d90d0e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java @@ -0,0 +1,883 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static 
org.apache.ignite.cache.CacheRebalanceMode.ASYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; + +/** + * + */ +public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String TEST_CACHE = "testCache"; + + /** */ + private boolean client; + + /** */ + private CacheConfiguration ccfg; + + /** */ + private boolean blockRebalance; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setConsistentId(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setMaxMissedClientHeartbeats(1000); + + TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi(); + + cfg.setCommunicationSpi(commSpi); + + cfg.setClientMode(client); + + if (ccfg != null) + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** + * + */ + private void blockRebalance() { + for (Ignite node : G.allGrids()) { + testSpi(node).blockMessages(new IgnitePredicate<GridIoMessage>() { + @Override public boolean apply(GridIoMessage msg) { + Object msg0 = msg.message(); + + return (msg0 instanceof GridDhtPartitionSupplyMessage || msg0 instanceof GridDhtPartitionSupplyMessageV2) + && ((GridCacheMessage)msg0).cacheId() == CU.cacheId(TEST_CACHE); + } + }); + } + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** + * @throws Exception If failed. 
+ */ + public void testPutAllPrimaryFailure1() throws Exception { + putAllPrimaryFailure(true, false); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllPrimaryFailure1_UnstableTopology() throws Exception { + blockRebalance = true; + + putAllPrimaryFailure(true, false); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllPrimaryFailure2() throws Exception { + putAllPrimaryFailure(true, true); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllPrimaryFailure2_UnstableTopology() throws Exception { + blockRebalance = true; + + putAllPrimaryFailure(true, true); + } + + /** + * @param fail0 Fail node 0 flag. + * @param fail1 Fail node 1 flag. + * @throws Exception If failed. + */ + private void putAllPrimaryFailure(boolean fail0, boolean fail1) throws Exception { + ccfg = cacheConfiguration(1, FULL_SYNC); + + startServers(4); + + client = true; + + Ignite client = startGrid(4); + + IgniteCache<Integer, Integer> nearCache = client.cache(TEST_CACHE); + IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync(); + + if (!blockRebalance) + awaitPartitionMapExchange(); + + Ignite srv0 = ignite(0); + Ignite srv1 = ignite(1); + + Integer key1 = primaryKey(srv0.cache(TEST_CACHE)); + Integer key2 = primaryKey(srv1.cache(TEST_CACHE)); + + Map<Integer, Integer> map = new HashMap<>(); + map.put(key1, key1); + map.put(key2, key2); + + assertEquals(2, map.size()); + + if (fail0) { + testSpi(client).blockMessages(GridNearAtomicFullUpdateRequest.class, srv0.name()); + testSpi(client).blockMessages(GridNearAtomicCheckUpdateRequest.class, srv0.name()); + } + + if (fail1) { + testSpi(client).blockMessages(GridNearAtomicFullUpdateRequest.class, srv1.name()); + testSpi(client).blockMessages(GridNearAtomicCheckUpdateRequest.class, srv1.name()); + } + + log.info("Start put [key1=" + key1 + ", key2=" + key2 + ']'); + + nearAsyncCache.putAll(map); + + IgniteFuture<?> fut = nearAsyncCache.future(); + + U.sleep(500); + 
+ assertFalse(fut.isDone()); + + if (fail0) + stopGrid(0); + + if (fail1) + stopGrid(1); + + fut.get(); + + checkData(map); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllBackupFailure1() throws Exception { + putAllBackupFailure1(); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllBackupFailure1_UnstableTopology() throws Exception { + blockRebalance = true; + + putAllBackupFailure1(); + } + + /** + * @throws Exception If failed. + */ + private void putAllBackupFailure1() throws Exception { + ccfg = cacheConfiguration(1, FULL_SYNC); + + startServers(4); + + client = true; + + Ignite client = startGrid(4); + + IgniteCache<Integer, Integer> nearCache = client.cache(TEST_CACHE); + IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync(); + + if (!blockRebalance) + awaitPartitionMapExchange(); + + Ignite srv0 = ignite(0); + + List<Integer> keys = primaryKeys(srv0.cache(TEST_CACHE), 3); + + Ignite backup = backup(client.affinity(TEST_CACHE), keys.get(0)); + + testSpi(backup).blockMessages(GridDhtAtomicNearResponse.class, client.name()); + + Map<Integer, Integer> map = new HashMap<>(); + + for (Integer key : keys) + map.put(key, key); + + log.info("Start put [map=" + map + ']'); + + nearAsyncCache.putAll(map); + + IgniteFuture<?> fut = nearAsyncCache.future(); + + U.sleep(500); + + assertFalse(fut.isDone()); + + stopGrid(backup.name()); + + fut.get(); + + checkData(map); + } + + /** + * @throws Exception If failed. + */ + public void testPutBackupFailure1() throws Exception { + putBackupFailure1(); + } + + /** + * @throws Exception If failed. + */ + public void testPutBackupFailure1_UnstableTopology() throws Exception { + blockRebalance = true; + + putBackupFailure1(); + } + + /** + * @throws Exception If failed. 
+ */ + private void putBackupFailure1() throws Exception { + ccfg = cacheConfiguration(1, FULL_SYNC); + + startServers(4); + + client = true; + + Ignite client = startGrid(4); + + IgniteCache<Integer, Integer> nearCache = client.cache(TEST_CACHE); + IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync(); + + if (!blockRebalance) + awaitPartitionMapExchange(); + + Ignite srv0 = ignite(0); + + Integer key = primaryKey(srv0.cache(TEST_CACHE)); + + Ignite backup = backup(client.affinity(TEST_CACHE), key); + + testSpi(backup).blockMessages(GridDhtAtomicNearResponse.class, client.name()); + + log.info("Start put [key=" + key + ']'); + + nearAsyncCache.put(key, key); + + IgniteFuture<?> fut = nearAsyncCache.future(); + + U.sleep(500); + + assertFalse(fut.isDone()); + + stopGrid(backup.name()); + + fut.get(); + + checkData(F.asMap(key, key)); + } + + /** + * @throws Exception If failed. + */ + public void testFullAsyncPutRemap() throws Exception { + fullAsyncRemap(false); + } + + /** + * @throws Exception If failed. + */ + public void testFullAsyncPutAllRemap() throws Exception { + fullAsyncRemap(true); + } + + /** + * @param putAll Test putAll flag. + * @throws Exception If failed. + */ + private void fullAsyncRemap(boolean putAll) throws Exception { + Ignite srv0 = startGrid(0); + + client = true; + + Ignite clientNode = startGrid(1); + + client = false; + + final IgniteCache<Integer, Integer> nearCache = clientNode.createCache(cacheConfiguration(1, FULL_ASYNC)); + + List<Integer> keys = primaryKeys(srv0.cache(TEST_CACHE), putAll ? 
3 : 1); + + testSpi(clientNode).blockMessages(GridNearAtomicSingleUpdateRequest.class, srv0.name()); + testSpi(clientNode).blockMessages(GridNearAtomicFullUpdateRequest.class, srv0.name()); + + final Map<Integer, Integer> map = new HashMap<>(); + + for (Integer key : keys) + map.put(key, -key); + + if (putAll) + nearCache.putAll(map); + else + nearCache.put(keys.get(0), map.get(keys.get(0))); + + int nodeIdx = 2; + + Affinity<Object> aff = clientNode.affinity(TEST_CACHE); + + int keysMoved; + + do { + startGrid(nodeIdx); + + awaitPartitionMapExchange(); + + keysMoved = 0; + + for (Integer key : keys) { + if (!aff.isPrimary(srv0.cluster().localNode(), key)) + keysMoved++; + } + + if (keysMoved == keys.size()) + break; + + nodeIdx++; + } + while (nodeIdx < 10); + + assertEquals(keys.size(), keysMoved); + + testSpi(clientNode).stopBlock(true); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + for (Integer key : map.keySet()) { + if (nearCache.get(key) == null) + return false; + } + + return true; + } + }, 5000); + + checkData(map); + } + + /** + * @throws Exception If failed. 
+ */ + public void testPutPrimarySync() throws Exception { + startGrids(2); + + client = true; + + Ignite clientNode = startGrid(2); + + client = false; + + final IgniteCache<Integer, Integer> nearCache = clientNode.createCache(cacheConfiguration(1, PRIMARY_SYNC)); + IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync(); + + awaitPartitionMapExchange(); + + Ignite srv0 = grid(0); + final Ignite srv1 = grid(1); + + final Integer key = primaryKey(srv0.cache(TEST_CACHE)); + + testSpi(srv0).blockMessages(GridDhtAtomicSingleUpdateRequest.class, srv1.name()); + + nearAsyncCache.put(key, key); + + IgniteFuture<?> fut = nearAsyncCache.future(); + + fut.get(5, TimeUnit.SECONDS); + + assertEquals(key, srv0.cache(TEST_CACHE).get(key)); + + assertNull(srv1.cache(TEST_CACHE).localPeek(key)); + + testSpi(srv0).stopBlock(true); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return srv1.cache(TEST_CACHE).localPeek(key) != null; + } + }, 5000); + + checkData(F.asMap(key, key)); + } + + /** + * @throws Exception If failed. 
+ */ + public void testPutNearNodeFailure() throws Exception { + startGrids(2); + + client = true; + + Ignite clientNode = startGrid(2); + + final IgniteCache<Integer, Integer> nearCache = clientNode.createCache(cacheConfiguration(1, FULL_SYNC)); + IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync(); + + awaitPartitionMapExchange(); + + final Ignite srv0 = grid(0); + final Ignite srv1 = grid(1); + + final Integer key = primaryKey(srv0.cache(TEST_CACHE)); + + nearAsyncCache.put(key, key); + + testSpi(srv1).blockMessages(GridDhtAtomicNearResponse.class, clientNode.name()); + + stopGrid(2); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return ((IgniteKernal)srv0).context().cache().context().mvcc().atomicFuturesCount() == 0; + } + }, 5000); + + assertEquals(0, ((IgniteKernal)srv0).context().cache().context().mvcc().atomicFuturesCount()); + assertEquals(0, ((IgniteKernal)srv1).context().cache().context().mvcc().atomicFuturesCount()); + + checkData(F.asMap(key, key)); + } + + /** + * @throws Exception If failed. 
+ */ + public void testPutAllNearNodeFailure() throws Exception { + final int SRVS = 4; + + startGrids(SRVS); + + client = true; + + Ignite clientNode = startGrid(SRVS); + + final IgniteCache<Integer, Integer> nearCache = clientNode.createCache(cacheConfiguration(1, FULL_SYNC)); + IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync(); + + awaitPartitionMapExchange(); + + for (int i = 0; i < SRVS; i++) + testSpi(grid(i)).blockMessages(GridDhtAtomicNearResponse.class, clientNode.name()); + + final Map<Integer, Integer> map = new HashMap<>(); + + for (int i = 0; i < 100; i++) + map.put(i, i); + + nearAsyncCache.putAll(map); + + boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + IgniteCache cache = ignite(0).cache(TEST_CACHE); + + for (Integer key : map.keySet()) { + if (cache.get(key) == null) + return false; + } + + return true; + } + }, 5000); + + assertTrue(wait); + + stopGrid(SRVS); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + for (int i = 0; i < SRVS; i++) { + if (grid(i).context().cache().context().mvcc().atomicFuturesCount() != 0) + return false; + } + + return true; + } + }, 5000); + + for (int i = 0; i < SRVS; i++) + assertEquals(0, grid(i).context().cache().context().mvcc().atomicFuturesCount()); + + checkData(map); + } + + /** + * @throws Exception If failed. + */ + public void testCacheOperations0() throws Exception { + cacheOperations(0); + } + + /** + * @throws Exception If failed. + */ + public void testCacheOperations_UnstableTopology0() throws Exception { + blockRebalance = true; + + cacheOperations(0); + } + + /** + * @throws Exception If failed. + */ + public void testCacheOperations1() throws Exception { + cacheOperations(1); + } + + /** + * @throws Exception If failed. 
+ */ + public void testCacheOperations_UnstableTopology1() throws Exception { + blockRebalance = true; + + cacheOperations(1); + } + + /** + * @throws Exception If failed. + */ + public void testCacheOperations2() throws Exception { + cacheOperations(2); + } + + /** + * @throws Exception If failed. + */ + public void testCacheOperations_UnstableTopology2() throws Exception { + blockRebalance = true; + + cacheOperations(2); + } + + /** + * @param backups Number of backups. + * @throws Exception If failed. + */ + private void cacheOperations(int backups) throws Exception { + ccfg = cacheConfiguration(backups, FULL_SYNC); + + final int SRVS = 4; + + startServers(SRVS); + + client = true; + + Ignite clientNode = startGrid(SRVS); + + final IgniteCache<Integer, Integer> nearCache = clientNode.cache(TEST_CACHE); + + Integer key = primaryKey(ignite(0).cache(TEST_CACHE)); + + nearCache.replace(key, 1); + + nearCache.remove(key); + + nearCache.invoke(key, new SetValueEntryProcessor(null)); + + Map<Integer, SetValueEntryProcessor> map = new HashMap<>(); + + List<Integer> keys = primaryKeys(ignite(0).cache(TEST_CACHE), 2); + + map.put(keys.get(0), new SetValueEntryProcessor(1)); + map.put(keys.get(1), new SetValueEntryProcessor(null)); + + nearCache.invokeAll(map); + + Set<Integer> rmvAllKeys = new HashSet<>(); + + for (int i = 0; i < 100; i++) { + nearCache.put(i, i); + + if (i % 2 == 0) + rmvAllKeys.add(i); + } + + nearCache.removeAll(rmvAllKeys); + } + + /** + * @throws Exception If failed. 
+ */ + public void testPutMissedDhtRequest_UnstableTopology() throws Exception { + blockRebalance = true; + + ccfg = cacheConfiguration(1, FULL_SYNC); + + startServers(4); + + client = true; + + Ignite client = startGrid(4); + + IgniteCache<Integer, Integer> nearCache = client.cache(TEST_CACHE); + IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync(); + + testSpi(ignite(0)).blockMessages(new IgnitePredicate<GridIoMessage>() { + @Override public boolean apply(GridIoMessage msg) { + return msg.message() instanceof GridDhtAtomicAbstractUpdateRequest; + } + }); + + Integer key = primaryKey(ignite(0).cache(TEST_CACHE)); + + log.info("Start put [key=" + key + ']'); + + nearAsyncCache.put(key, key); + IgniteFuture<?> fut = nearAsyncCache.future(); + + U.sleep(500); + + assertFalse(fut.isDone()); + + stopGrid(0); + + fut.get(); + + checkData(F.asMap(key, key)); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllMissedDhtRequest_UnstableTopology1() throws Exception { + putAllMissedDhtRequest_UnstableTopology(true, false); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllMissedDhtRequest_UnstableTopology2() throws Exception { + putAllMissedDhtRequest_UnstableTopology(true, true); + } + + /** + * @param fail0 Fail node 0 flag. + * @param fail1 Fail node 1 flag. + * @throws Exception If failed. 
+     */
+    private void putAllMissedDhtRequest_UnstableTopology(boolean fail0, boolean fail1) throws Exception {
+        // Rebalance supply messages for the test cache get blocked while servers start (see startServers()).
+        blockRebalance = true;
+
+        ccfg = cacheConfiguration(1, FULL_SYNC);
+
+        startServers(4);
+
+        // The node started next (index 4) is a client.
+        client = true;
+
+        Ignite client = startGrid(4);
+
+        IgniteCache<Integer, Integer> nearCache = client.cache(TEST_CACHE);
+        IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync();
+
+        // Make the communication SPI of node 0 hold every DHT atomic update request.
+        if (fail0) {
+            testSpi(ignite(0)).blockMessages(new IgnitePredicate<GridIoMessage>() {
+                @Override public boolean apply(GridIoMessage msg) {
+                    return msg.message() instanceof GridDhtAtomicAbstractUpdateRequest;
+                }
+            });
+        }
+
+        // Same for node 2 (note: the 'fail1' flag controls node 2, not node 1).
+        if (fail1) {
+            testSpi(ignite(2)).blockMessages(new IgnitePredicate<GridIoMessage>() {
+                @Override public boolean apply(GridIoMessage msg) {
+                    return msg.message() instanceof GridDhtAtomicAbstractUpdateRequest;
+                }
+            });
+        }
+
+        // One key with primary on node 0 and one key with primary on node 2.
+        Integer key1 = primaryKey(ignite(0).cache(TEST_CACHE));
+        Integer key2 = primaryKey(ignite(2).cache(TEST_CACHE));
+
+        // Fixed: the second value used to print key1 again, hiding the actual key2 in the log.
+        log.info("Start put [key1=" + key1 + ", key2=" + key2 + ']');
+
+        Map<Integer, Integer> map = new HashMap<>();
+        map.put(key1, 10);
+        map.put(key2, 20);
+
+        nearAsyncCache.putAll(map);
+        IgniteFuture<?> fut = nearAsyncCache.future();
+
+        U.sleep(500);
+
+        // With the DHT requests blocked the update is expected to be still pending.
+        assertFalse(fut.isDone());
+
+        // Stop the nodes whose DHT requests were blocked; the pending update must then finish.
+        if (fail0)
+            stopGrid(0);
+
+        if (fail1)
+            stopGrid(2);
+
+        fut.get();
+
+        checkData(map);
+    }
+
+    /**
+     * Checks that every started node returns the expected value for each expected key
+     * when reading through {@link #TEST_CACHE}.
+     *
+     * @param expData Expected cache data, must not be empty.
+     */
+    private void checkData(Map<Integer, Integer> expData) {
+        assert !expData.isEmpty();
+
+        List<Ignite> nodes = G.allGrids();
+
+        assertFalse(nodes.isEmpty());
+
+        for (Ignite node : nodes) {
+            IgniteCache<Integer, Integer> cache = node.cache(TEST_CACHE);
+
+            for (Map.Entry<Integer, Integer> e : expData.entrySet()) {
+                assertEquals("Invalid value [key=" + e.getKey() + ", node=" + node.name() + ']',
+                    e.getValue(),
+                    cache.get(e.getKey()));
+            }
+        }
+    }
+
+    /**
+     * @param aff Affinity.
+     * @param key Key.
+     * @return Backup node for given key.
+ */ + private Ignite backup(Affinity<Object> aff, Object key) { + for (Ignite ignite : G.allGrids()) { + ClusterNode node = ignite.cluster().localNode(); + + if (aff.isPrimaryOrBackup(node, key) && !aff.isPrimary(node, key)) + return ignite; + } + + fail("Failed to find backup for key: " + key); + + return null; + } + + /** + * @param node Node. + * @return Node communication SPI. + */ + private TestRecordingCommunicationSpi testSpi(Ignite node) { + return (TestRecordingCommunicationSpi)node.configuration().getCommunicationSpi(); + } + + /** + * @param backups Number of backups. + * @param writeSync Cache write synchronization mode. + * @return Cache configuration. + */ + private CacheConfiguration<Integer, Integer> cacheConfiguration(int backups, + CacheWriteSynchronizationMode writeSync) { + CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(); + + ccfg.setName(TEST_CACHE); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setWriteSynchronizationMode(writeSync); + ccfg.setBackups(backups); + ccfg.setRebalanceMode(ASYNC); + + return ccfg; + } + + /** + * @param cnt Number of server nodes. + * @throws Exception If failed. + */ + private void startServers(int cnt) throws Exception { + startGrids(cnt - 1); + + awaitPartitionMapExchange(); + + if (blockRebalance) + blockRebalance(); + + startGrid(cnt - 1); + } + + /** + * + */ + public static class SetValueEntryProcessor implements CacheEntryProcessor<Integer, Integer, Object> { + /** */ + private Integer val; + + /** + * @param val Value. + */ + SetValueEntryProcessor(Integer val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public Object process(MutableEntry<Integer, Integer> entry, Object... args) { + if (val != null) + entry.setValue(val); + + return null; + } + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest.java deleted file mode 100644 index 9057507..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.cache.distributed.near; - -import org.apache.ignite.configuration.IgniteConfiguration; - -/** - * - */ -public class GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest extends - GridCacheAtomicPrimaryWriteOrderMultiNodeFullApiSelfTest { - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - - cfg.setStripedPoolSize(-1); - - return cfg; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearReadersSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearReadersSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearReadersSelfTest.java index 7646741..9505b24 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearReadersSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearReadersSelfTest.java @@ -40,11 +40,13 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; 
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; @@ -147,7 +149,7 @@ public class GridCacheNearReadersSelfTest extends GridCommonAbstractTest { startGrids(); ClusterNode n1 = F.first(aff.nodes(aff.partition(1), grid(0).cluster().nodes())); - ClusterNode n2 = F.first(aff.nodes(aff.partition(2), grid(0).cluster().nodes())); + final ClusterNode n2 = F.first(aff.nodes(aff.partition(2), grid(0).cluster().nodes())); assertNotNull(n1); assertNotNull(n2); @@ -164,7 +166,7 @@ public class GridCacheNearReadersSelfTest extends GridCommonAbstractTest { assertNull(cache1.getAndPut(1, "v1")); assertNull(cache1.getAndPut(2, "v2")); - GridDhtCacheEntry e1 = (GridDhtCacheEntry)dht(cache1).entryEx(1); + final GridDhtCacheEntry e1 = (GridDhtCacheEntry)dht(cache1).entryEx(1); GridDhtCacheEntry e2 = (GridDhtCacheEntry)dht(cache2).entryEx(2); assertNotNull(e1.readers()); @@ -207,6 +209,17 @@ public class GridCacheNearReadersSelfTest extends GridCommonAbstractTest { assertNotNull(cache1.getAndPut(1, "z1")); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + return !e1.readers().contains(n2.id()); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + }, 5000); + // Node 1 still has node2 in readers map. 
assertFalse(e1.readers().contains(n2.id())); } http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest.java deleted file mode 100644 index 7ba3144..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.cache.distributed.near; - -import org.apache.ignite.configuration.IgniteConfiguration; - -/** - * - */ -public class GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest extends - GridCachePartitionedMultiNodeFullApiSelfTest { - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - - cfg.setStripedPoolSize(-1); - - return cfg; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java index 958bb5c..39e995a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java @@ -771,7 +771,7 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr /** {@inheritDoc} */ @Override protected long getTestTimeout() { - return TimeUnit.SECONDS.toMillis(15); + return TimeUnit.SECONDS.toMillis(2 * 60); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java index cb8e755..716bb0d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java @@ -258,15 +258,15 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC qryClnCache.put(key, -1); qryClnCache.put(keys.get(0), 100); - } - GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return lsnr.evts.size() == 1; - } - }, 5000); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return lsnr.evts.size() == 1; + } + }, 5000); - assertEquals(lsnr.evts.size(), 1); + assertEquals(lsnr.evts.size(), 1); + } } /** @@ -480,7 +480,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC if (rnd.nextBoolean()) cache = qryClient.cache(null); else { - for (int j = 0; j < 10; j++) { + for (int j = 0; j < 1000; j++) { int nodeIdx = rnd.nextInt(SRV_NODES); if (killedNode != nodeIdx) { @@ -1150,11 +1150,10 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC boolean lostAllow, boolean wait) throws Exception { if (wait) { GridTestUtils.waitForCondition(new PA() { - @Override - public boolean apply() { + @Override public boolean apply() { return expEvts.size() == lsnr.size(); } - }, 2000L); + }, 10_000L); } synchronized (lsnr) { @@ -1970,9 +1969,9 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { - 
return lsnr.size() <= size; + return lsnr.size() >= size; } - }, 2000L); + }, 10_000L); List<T3<Object, Object, Object>> expEvts0 = new ArrayList<>(); @@ -2300,16 +2299,16 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC log.info("Batch loaded. Iteration: " + iteration); - final long cnt = lsnr.count(); - final long expCnt = putCnt * stableNodeCnt + ignoredDupEvts; GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { - return cnt == expCnt; + return lsnr.count() == expCnt; } }, 6_000); + final long cnt = lsnr.count(); + if (cnt != expCnt) { StringBuilder sb = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java index ecfb4e8..efac24a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java @@ -132,7 +132,7 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry( 1, EventType.UPDATED, - new KeyCacheObjectImpl(1, new byte[] {0, 0, 0, 1}), + new KeyCacheObjectImpl(1, new byte[] {0, 0, 0, 1}, 1), new CacheObjectImpl(2, new byte[] {0, 0, 0, 2}), new CacheObjectImpl(2, new byte[] {0, 0, 0, 3}), true, 
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioEmbeddedFutureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioEmbeddedFutureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioEmbeddedFutureSelfTest.java index 5591f2c..b7b6966 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioEmbeddedFutureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioEmbeddedFutureSelfTest.java @@ -32,7 +32,7 @@ public class GridNioEmbeddedFutureSelfTest extends GridCommonAbstractTest { */ public void testNioEmbeddedFuture() throws Exception { // Original future. - final GridNioFutureImpl<Integer> origFut = new GridNioFutureImpl<>(); + final GridNioFutureImpl<Integer> origFut = new GridNioFutureImpl<>(null); // Embedded future to test. final GridNioEmbeddedFuture<Integer> embFut = new GridNioEmbeddedFuture<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioFutureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioFutureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioFutureSelfTest.java index 684ae01..44a1eff 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioFutureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioFutureSelfTest.java @@ -35,24 +35,23 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; * Test for NIO future. 
*/ public class GridNioFutureSelfTest extends GridCommonAbstractTest { - /** * @throws Exception If failed. */ public void testOnDone() throws Exception { - GridNioFutureImpl<String> fut = new GridNioFutureImpl<>(); + GridNioFutureImpl<String> fut = new GridNioFutureImpl<>(null); fut.onDone(); assertNull(fut.get()); - fut = new GridNioFutureImpl<>(); + fut = new GridNioFutureImpl<>(null); fut.onDone("test"); assertEquals("test", fut.get()); - fut = new GridNioFutureImpl<>(); + fut = new GridNioFutureImpl<>(null); fut.onDone(new IgniteCheckedException("TestMessage")); @@ -64,7 +63,7 @@ public class GridNioFutureSelfTest extends GridCommonAbstractTest { } }, IgniteCheckedException.class, "TestMessage"); - fut = new GridNioFutureImpl<>(); + fut = new GridNioFutureImpl<>(null); fut.onDone("test", new IgniteCheckedException("TestMessage")); @@ -76,7 +75,7 @@ public class GridNioFutureSelfTest extends GridCommonAbstractTest { } }, IgniteCheckedException.class, "TestMessage"); - fut = new GridNioFutureImpl<>(); + fut = new GridNioFutureImpl<>(null); fut.onDone("test"); @@ -86,12 +85,12 @@ public class GridNioFutureSelfTest extends GridCommonAbstractTest { } /** - * @throws Exception + * @throws Exception If failed. */ public void testOnCancelled() throws Exception { GridTestUtils.assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { - GridNioFutureImpl<String> fut = new GridNioFutureImpl<>(); + GridNioFutureImpl<String> fut = new GridNioFutureImpl<>(null); fut.onCancelled(); @@ -101,7 +100,7 @@ public class GridNioFutureSelfTest extends GridCommonAbstractTest { GridTestUtils.assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { - GridNioFutureImpl<String> fut = new GridNioFutureImpl<>(); + GridNioFutureImpl<String> fut = new GridNioFutureImpl<>(null); fut.onCancelled(); @@ -116,7 +115,7 @@ public class GridNioFutureSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. 
*/ public void testListenSyncNotify() throws Exception { - GridNioFutureImpl<String> fut = new GridNioFutureImpl<>(); + GridNioFutureImpl<String> fut = new GridNioFutureImpl<>(null); int lsnrCnt = 10; @@ -167,9 +166,9 @@ public class GridNioFutureSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testGet() throws Exception { - GridNioFutureImpl<Object> unfinished = new GridNioFutureImpl<>(); - GridNioFutureImpl<Object> finished = new GridNioFutureImpl<>(); - GridNioFutureImpl<Object> cancelled = new GridNioFutureImpl<>(); + GridNioFutureImpl<Object> unfinished = new GridNioFutureImpl<>(null); + GridNioFutureImpl<Object> finished = new GridNioFutureImpl<>(null); + GridNioFutureImpl<Object> cancelled = new GridNioFutureImpl<>(null); finished.onDone("Finished"); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java index d403784..e6aab9f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java @@ -21,6 +21,7 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter; import org.apache.ignite.internal.util.nio.GridNioFilterAdapter; import org.apache.ignite.internal.util.nio.GridNioFilterChain; @@ -30,6 +31,7 @@ import 
org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor; import org.apache.ignite.internal.util.nio.GridNioServerListener; import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter; import org.apache.ignite.internal.util.nio.GridNioSession; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; @@ -114,7 +116,7 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest { proceedExceptionCaught(ses, ex); } - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) { + @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut, IgniteInClosure<IgniteException> ackC) { sndEvt.compareAndSet(null, ses.<String>meta(MESSAGE_WRITE_META_NAME)); sndMsgObj.compareAndSet(null, msg); @@ -155,7 +157,7 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest { chain.onSessionIdleTimeout(ses); chain.onSessionWriteTimeout(ses); assertNull(chain.onSessionClose(ses)); - assertNull(chain.onSessionWrite(ses, snd, true)); + assertNull(chain.onSessionWrite(ses, snd, true, null)); assertEquals("DCBA", connectedEvt.get()); assertEquals("DCBA", disconnectedEvt.get()); @@ -210,10 +212,10 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException { + @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { chainMeta(ses, MESSAGE_WRITE_META_NAME); - return proceedSessionWrite(ses, msg, fut); + return proceedSessionWrite(ses, msg, fut, ackC); } /** {@inheritDoc} */ @@ -349,7 +351,7 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override 
public void sendNoFuture(Object msg) { + @Override public void sendNoFuture(Object msg, IgniteInClosure<IgniteException> ackC) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java index ab21165..a59b6d1 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java @@ -118,7 +118,7 @@ public class GridFileSwapSpaceSpiSelfTest extends GridSwapSpaceSpiAbstractSelfTe * @return Swap key. */ private SwapKey key(int i) { - return new SwapKey(new KeyCacheObjectImpl(i, U.intToBytes(i)), i % 11, U.intToBytes(i)); + return new SwapKey(new KeyCacheObjectImpl(i, U.intToBytes(i), i), i % 11, U.intToBytes(i)); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java index 17757ab..96a8a33 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePart import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedNearDisabledOffHeapTieredFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedNearDisabledOffHeapTieredMultiNodeFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.CachePartitionedMultiNodeLongTxTimeoutFullApiTest; +import org.apache.ignite.internal.processors.cache.distributed.near.CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicClientOnlyFairAffinityMultiNodeFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicClientOnlyMultiNodeP2PDisabledFullApiSelfTest; @@ -57,7 +58,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAto import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWriteOrderFairAffinityMultiNodeFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWriteOrderMultiNodeFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWriteOrderMultiNodeP2PDisabledFullApiSelfTest; -import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWriteOrderOffHeapFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWriteOrderOffHeapTieredFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWrityOrderOffHeapMultiNodeFullApiSelfTest; @@ -76,9 +76,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.near.GridCachePar import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedMultiNodeCounterSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedMultiNodeFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedMultiNodeP2PDisabledFullApiSelfTest; -import org.apache.ignite.internal.processors.cache.distributed.near.CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedNearOnlyNoPrimaryFullApiSelfTest; -import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedOffHeapFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedOffHeapMultiNodeFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedOffHeapTieredFullApiSelfTest; @@ -230,10 +228,6 @@ public class IgniteCacheFullApiSelfTestSuite extends TestSuite { suite.addTestSuite(GridCacheReplicatedFullApiMultithreadedSelfTest.class); suite.addTestSuite(GridCachePartitionedFullApiMultithreadedSelfTest.class); - // Disabled striped pool. - suite.addTestSuite(GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest.class); - suite.addTestSuite(GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest.class); - // Other. 
suite.addTestSuite(GridCacheClearSelfTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java index 03204e2..6fc6846 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinity import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentNodeJoinValidationTest; import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheTxIteratorSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.IgniteCacheAtomicProtocolTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheSyncRebalanceModeSelfTest; import org.apache.ignite.internal.processors.cache.store.IgniteCacheWriteBehindNoUpdateSelfTest; @@ -66,6 +67,8 @@ public class IgniteCacheTestSuite5 extends TestSuite { suite.addTestSuite(CacheRebalancingSelfTest.class); + suite.addTestSuite(IgniteCacheAtomicProtocolTest.class); + suite.addTestSuite(PartitionsExchangeOnDiscoveryHistoryOverflowTest.class); return suite; http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java ---------------------------------------------------------------------- diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java index 9204c97..8a20eec 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java @@ -34,6 +34,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; @@ -63,6 +64,7 @@ import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; @@ -1306,11 +1308,14 @@ public class HadoopExternalCommunication { } /** {@inheritDoc} */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException { + @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, + Object msg, + boolean fut, + IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { if (ses.meta(PROCESS_META) == null && !(msg instanceof ProcessHandshakeMessage)) log.warning("Writing message before handshake has finished 
[ses=" + ses + ", msg=" + msg + ']'); - return proceedSessionWrite(ses, msg, fut); + return proceedSessionWrite(ses, msg, fut, ackC); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopIpcToNioAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopIpcToNioAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopIpcToNioAdapter.java index 3f33fb7..08981af 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopIpcToNioAdapter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopIpcToNioAdapter.java @@ -24,6 +24,7 @@ import java.nio.ByteOrder; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.util.ipc.IpcEndpoint; import org.apache.ignite.internal.util.nio.GridNioFilter; @@ -34,6 +35,7 @@ import org.apache.ignite.internal.util.nio.GridNioFuture; import org.apache.ignite.internal.util.nio.GridNioServerListener; import org.apache.ignite.internal.util.nio.GridNioSession; import org.apache.ignite.internal.util.nio.GridNioSessionImpl; +import org.apache.ignite.lang.IgniteInClosure; /** * Allows to re-use existing {@link GridNioFilter}s on IPC (specifically shared memory IPC) @@ -190,7 +192,10 @@ public class HadoopIpcToNioAdapter<T> { } /** {@inheritDoc} */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean 
fut) { + @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, + Object msg, + boolean fut, + IgniteInClosure<IgniteException> ackC) { assert ses == HadoopIpcToNioAdapter.this.ses : "ses=" + ses + ", this.ses=" + HadoopIpcToNioAdapter.this.ses; http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java index 24bba88..d90a900 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java @@ -18,11 +18,13 @@ package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; import org.apache.ignite.internal.util.nio.GridNioFilterAdapter; import org.apache.ignite.internal.util.nio.GridNioFuture; import org.apache.ignite.internal.util.nio.GridNioSession; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.marshaller.Marshaller; /** @@ -57,12 +59,16 @@ public class HadoopMarshallerFilter extends GridNioFilterAdapter { } /** {@inheritDoc} */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException { + 
@Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, + Object msg, + boolean fut, + IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { assert msg instanceof HadoopMessage : "Invalid message type: " + msg; - return proceedSessionWrite(ses, U.marshal(marsh, msg), fut); + return proceedSessionWrite(ses, U.marshal(marsh, msg), fut, ackC); } + /** {@inheritDoc} */ @Override public void onMessageReceived(GridNioSession ses, Object msg) throws IgniteCheckedException { assert msg instanceof byte[]; http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java index 40e563c..8d15e5e 100644 --- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java @@ -22,7 +22,7 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.yardstick.cache.model.SampleValue; /** - * Ignite benchmark that performs invoke operations. + * Ignite benchmark that performs getAndPut operations. 
*/ public class IgniteGetAndPutBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> { /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java index 49ae985..0a3794c 100644 --- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java @@ -27,7 +27,7 @@ import org.apache.ignite.yardstick.cache.model.SampleValue; import org.yardstickframework.BenchmarkConfiguration; /** - * Ignite benchmark that performs invoke operations. + * Ignite benchmark that performs getAndPut operations. */ public class IgniteGetAndPutTxBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> { /** */
