ignite-5489 Fixed possible connection leaks when loadPreviousValue is set to true


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1b2b26a8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1b2b26a8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1b2b26a8

Branch: refs/heads/ignite-5578
Commit: 1b2b26a82ea286472134a22619952c662b95033f
Parents: 7283edb
Author: Tikhonov Nikolay <tikhonovnico...@gmail.com>
Authored: Wed Jun 21 17:55:05 2017 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri Jul 14 15:43:31 2017 +0300

----------------------------------------------------------------------
 .../store/GridCacheStoreManagerAdapter.java     |   7 +-
 .../cache/CacheConnectionLeakStoreTxTest.java   | 291 +++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite4.java       |   2 +
 3 files changed, 299 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2b26a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index c02e2c7..99541ba 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -327,7 +327,12 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
                 throw new IgniteCheckedException(new CacheLoaderException(e));
             }
             finally {
-                sessionEnd0(tx, threwEx);
+                IgniteInternalTx tx0 = tx;
+
+                if (tx0 != null && (tx0.dht() && tx0.local()))
+                    tx0 = null;
+
+                sessionEnd0(tx0, threwEx);
             }
 
             if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2b26a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConnectionLeakStoreTxTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConnectionLeakStoreTxTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConnectionLeakStoreTxTest.java
