IGNITE-3406 - Interceptor and continuous query get correct old value during 
rebalancing.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/147ab9c0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/147ab9c0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/147ab9c0

Branch: refs/heads/master
Commit: 147ab9c08f6ac7edecf656b23d8b25bfab91becf
Parents: c24caba
Author: dkarachentsev <dkarachent...@gridgain.com>
Authored: Mon Sep 19 13:58:41 2016 +0300
Committer: dkarachentsev <dkarachent...@gridgain.com>
Committed: Mon Sep 19 13:58:41 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheEntryEx.java      |  8 ++
 .../processors/cache/GridCacheMapEntry.java     |  9 +-
 .../GridDistributedTxRemoteAdapter.java         |  6 ++
 .../distributed/dht/GridDhtTxPrepareFuture.java | 36 ++++++-
 .../cache/transactions/IgniteTxEntry.java       | 44 ++++++++-
 .../transactions/IgniteTxLocalAdapter.java      |  8 ++
 .../processors/cache/GridCacheTestEntryEx.java  |  4 +
 .../IgniteCacheInterceptorSelfTestSuite.java    |  5 +
 ...ContinuousQueryFailoverAbstractSelfTest.java | 99 ++++++++++++++++++++
 9 files changed, 213 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 616854f..ef6a244 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -358,6 +358,8 @@ public interface GridCacheEntryEx {
      * @param evt Flag to signal event notification.
      * @param metrics Flag to signal metrics update.
      * @param keepBinary Keep binary flag.
+     * @param oldValPresent {@code True} if oldValue present.
+     * @param oldVal Old value.
      * @param topVer Topology version.
      * @param filter Filter.
      * @param drType DR type.
@@ -383,6 +385,8 @@ public interface GridCacheEntryEx {
         boolean evt,
         boolean metrics,
         boolean keepBinary,
+        boolean oldValPresent,
+        @Nullable CacheObject oldVal,
         AffinityTopologyVersion topVer,
         CacheEntryPredicate[] filter,
         GridDrType drType,
@@ -402,6 +406,8 @@ public interface GridCacheEntryEx {
      * @param evt Flag to signal event notification.
      * @param metrics Flag to signal metrics notification.
      * @param keepBinary Keep binary flag.
+     * @param oldValPresent {@code True} if oldValue present.
+     * @param oldVal Old value.
      * @param topVer Topology version.
      * @param filter Filter.
      * @param drType DR type.
@@ -422,6 +428,8 @@ public interface GridCacheEntryEx {
         boolean evt,
         boolean metrics,
         boolean keepBinary,
+        boolean oldValPresent,
+        @Nullable CacheObject oldVal,
         AffinityTopologyVersion topVer,
         CacheEntryPredicate[] filter,
         GridDrType drType,

http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index c760ac1..a9ac1e7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1141,6 +1141,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         boolean evt,
         boolean metrics,
         boolean keepBinary,
+        boolean oldValPresent,
+        @Nullable CacheObject oldVal,
         AffinityTopologyVersion topVer,
         CacheEntryPredicate[] filter,
         GridDrType drType,
@@ -1198,7 +1200,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             Map<UUID, CacheContinuousQueryListener> lsnrCol =
                 notifyContinuousQueries(tx) ? 
cctx.continuousQueries().updateListeners(internal, false) : null;
 
-            old = (retval || intercept || lsnrCol != null) ?
+            old = oldValPresent ? oldVal :
+                (retval || intercept || lsnrCol != null) ?
                 rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : 
this.val;
 
             if (intercept) {
@@ -1333,6 +1336,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         boolean evt,
         boolean metrics,
         boolean keepBinary,
+        boolean oldValPresent,
+        @Nullable CacheObject oldVal,
         AffinityTopologyVersion topVer,
         CacheEntryPredicate[] filter,
         GridDrType drType,
@@ -1403,7 +1408,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             Map<UUID, CacheContinuousQueryListener> lsnrCol =
                 notifyContinuousQueries(tx) ? 
cctx.continuousQueries().updateListeners(internal, false) : null;
 
-            old = (retval || intercept || lsnrCol != null) ?
+            old = oldValPresent ? oldVal : (retval || intercept || lsnrCol != 
null) ?
                 rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : 
val;
 
             if (intercept) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index c56d1f7..9d9862a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -542,6 +542,8 @@ public class GridDistributedTxRemoteAdapter extends 
IgniteTxAdapter
                                                 true,
                                                 true,
                                                 txEntry.keepBinary(),
+                                                txEntry.hasOldValue(),
+                                                txEntry.oldValue(),
                                                 topVer,
                                                 null,
                                                 replicate ? DR_BACKUP : 
DR_NONE,
@@ -561,6 +563,8 @@ public class GridDistributedTxRemoteAdapter extends 
IgniteTxAdapter
                                                 true,
                                                 true,
                                                 txEntry.keepBinary(),
+                                                txEntry.hasOldValue(),
+                                                txEntry.oldValue(),
                                                 topVer,
                                                 null,
                                                 replicate ? DR_BACKUP : 
DR_NONE,
@@ -592,6 +596,8 @@ public class GridDistributedTxRemoteAdapter extends 
IgniteTxAdapter
                                             true,
                                             true,
                                             txEntry.keepBinary(),
+                                            txEntry.hasOldValue(),
+                                            txEntry.oldValue(),
                                             topVer,
                                             null,
                                             replicate ? DR_BACKUP : DR_NONE,

http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 1bdd9b8..ec73bff 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -360,7 +360,12 @@ public final class GridDhtTxPrepareFuture extends 
GridCompoundFuture<IgniteInter
 
                 boolean hasFilters = !F.isEmptyOrNulls(txEntry.filters()) && 
!F.isAlwaysTrue(txEntry.filters());
 
-                if (hasFilters || retVal || txEntry.op() == DELETE || 
txEntry.op() == TRANSFORM) {
+                CacheObject val;
+                CacheObject oldVal = null;
+
+                boolean readOld = hasFilters || retVal || txEntry.op() == 
DELETE || txEntry.op() == TRANSFORM;
+
+                if (readOld) {
                     cached.unswap(retVal);
 
                     boolean readThrough = !txEntry.skipStore() &&
@@ -375,7 +380,7 @@ public final class GridDhtTxPrepareFuture extends 
GridCompoundFuture<IgniteInter
 
                     final boolean keepBinary = txEntry.keepBinary();
 
-                    CacheObject val = cached.innerGet(
+                    val = oldVal = cached.innerGet(
                         null,
                         tx,
                         /*swap*/true,
@@ -470,6 +475,33 @@ public final class GridDhtTxPrepareFuture extends 
GridCompoundFuture<IgniteInter
                     else
                         ret.success(txEntry.op() != DELETE || 
cached.hasValue());
                 }
+
+                // Send old value in case if rebalancing is not finished.
+                final boolean sndOldVal = !cacheCtx.isLocal() && 
!cacheCtx.topology().rebalanceFinished(tx.topologyVersion());
+
+                if (sndOldVal) {
+                    if (oldVal == null && !readOld) {
+                        oldVal = cached.innerGet(
+                            null,
+                            tx,
+                            /*swap*/true,
+                            /*readThrough*/false,
+                            /*metrics*/false,
+                            /*event*/false,
+                            /*tmp*/false,
+                            /*subjectId*/tx.subjectId(),
+                            /*transformClo*/null,
+                            /*taskName*/null,
+                            /*expiryPlc*/null,
+                            /*keepBinary*/true);
+                    }
+
+                    if (oldVal != null) {
+                        oldVal.prepareMarshal(cacheCtx.cacheObjectContext());
+
+                        txEntry.oldValue(oldVal, true);
+                    }
+                }
             }
             catch (IgniteCheckedException e) {
                 U.error(log, "Failed to get result value for cache entry: " + 
cached, e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 87b2525..194208e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -115,6 +115,10 @@ public class IgniteTxEntry implements GridPeerDeployAware, 
Message {
     @GridDirectTransient
     private TxEntryValueHolder prevVal = new TxEntryValueHolder();
 
+    /** Old value before update. */
+    @GridToStringInclude
+    private TxEntryValueHolder oldVal = new TxEntryValueHolder();
+
     /** Transform. */
     @GridToStringInclude
     @GridDirectTransient
@@ -497,7 +501,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, 
Message {
     }
 
     /**
-     * @param oldValOnPrimary {@code True} If old value for 'invoke' operation 
was non null on primary node.
+     * @param oldValOnPrimary {@code True} If old value for was non null on 
primary node.
      */
     public void oldValueOnPrimary(boolean oldValOnPrimary) {
         setFlag(oldValOnPrimary, OLD_VAL_ON_PRIMARY);
@@ -583,6 +587,30 @@ public class IgniteTxEntry implements GridPeerDeployAware, 
Message {
     }
 
     /**
+     * @return Old value.
+     */
+    @Nullable public CacheObject oldValue() {
+        return oldVal != null ? oldVal.value() : null;
+    }
+
+    /**
+     * @param oldVal Old value.
+     */
+    public void oldValue(CacheObject oldVal, boolean hasOldVal) {
+        if (this.oldVal == null)
+            this.oldVal = new TxEntryValueHolder();
+
+        this.oldVal.value(op(), oldVal, hasOldVal, hasOldVal);
+    }
+
+    /**
+     * @return {@code True} if old value present.
+     */
+    public boolean hasOldValue() {
+        return oldVal != null && oldVal.hasValue();
+    }
+
+    /**
      * @return {@code True} if has value explicitly set.
      */
     public boolean hasValue() {
@@ -1069,6 +1097,11 @@ public class IgniteTxEntry implements 
GridPeerDeployAware, Message {
 
                 writer.incrementState();
 
+            case 13:
+                if (!writer.writeMessage("oldVal", oldVal))
+                    return false;
+
+                writer.incrementState();
         }
 
         return true;
@@ -1186,6 +1219,13 @@ public class IgniteTxEntry implements 
GridPeerDeployAware, Message {
 
                 reader.incrementState();
 
+            case 13:
+                oldVal = reader.readMessage("oldVal");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
         }
 
         return reader.afterMessageRead(IgniteTxEntry.class);
@@ -1198,7 +1238,7 @@ public class IgniteTxEntry implements 
GridPeerDeployAware, Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 13;
+        return 14;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index ee992cc..637f322 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -809,6 +809,8 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
                                             evt,
                                             metrics,
                                             txEntry.keepBinary(),
+                                            txEntry.hasOldValue(),
+                                            txEntry.oldValue(),
                                             topVer,
                                             null,
                                             cached.detached() ? DR_NONE : 
drType,
@@ -834,6 +836,8 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
                                                 false,
                                                 metrics,
                                                 txEntry.keepBinary(),
+                                                txEntry.hasOldValue(),
+                                                txEntry.oldValue(),
                                                 topVer,
                                                 CU.empty0(),
                                                 DR_NONE,
@@ -854,6 +858,8 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
                                             evt,
                                             metrics,
                                             txEntry.keepBinary(),
+                                            txEntry.hasOldValue(),
+                                            txEntry.oldValue(),
                                             topVer,
                                             null,
                                             cached.detached()  ? DR_NONE : 
drType,
@@ -875,6 +881,8 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
                                                 false,
                                                 metrics,
                                                 txEntry.keepBinary(),
+                                                txEntry.hasOldValue(),
+                                                txEntry.oldValue(),
                                                 topVer,
                                                 CU.empty0(),
                                                 DR_NONE,

http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 400fb14..bf543cb 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -477,6 +477,8 @@ public class GridCacheTestEntryEx extends 
GridMetadataAwareAdapter implements Gr
         boolean evt,
         boolean metrics,
         boolean keepBinary,
+        boolean hasOldVal,
+        @Nullable CacheObject oldVal,
         AffinityTopologyVersion topVer,
         CacheEntryPredicate[] filter,
         GridDrType drType,
@@ -556,6 +558,8 @@ public class GridCacheTestEntryEx extends 
GridMetadataAwareAdapter implements Gr
         boolean evt,
         boolean metrics,
         boolean keepBinary,
+        boolean oldValPresent,
+        @Nullable CacheObject oldVal,
         AffinityTopologyVersion topVer,
         CacheEntryPredicate[] filter,
         GridDrType drType,

http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
index d19ecd7..17d88ae 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
@@ -58,6 +58,11 @@ public class IgniteCacheInterceptorSelfTestSuite extends 
TestSuite {
         
suite.addTestSuite(CacheInterceptorPartitionCounterRandomOperationsTest.class);
         
suite.addTestSuite(CacheInterceptorPartitionCounterLocalSanityTest.class);
 
+        suite.addTestSuite(GridCacheInterceptorAtomicRebalanceTest.class);
+        
suite.addTestSuite(GridCacheInterceptorTransactionalRebalanceTest.class);
+        
suite.addTestSuite(GridCacheInterceptorAtomicOffheapRebalanceTest.class);
+        
suite.addTestSuite(GridCacheInterceptorTransactionalOffheapRebalanceTest.class);
+
         return suite;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 083367c..1376be1 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -57,6 +57,8 @@ import 
org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
@@ -65,6 +67,7 @@ import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
@@ -312,6 +315,102 @@ public abstract class 
CacheContinuousQueryFailoverAbstractSelfTest extends GridC
     }
 
     /**
+     * Test that during rebalancing correct old value passed to continuous 
query.
+     *
+     * @throws Exception If fail.
+     */
+    public void testRebalance() throws Exception {
+        for (int iter = 0; iter < 5; iter++) {
+            log.info("Iteration: " + iter);
+
+            final IgniteEx ignite = startGrid(1);
+
+            final CacheConfiguration<Integer, Integer> ccfg = new 
CacheConfiguration<>("testCache");
+
+            ccfg.setAtomicityMode(atomicityMode());
+            
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+            ccfg.setCacheMode(cacheMode());
+            ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+            ccfg.setBackups(2);
+
+            final IgniteCache<Integer, Integer> cache = 
ignite.getOrCreateCache(ccfg);
+
+            final int KEYS = 10_000;
+
+            for (int i = 0; i < KEYS; i++)
+                cache.put(i, i);
+
+            final ContinuousQuery<Integer, Integer> qry = new 
ContinuousQuery<>();
+
+            final AtomicBoolean err = new AtomicBoolean();
+
+            final AtomicInteger cntr = new AtomicInteger();
+
+            qry.setLocalListener(new CacheEntryUpdatedListener<Integer, 
Integer>() {
+                @Override public void onUpdated(
+                    final Iterable<CacheEntryEvent<? extends Integer, ? 
extends Integer>> cacheEntryEvts) {
+                    try {
+                        for (final CacheEntryEvent<? extends Integer, ? 
extends Integer> evt : cacheEntryEvts) {
+                            final Integer oldVal = evt.getOldValue();
+
+                            final Integer val = evt.getValue();
+
+                            assertNotNull("No old value: " + evt, oldVal);
+                            assertEquals("Unexpected old value: " + evt, 
(Integer)(oldVal + 1), val);
+
+                            cntr.incrementAndGet();
+                        }
+                    }
+                    catch (Throwable e) {
+                        err.set(true);
+
+                        error("Unexpected error: " + e, e);
+                    }
+                }
+            });
+
+            final QueryCursor<Cache.Entry<Integer, Integer>> cur = 
cache.query(qry);
+
+            final CountDownLatch latch = new CountDownLatch(1);
+
+            final IgniteInternalFuture<Object> updFut = 
GridTestUtils.runAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    latch.await();
+
+                    for (int i = 0; i < KEYS && !err.get(); i++)
+                        cache.put(i, i + 1);
+
+                    return null;
+                }
+            });
+
+            final IgniteInternalFuture<Object> rebFut = 
GridTestUtils.runAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    latch.await();
+
+                    for (int i = 2; i <= 5 && !err.get(); i++)
+                        startGrid(i);
+
+                    return null;
+                }
+            });
+
+            latch.countDown();
+
+            updFut.get();
+            rebFut.get();
+
+            assertFalse("Unexpected error during test", err.get());
+
+            assertTrue(cntr.get() > 0);
+
+            cur.close();
+
+            stopAllGrids();
+        }
+    }
+
+    /**
      * @param ignite Ignite.
      * @param topVer Topology version.
      * @throws Exception If failed.

Reply via email to