Repository: ignite
Updated Branches:
  refs/heads/master 1b6981000 -> a9f37a2e4


IGNITE-9927: Reverted changes to CacheContinuousQueryOperationFromCallbackTest 
until flaky failures are fixed.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a9f37a2e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a9f37a2e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a9f37a2e

Branch: refs/heads/master
Commit: a9f37a2e4800df5eefc81345292d9b4a4ab94c67
Parents: 1b69810
Author: devozerov <[email protected]>
Authored: Thu Oct 18 12:19:38 2018 +0300
Committer: devozerov <[email protected]>
Committed: Thu Oct 18 12:19:38 2018 +0300

----------------------------------------------------------------------
 ...ontinuousQueryOperationFromCallbackTest.java | 205 ++++---------------
 1 file changed, 39 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a9f37a2e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java
index 3cb13bf..0540b43 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java
@@ -63,13 +63,10 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static 
org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static 
org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
-import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static 
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 
 /**
  *
@@ -200,60 +197,6 @@ public class CacheContinuousQueryOperationFromCallbackTest 
extends GridCommonAbs
     }
 
     /**
-     * @throws Exception If failed.
-     */
-    public void testMvccTxTwoBackupsFilter() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = 
cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL_SNAPSHOT, FULL_SYNC);
-
-        doTest(ccfg, false);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testMvccTxTwoBackupsFilterPrimary() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = 
cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL_SNAPSHOT, PRIMARY_SYNC);
-
-        doTest(ccfg, false);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testMvccTxReplicatedFilter() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = 
cacheConfiguration(REPLICATED, 0, TRANSACTIONAL_SNAPSHOT, FULL_SYNC);
-
-        doTest(ccfg, false);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testMvccTxTwoBackup() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = 
cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL_SNAPSHOT, FULL_SYNC);
-
-        doTest(ccfg, true);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testMvccTxReplicated() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = 
cacheConfiguration(REPLICATED, 2, TRANSACTIONAL_SNAPSHOT, FULL_SYNC);
-
-        doTest(ccfg, true);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testMvccTxReplicatedPrimary() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = 
cacheConfiguration(REPLICATED, 2, TRANSACTIONAL_SNAPSHOT, PRIMARY_SYNC);
-
-        doTest(ccfg, true);
-    }
-
-    /**
      * @param ccfg Cache configuration.
      * @throws Exception If failed.
      */
@@ -309,51 +252,34 @@ public class 
CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
 
                         QueryTestKey key = new QueryTestKey(rnd.nextInt(KEYS));
 
-                        boolean startTx = 
cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() !=
-                            ATOMIC && rnd.nextBoolean();
+                        boolean startTx = 
cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() ==
+                            TRANSACTIONAL && rnd.nextBoolean();
 
                         Transaction tx = null;
 
-                        boolean committed = false;
-
-                        while (!committed && 
!Thread.currentThread().isInterrupted()) {
-                            try {
-                                if (startTx)
-                                    tx = 
cache.unwrap(Ignite.class).transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+                        if (startTx)
+                            tx = 
cache.unwrap(Ignite.class).transactions().txStart();
 
-                                if ((cache.get(key) == null) || 
rnd.nextBoolean())
-                                    cache.invoke(key, new 
IncrementTestEntryProcessor());
-                                else {
-                                    QueryTestValue val;
-                                    QueryTestValue newVal;
+                        try {
+                            if ((cache.get(key) == null) || rnd.nextBoolean())
+                                cache.invoke(key, new 
IncrementTestEntryProcessor());
+                            else {
+                                QueryTestValue val;
+                                QueryTestValue newVal;
 
-                                    do {
-                                        val = cache.get(key);
+                                do {
+                                    val = cache.get(key);
 
-                                        newVal = val == null ?
-                                            new QueryTestValue(0) : new 
QueryTestValue(val.val1 + 1);
-                                    }
-                                    while (!cache.replace(key, val, newVal));
+                                    newVal = val == null ?
+                                        new QueryTestValue(0) : new 
QueryTestValue(val.val1 + 1);
                                 }
-
-                                if (tx != null)
-                                    tx.commit();
-
-                                committed = true;
-                            }
-                            catch (Exception e) {
-                                assertTrue(e.getMessage(), e.getMessage() != 
null &&
-                                    (e.getMessage().contains("Transaction has 
been rolled back") ||
-                                        e.getMessage().contains("Cannot 
serialize transaction due to write conflict")));
-
-                                // Wait MVCC updates become visible.
-                                doSleep(50);
-                            }
-                            finally {
-                                if (tx != null)
-                                    tx.close();
+                                while (!cache.replace(key, val, newVal));
                             }
                         }
+                        finally {
+                            if (tx != null)
+                                tx.commit();
+                        }
                     }
                 }
             }, threadCnt, "put-thread");
@@ -376,7 +302,7 @@ public class CacheContinuousQueryOperationFromCallbackTest 
extends GridCommonAbs
                     @Override public boolean apply() {
                         return cbCntr.get() >= expCnt;
                     }
-                }, TimeUnit.SECONDS.toMillis(120));
+                }, TimeUnit.SECONDS.toMillis(60));
 
                 assertTrue("Failed to wait events [exp=" + expCnt + ", act=" + 
cbCntr.get() + "]", res);
 
