ignite-5489 Fixed possible connection leaks when loadPreviousValue set to true
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1b2b26a8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1b2b26a8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1b2b26a8 Branch: refs/heads/ignite-5578-locJoin Commit: 1b2b26a82ea286472134a22619952c662b95033f Parents: 7283edb Author: Tikhonov Nikolay <tikhonovnico...@gmail.com> Authored: Wed Jun 21 17:55:05 2017 +0300 Committer: agura <ag...@gridgain.com> Committed: Fri Jul 14 15:43:31 2017 +0300 ---------------------------------------------------------------------- .../store/GridCacheStoreManagerAdapter.java | 7 +- .../cache/CacheConnectionLeakStoreTxTest.java | 291 +++++++++++++++++++ .../testsuites/IgniteCacheTestSuite4.java | 2 + 3 files changed, 299 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2b26a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index c02e2c7..99541ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -327,7 +327,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt throw new IgniteCheckedException(new CacheLoaderException(e)); } finally { - sessionEnd0(tx, threwEx); + IgniteInternalTx tx0 = tx; + + if (tx0 != null && (tx0.dht() && tx0.local())) + tx0 = null; + + sessionEnd0(tx0, threwEx); } if 
(log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2b26a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConnectionLeakStoreTxTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConnectionLeakStoreTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConnectionLeakStoreTxTest.java new file mode 100644 index 0000000..611f2cd --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConnectionLeakStoreTxTest.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.concurrent.ConcurrentHashMap; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.cache.store.CacheStoreSession; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.resources.CacheStoreSessionResource; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.cache.TestCacheSession; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * Checks that every cache store session used by {@code putIfAbsent} and {@code get} on a cache with {@code loadPreviousValue} enabled is ended, so that {@link TestStore#sessions} is empty afterwards. + */ +public class CacheConnectionLeakStoreTxTest extends GridCommonAbstractTest { + /** IP finder set on the discovery SPI of every node configured by this test. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Cache name. 
+ */ + private static final String CACHE_NAME = "cache"; + + /** Index of the grid that is started after the client flag is raised; the cache operations run on this node. */ + private static final int CLIENT_NODE = 1; + + /** Client mode flag copied into the configuration of each node started afterwards. */ + private static boolean client; + + /** Whether {@link TestStore#load} returns the key itself ({@code true}) or {@code null} ({@code false}). */ + private static volatile boolean isLoadFromStore; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrid(0); + + client = true; + + startGrid(CLIENT_NODE); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + isLoadFromStore = false; + TestStore.sessions.clear(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionLeakOneBackupAtomic() throws Exception { + checkConnectionLeak(CacheAtomicityMode.ATOMIC, null, null); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionLeakOneBackupAtomicLoadFromStore() throws Exception { + isLoadFromStore = true; + + checkConnectionLeak(CacheAtomicityMode.ATOMIC, null, null); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionLeakOneBackupOptimisticRepeatableRead() throws Exception { + checkConnectionLeak(CacheAtomicityMode.TRANSACTIONAL, OPTIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionLeakOneBackupOptimisticRepeatableReadLoadFromStore() throws Exception { + isLoadFromStore = true; + + checkConnectionLeak(CacheAtomicityMode.TRANSACTIONAL, OPTIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. 
+ */ + public void testConnectionLeakOneBackupOptimisticReadCommitted() throws Exception { + checkConnectionLeak(CacheAtomicityMode.TRANSACTIONAL, OPTIMISTIC, READ_COMMITTED); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionLeakOneBackupOptimisticReadCommittedLoadFromStore() throws Exception { + isLoadFromStore = true; + + checkConnectionLeak(CacheAtomicityMode.TRANSACTIONAL, OPTIMISTIC, READ_COMMITTED); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionLeakOneBackupPessimisticRepeatableRead() throws Exception { + checkConnectionLeak(CacheAtomicityMode.TRANSACTIONAL, PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionLeakOneBackupPessimisticReadCommitted() throws Exception { + checkConnectionLeak(CacheAtomicityMode.TRANSACTIONAL, PESSIMISTIC, READ_COMMITTED); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionLeakOneBackupPessimisticReadCommittedLoadFromStore() throws Exception { + isLoadFromStore = true; + + checkConnectionLeak(CacheAtomicityMode.TRANSACTIONAL, PESSIMISTIC, READ_COMMITTED); + } + + /** + * @param atomicityMode Atomicity mode. + * @param txConcurrency Transaction concurrency. + * @param txIsolation Transaction isolation. + * + * @throws Exception If failed. 
+ */ + private void checkConnectionLeak( + CacheAtomicityMode atomicityMode, + TransactionConcurrency txConcurrency, + TransactionIsolation txIsolation + ) throws Exception { + CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(); + + cacheCfg.setName(CACHE_NAME); + cacheCfg.setCacheMode(CacheMode.PARTITIONED); + cacheCfg.setAtomicityMode(atomicityMode); + cacheCfg.setCacheStoreFactory(new TestStoreFactory()); + cacheCfg.setReadThrough(true); + cacheCfg.setWriteThrough(false); + cacheCfg.setLoadPreviousValue(true); + + Ignite ignite = ignite(CLIENT_NODE); + IgniteCache<Integer, Integer> cache = ignite.createCache(cacheCfg); + + try { + assertEquals(0, cache.size()); + + if (atomicityMode == CacheAtomicityMode.TRANSACTIONAL) { + try (Transaction tx = ignite.transactions().txStart(txConcurrency, txIsolation)) { + cacheOp(cache); + + tx.commit(); + } + } + else { + cacheOp(cache); + } + + assertTrue("Session was leak on nodes: " + TestStore.sessions, TestStore.sessions.isEmpty()); + } + finally { + cache.destroy(); + } + } + + /** + * @param cache Cache. 
+ */ + private void cacheOp(IgniteCache<Integer, Integer> cache) { + boolean b = cache.putIfAbsent(42, 42); + + log.info("PutIfAbsent: " + b); + + Integer val = cache.get(42); + + log.info("Get: " + val); + } + + /** + * Factory that creates a new {@link TestStore} on every call. + */ + private static class TestStoreFactory implements Factory<CacheStoreAdapter<Integer, Integer>> { + /** {@inheritDoc} */ + @Override public CacheStoreAdapter<Integer, Integer> create() { + return new TestStore(); + } + } + + /** + * Store that records its session in {@link #sessions} on every load, write and delete, and removes it again in {@link #sessionEnd(boolean)}. + */ + private static class TestStore extends CacheStoreAdapter<Integer, Integer> implements Serializable { + /** Ignite instance (annotated for resource injection); used to resolve the local node. */ + @IgniteInstanceResource + private Ignite ignite; + + /** Current store session (annotated for resource injection); the code tolerates {@code null} here. */ + @CacheStoreSessionResource + private CacheStoreSession ses; + + /** Placeholder used as the map key when {@link #ses} is {@code null}. */ + private CacheStoreSession NULL = new TestCacheSession(); + + /** Sessions used by a store operation and not yet ended, mapped to the local node of the store that used them. */ + public static ConcurrentHashMap<CacheStoreSession, ClusterNode> sessions = new ConcurrentHashMap<>(); + + /** {@inheritDoc} */ + @Override public Integer load(Integer key) throws CacheLoaderException { + addSession(); + + return isLoadFromStore ? key : null; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> e) throws CacheWriterException { + addSession(); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + addSession(); + } + + /** Records the current session (or the placeholder when it is {@code null}) together with the local node. */ + private void addSession() { + sessions.put(ses == null ? NULL : ses, ignite.cluster().localNode()); + } + + /** {@inheritDoc} */ + @Override public void sessionEnd(boolean commit) { + sessions.remove(ses == null ? 
NULL : ses); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2b26a8/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index 45f575e..e7f38be 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -21,6 +21,7 @@ import junit.framework.TestSuite; import org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListenerSelfTest; import org.apache.ignite.internal.processors.GridCacheTxLoadFromStoreOnLockSelfTest; import org.apache.ignite.internal.processors.cache.CacheClientStoreSelfTest; +import org.apache.ignite.internal.processors.cache.CacheConnectionLeakStoreTxTest; import org.apache.ignite.internal.processors.cache.CacheGetEntryOptimisticReadCommittedSeltTest; import org.apache.ignite.internal.processors.cache.CacheGetEntryOptimisticRepeatableReadSeltTest; import org.apache.ignite.internal.processors.cache.CacheGetEntryOptimisticSerializableSeltTest; @@ -279,6 +280,7 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(CacheStoreUsageMultinodeStaticStartTxTest.class); suite.addTestSuite(CacheStoreUsageMultinodeDynamicStartAtomicTest.class); suite.addTestSuite(CacheStoreUsageMultinodeDynamicStartTxTest.class); + suite.addTestSuite(CacheConnectionLeakStoreTxTest.class); suite.addTestSuite(GridCacheStoreManagerDeserializationTest.class); suite.addTestSuite(GridLocalCacheStoreManagerDeserializationTest.class);