new file mode 100644
index 0000000..611f2cd
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConnectionLeakStoreTxTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.cache.store.CacheStoreSession;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.resources.CacheStoreSessionResource;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.cache.TestCacheSession;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static 
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Checks that store sessions do not leak when loadPreviousValue is enabled.
+ */
+public class CacheConnectionLeakStoreTxTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache";
+
+    /** */
+    private static final int CLIENT_NODE = 1;
+
+    /** */
+    private static boolean client;
+
+    /** */
+    private static volatile boolean isLoadFromStore;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrid(0);
+
+        client = true;
+
+        startGrid(CLIENT_NODE);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        isLoadFromStore = false;
+        TestStore.sessions.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionLeakOneBackupAtomic() throws Exception {
+        checkConnectionLeak(CacheAtomicityMode.ATOMIC, null, null);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionLeakOneBackupAtomicLoadFromStore() throws 
Exception {
+        isLoadFromStore = true;
+
+        checkConnectionLeak(CacheAtomicityMode.ATOMIC, null, null);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionLeakOneBackupOptimisticRepeatableRead() throws 
Exception {
+        checkConnectionLeak(CacheAtomicityMode.TRANSACTIONAL, OPTIMISTIC, 
REPEATABLE_READ);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void 
testConnectionLeakOneBackupOptimisticRepeatableReadLoadFromStore() throws 
Exception {
+        isLoadFromStore = true;
+
+        checkConnectionLeak(CacheAtomicityMode.TRANSACTIONAL, OPTIMISTIC, 
REPEATABLE_READ);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionLeakOneBackupOptimisticReadCommitted() throws 
Exception {
+        checkConnectionLeak(CacheAtomicityMode.TRANSACTIONAL, OPTIMISTIC, 
READ_COMMITTED);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void 
testConnectionLeakOneBackupOptimisticReadCommittedLoadFromStore() throws 
Exception {
+        isLoadFromStore = true;
+
+        checkConnectionLeak(CacheAtomicityMode.TRANSACTIONAL, OPTIMISTIC, 
READ_COMMITTED);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionLeakOneBackupPessimisticRepeatableRead() throws 
Exception {
+        checkConnectionLeak(CacheAtomicityMode.TRANSACTIONAL, PESSIMISTIC, 
REPEATABLE_READ);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionLeakOneBackupPessimisticReadCommitted() throws 
Exception {
+        checkConnectionLeak(CacheAtomicityMode.TRANSACTIONAL, PESSIMISTIC, 
READ_COMMITTED);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void 
testConnectionLeakOneBackupPessimisticReadCommittedLoadFromStore() throws 
Exception {
+        isLoadFromStore = true;
+
+        checkConnectionLeak(CacheAtomicityMode.TRANSACTIONAL, PESSIMISTIC, 
READ_COMMITTED);
+    }
+
+    /**
+     * @param atomicityMode Atomicity mode.
+     * @param txConcurrency Transaction concurrency.
+     * @param txIsolation Transaction isolation.
+     *
+     * @throws Exception If failed.
+     */
+    private void checkConnectionLeak(
+            CacheAtomicityMode atomicityMode,
+            TransactionConcurrency txConcurrency,
+            TransactionIsolation txIsolation
+    ) throws Exception {
+        CacheConfiguration<Integer, Integer> cacheCfg = new 
CacheConfiguration<>();
+
+        cacheCfg.setName(CACHE_NAME);
+        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+        cacheCfg.setAtomicityMode(atomicityMode);
+        cacheCfg.setCacheStoreFactory(new TestStoreFactory());
+        cacheCfg.setReadThrough(true);
+        cacheCfg.setWriteThrough(false);
+        cacheCfg.setLoadPreviousValue(true);
+
+        Ignite ignite = ignite(CLIENT_NODE);
+        IgniteCache<Integer, Integer> cache = ignite.createCache(cacheCfg);
+
+        try {
+            assertEquals(0, cache.size());
+
+            if (atomicityMode == CacheAtomicityMode.TRANSACTIONAL) {
+                try (Transaction tx = 
ignite.transactions().txStart(txConcurrency, txIsolation)) {
+                    cacheOp(cache);
+
+                    tx.commit();
+                }
+            }
+            else {
+                cacheOp(cache);
+            }
+
+            assertTrue("Session was leak on nodes: " + TestStore.sessions, 
TestStore.sessions.isEmpty());
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     */
+    private void cacheOp(IgniteCache<Integer, Integer> cache) {
+        boolean b = cache.putIfAbsent(42, 42);
+
+        log.info("PutIfAbsent: " + b);
+
+        Integer val = cache.get(42);
+
+        log.info("Get: " + val);
+    }
+
+    /**
+     * Factory that creates a new {@link TestStore} on every call.
+     */
+    private static class TestStoreFactory implements 
Factory<CacheStoreAdapter<Integer, Integer>> {
+        /** {@inheritDoc} */
+        @Override public CacheStoreAdapter<Integer, Integer> create() {
+            return new TestStore();
+        }
+    }
+
+    /**
+     * Store that records its session on load/write/delete until sessionEnd.
+     */
+    private static class TestStore extends CacheStoreAdapter<Integer, Integer> 
implements Serializable {
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** */
+        @CacheStoreSessionResource
+        private CacheStoreSession ses;
+
+        /** */
+        private CacheStoreSession NULL = new TestCacheSession();
+
+        /** */
+        public static ConcurrentHashMap<CacheStoreSession, ClusterNode> 
sessions = new ConcurrentHashMap<>();
+
+        /** {@inheritDoc} */
+        @Override public Integer load(Integer key) throws CacheLoaderException 
{
+            addSession();
+
+            return isLoadFromStore ? key : null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<? extends Integer, ? extends 
Integer> e) throws CacheWriterException {
+            addSession();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) throws CacheWriterException {
+            addSession();
+        }
+
+        /**  */
+        private void addSession() {
+            sessions.put(ses == null ? NULL : ses, 
ignite.cluster().localNode());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sessionEnd(boolean commit) {
+            sessions.remove(ses == null ? NULL : ses);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2b26a8/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 45f575e..e7f38be 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -21,6 +21,7 @@ import junit.framework.TestSuite;
 import 
org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListenerSelfTest;
 import 
org.apache.ignite.internal.processors.GridCacheTxLoadFromStoreOnLockSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheClientStoreSelfTest;
+import 
org.apache.ignite.internal.processors.cache.CacheConnectionLeakStoreTxTest;
 import 
org.apache.ignite.internal.processors.cache.CacheGetEntryOptimisticReadCommittedSeltTest;
 import 
org.apache.ignite.internal.processors.cache.CacheGetEntryOptimisticRepeatableReadSeltTest;
 import 
org.apache.ignite.internal.processors.cache.CacheGetEntryOptimisticSerializableSeltTest;
@@ -279,6 +280,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
         suite.addTestSuite(CacheStoreUsageMultinodeStaticStartTxTest.class);
         
suite.addTestSuite(CacheStoreUsageMultinodeDynamicStartAtomicTest.class);
         suite.addTestSuite(CacheStoreUsageMultinodeDynamicStartTxTest.class);
+        suite.addTestSuite(CacheConnectionLeakStoreTxTest.class);
 
         suite.addTestSuite(GridCacheStoreManagerDeserializationTest.class);
         
suite.addTestSuite(GridLocalCacheStoreManagerDeserializationTest.class);

Reply via email to