@@ -393,7 +319,7 @@ public class CacheContinuousQueryOperationFromCallbackTest 
extends GridCommonAbs
                     @Override public boolean apply() {
                         return filterCbCntr.get() >= expInvkCnt;
                     }
-                }, TimeUnit.SECONDS.toMillis(120));
+                }, TimeUnit.SECONDS.toMillis(60));
 
                 assertEquals(expInvkCnt, filterCbCntr.get());
 
@@ -481,45 +407,17 @@ public class 
CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
             if (e.getKey().compareTo(new QueryTestKey(KEYS)) < 0) {
                 IgniteCache<QueryTestKey, QueryTestValue> cache = 
ignite.cache(cacheName);
 
-                boolean committed = false;
-                Transaction tx = null;
-                boolean startTx = 
cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == 
TRANSACTIONAL_SNAPSHOT;
-
-                while (!committed && !Thread.currentThread().isInterrupted()) {
-                    try {
-                        if (startTx)
-                            tx = ignite.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ);
-
-                        if (ThreadLocalRandom.current().nextBoolean()) {
-                            Set<QueryTestKey> keys = new LinkedHashSet<>();
+                if (ThreadLocalRandom.current().nextBoolean()) {
+                    Set<QueryTestKey> keys = new LinkedHashSet<>();
 
-                            for (int key = KEYS; key < KEYS + 
KEYS_FROM_CALLBACK; key++)
-                                keys.add(new QueryTestKey(key));
+                    for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; 
key++)
+                        keys.add(new QueryTestKey(key));
 
-                            cache.invokeAll(keys, new 
IncrementTestEntryProcessor());
-                        }
-                        else {
-                            for (int key = KEYS; key < KEYS + 
KEYS_FROM_CALLBACK; key++)
-                                cache.invoke(new QueryTestKey(key), new 
IncrementTestEntryProcessor());
-                        }
-
-                        if (tx != null)
-                            tx.commit();
-
-                        committed = true;
-                    }
-                    catch (Exception ex) {
-                        assertTrue(ex.getMessage(), ex.getMessage() != null &&
-                            (ex.getMessage().contains("Transaction has been 
rolled back") ||
-                                ex.getMessage().contains("Cannot serialize 
transaction due to write conflict")));
-
-                        // Wait MVCC updates become visible.
-                        doSleep(50);
-                    }
-                    finally {
-                        if (tx != null)
-                            tx.close();
-                    }
+                    cache.invokeAll(keys, new IncrementTestEntryProcessor());
+                }
+                else {
+                    for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; 
key++)
+                        cache.invoke(new QueryTestKey(key), new 
IncrementTestEntryProcessor());
                 }
 
                 filterCbCntr.incrementAndGet();
@@ -579,42 +477,17 @@ public class 
CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
                     cntr.incrementAndGet();
 
                     if (cache != null) {
-                        boolean committed = false;
-                        Transaction tx = null;
-                        boolean startTx = 
cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == 
TRANSACTIONAL_SNAPSHOT;
-
-                        while (!committed && 
!Thread.currentThread().isInterrupted()) {
-                            try {
-                                if (startTx)
-                                    tx = 
cache.unwrap(Ignite.class).transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
-
-                                if (ThreadLocalRandom.current().nextBoolean()) 
{
-                                    Set<QueryTestKey> keys = new 
LinkedHashSet<>();
-
-                                    for (int key = KEYS; key < KEYS + 
KEYS_FROM_CALLBACK; key++)
-                                        keys.add(new QueryTestKey(key));
-
-                                    cache.invokeAll(keys, new 
IncrementTestEntryProcessor());
-                                }
-                                else {
-                                    for (int key = KEYS; key < KEYS + 
KEYS_FROM_CALLBACK; key++)
-                                        cache.invoke(new QueryTestKey(key), 
new IncrementTestEntryProcessor());
-                                }
+                        if (ThreadLocalRandom.current().nextBoolean()) {
+                            Set<QueryTestKey> keys = new LinkedHashSet<>();
 
-                                if (tx != null)
-                                    tx.commit();
+                            for (int key = KEYS; key < KEYS + 
KEYS_FROM_CALLBACK; key++)
+                                keys.add(new QueryTestKey(key));
 
-                                committed = true;
-                            }
-                            catch (Exception ex) {
-                                assertTrue(ex.getMessage(), ex.getMessage() != 
null &&
-                                    (ex.getMessage().contains("Transaction has 
been rolled back") ||
-                                        ex.getMessage().contains("Cannot 
serialize transaction due to write conflict")));
-                            }
-                            finally {
-                                if (tx != null)
-                                    tx.close();
-                            }
+                            cache.invokeAll(keys, new 
IncrementTestEntryProcessor());
+                        }
+                        else {
+                            for (int key = KEYS; key < KEYS + 
KEYS_FROM_CALLBACK; key++)
+                                cache.invoke(new QueryTestKey(key), new 
IncrementTestEntryProcessor());
                         }
                     }
                 }

Reply via email to