Repository: ignite
Updated Branches:
  refs/heads/ignite-6149 9d90972df -> 7a9943265


ignite-6149


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7a994326
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7a994326
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7a994326

Branch: refs/heads/ignite-6149
Commit: 7a99432655307acf7c5e190e20952322f68e0e4f
Parents: 9d90972
Author: sboikov <[email protected]>
Authored: Wed Sep 13 12:44:29 2017 +0300
Committer: sboikov <[email protected]>
Committed: Wed Sep 13 13:40:25 2017 +0300

----------------------------------------------------------------------
 .../cache/IgniteCacheOffheapManagerImpl.java    |  21 ++--
 .../cache/mvcc/CacheMvccTransactionsTest.java   | 109 +++++++++++++++++++
 2 files changed, 121 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7a994326/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 3e699ac..a507985 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1375,6 +1375,8 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
                 while (cur.next()) {
                     CacheDataRow oldVal = cur.get();
 
+                    assert oldVal.link() != 0 : oldVal;
+
                     if (activeTxs != null && oldVal.mvccCoordinatorVersion() 
== mvccVer.coordinatorVersion() &&
                         activeTxs.contains(oldVal.mvccCounter())) {
                         if (waitTxs == null)
@@ -1384,17 +1386,22 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
                         waitTxs.add(oldVal.mvccCounter());
                     }
-                    else if (!first) {
+                    else {
+                        // Should not delete oldest version which is less than 
cleanup version.
                         int cmp = compare(oldVal, 
mvccVer.coordinatorVersion(), mvccVer.cleanupVersion());
 
                         if (cmp <= 0) {
-                            boolean rmvd = dataTree.removex(oldVal);
+                            if (first)
+                                first = false;
+                            else {
+                                boolean rmvd = dataTree.removex(oldVal);
+
+                                assert rmvd;
 
-                            assert rmvd;
+                                rowStore.removeRow(oldVal.link());
+                            }
                         }
                     }
-
-                    first = false;
                 }
 
                 return waitTxs;
@@ -1629,10 +1636,6 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         @Override public CacheDataRow mvccFind(GridCacheContext cctx,
             KeyCacheObject key,
             MvccCoordinatorVersion ver) throws IgniteCheckedException {
-//            log.info("mvccFind [k=" + key.value(cctx.cacheObjectContext(), 
false) +
-//                ", topVer=" + ver.topologyVersion() +
-//                ", cntr=" + ver.counter() + ']');
-
             key.valueBytes(cctx.cacheObjectContext());
 
             int cacheId = grp.sharedGroup() ? cctx.cacheId() : 
CU.UNDEFINED_CACHE_ID;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7a994326/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index a5fd61e..002da40 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -43,6 +43,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
 import org.apache.ignite.internal.util.lang.GridInClosure3;
@@ -472,6 +473,8 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
         final Integer key1 = primaryKey(ignite(0).cache(cache.getName()));
         final Integer key2 = primaryKey(ignite(1).cache(cache.getName()));
 
+        info("Test keys [key1=" + key1 + ", key2=" + key2 + ']');
+
         try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
             cache.put(key1, 1);
             cache.put(key2, 1);
@@ -582,6 +585,112 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testCleanupWaitsForGet() throws Exception {
+        boolean vals[] = {true, false};
+
+        for (boolean otherPuts : vals) {
+            for (boolean putOnStart : vals) {
+                cleanupWaitsForGet(otherPuts, putOnStart);
+
+                afterTest();
+            }
+        }
+    }
+
+    /**
+     * @param otherPuts {@code True} to update unrelated keys to increment 
mvcc counter.
+     * @param putOnStart {@code True} to put data in cache before getAll.
+     * @throws Exception If failed.
+     */
+    private void cleanupWaitsForGet(boolean otherPuts, final boolean 
putOnStart) throws Exception {
+        info("cleanupWaitsForGet [otherPuts=" + otherPuts + ", putOnStart=" + 
putOnStart + "]");
+
+        testSpi = true;
+
+        client = false;
+
+        final Ignite srv = startGrid(0);
+
+        client = true;
+
+        final Ignite client = startGrid(1);
+
+        awaitPartitionMapExchange();
+
+        final IgniteCache<Object, Object> srvCache =
+            srv.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 16));
+
+        final Integer key1 = 1;
+        final Integer key2 = 2;
+
+        if (putOnStart) {
+            try (Transaction tx = srv.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+                srvCache.put(key1, 0);
+                srvCache.put(key2, 0);
+
+                tx.commit();
+            }
+        }
+
+        if (otherPuts) {
+            for (int i = 0; i < 3; i++) {
+                try (Transaction tx = srv.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+                    srvCache.put(1_000_000 + i, 99);
+
+                    tx.commit();
+                }
+            }
+        }
+
+        TestRecordingCommunicationSpi clientSpi = 
TestRecordingCommunicationSpi.spi(client);
+
+        clientSpi.blockMessages(GridNearGetRequest.class, 
getTestIgniteInstanceName(0));
+
+        IgniteInternalFuture<?> getFut = GridTestUtils.runAsync(new 
Callable<Void>() {
+            @Override public Void call() throws Exception {
+                IgniteCache<Integer, Integer> cache = 
client.cache(srvCache.getName());
+
+                Map<Integer, Integer> vals = cache.getAll(F.asSet(key1, key2));
+
+                if (putOnStart) {
+                    assertEquals(2, vals.size());
+                    assertEquals(0, (Object)vals.get(key1));
+                    assertEquals(0, (Object)vals.get(key2));
+                }
+                else
+                    assertEquals(0, vals.size());
+
+                return null;
+            }
+        }, "get-thread");
+
+        clientSpi.waitForBlocked();
+
+        for (int i = 0; i < 5; i++) {
+            try (Transaction tx = srv.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+                srvCache.put(key1, i + 1);
+                srvCache.put(key2, i + 1);
+
+                tx.commit();
+            }
+        }
+
+        clientSpi.stopBlock(true);
+
+        getFut.get();
+
+        IgniteCache<Integer, Integer> cache = client.cache(srvCache.getName());
+
+        Map<Integer, Integer> vals = cache.getAll(F.asSet(key1, key2));
+
+        assertEquals(2, vals.size());
+        assertEquals(5, (Object)vals.get(key1));
+        assertEquals(5, (Object)vals.get(key2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testPutAllGetAll_SingleNode() throws Exception {
         putAllGetAll(1, 0, 0, 64);
     }

Reply via email to