Repository: ignite
Updated Branches:
  refs/heads/master 52b46c35f -> caad1e912


ignite-6858 Fail query if thread holds a cache lock and exchange is in progress


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/caad1e91
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/caad1e91
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/caad1e91

Branch: refs/heads/master
Commit: caad1e912192d3da0d74395db6cc3625fe2eb804
Parents: 52b46c3
Author: Aleksei Scherbakov <alexey.scherbak...@gmail.com>
Authored: Thu Nov 16 12:45:11 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Nov 16 12:45:11 2017 +0300

----------------------------------------------------------------------
 .../cache/distributed/near/GridNearTxLocal.java |  7 +-
 .../processors/query/h2/IgniteH2Indexing.java   | 17 +++++
 .../h2/twostep/GridReduceQueryExecutor.java     | 11 ++-
 .../IgniteCacheQueryNodeRestartSelfTest2.java   | 75 +++++++++++++++++---
 .../IgniteCacheQueryNodeRestartTxSelfTest.java  | 36 ++++++++++
 .../IgniteCacheQuerySelfTestSuite2.java         |  2 +
 6 files changed, 133 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/caad1e91/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 085f0b7..a3fddaf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -4122,14 +4122,13 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements GridTimeou
     /** {@inheritDoc} */
     @Override public void onTimeout() {
         if (state(MARKED_ROLLBACK, true) || (state() == MARKED_ROLLBACK)) {
-            if (log.isDebugEnabled())
-                log.debug("Will rollback tx on timeout: " + this);
-
             cctx.kernalContext().closure().runLocalSafe(new Runnable() {
                 @Override public void run() {
-                    // Note: if rollback asynchonously on timeout should not 
clear thread map
+                    // Note: if rollback asynchronously on timeout should not 
clear thread map
                     // since thread started tx still should be able to see 
this tx.
                     rollbackNearTxLocalAsync(true);
+
+                    U.warn(log, "Transaction was rolled back because the 
timeout is reached: " + GridNearTxLocal.this);
                 }
             });
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/caad1e91/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 450ee20..333a958 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -71,6 +71,7 @@ import 
org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import 
org.apache.ignite.internal.processors.cache.query.CacheQueryPartitionInfo;
@@ -2477,6 +2478,22 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
     }
 
     /**
+     * @param readyVer Ready topology version.
+     *
+     * @return {@code true} If pending distributed exchange exists because 
server topology is changed.
+     */
+    public boolean serverTopologyChanged(AffinityTopologyVersion readyVer) {
+        GridDhtPartitionsExchangeFuture fut = 
ctx.cache().context().exchange().lastTopologyFuture();
+
+        if (fut.isDone())
+            return false;
+
+        AffinityTopologyVersion initVer = fut.initialVersion();
+
+        return initVer.compareTo(readyVer) > 0 && 
!CU.clientNode(fut.firstEvent().node());
+    }
+
+    /**
      * @param topVer Topology version.
      * @throws IgniteCheckedException If failed.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/caad1e91/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index f85cd94..8e994aa 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -90,6 +90,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.transactions.TransactionException;
 import org.h2.command.ddl.CreateTableData;
 import org.h2.engine.Session;
 import org.h2.index.Index;
@@ -560,9 +561,15 @@ public class GridReduceQueryExecutor {
 
             AffinityTopologyVersion topVer = h2.readyTopologyVersion();
 
+            // Check if topology is changed while retrying on locked topology.
+            if (h2.serverTopologyChanged(topVer) && 
ctx.cache().context().lockedTopologyVersion(null) != null) {
+                throw new CacheException(new TransactionException("Server 
topology is changed during query " +
+                    "execution inside a transaction. It's recommended to 
rollback and retry transaction."));
+            }
+
             List<Integer> cacheIds = qry.cacheIds();
 
-            Collection<ClusterNode> nodes = null;
+            Collection<ClusterNode> nodes;
 
             // Explicit partition mapping for unstable topology.
             Map<ClusterNode, IntArray> partsMap = null;
@@ -1737,4 +1744,4 @@ public class GridReduceQueryExecutor {
             return qryMap;
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/caad1e91/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
index 627b3eb..bda503e 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
@@ -32,20 +32,25 @@ import 
org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.util.GridRandom;
 import org.apache.ignite.internal.util.typedef.CAX;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.TransactionException;
+import org.apache.ignite.transactions.TransactionTimeoutException;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -224,7 +229,7 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends 
GridCommonAbstractTest
 
         IgniteInternalFuture<?> fut1 = multithreadedAsync(new CAX() {
             @Override public void applyx() throws IgniteCheckedException {
-                GridRandom rnd = new GridRandom();
+                final GridRandom rnd = new GridRandom();
 
                 while (!qrysDone.get()) {
                     int g;
@@ -235,28 +240,43 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends 
GridCommonAbstractTest
                     while (!locks.compareAndSet(g, 0, 1));
 
                     try {
+                        final IgniteEx grid = grid(g);
+
                         if (rnd.nextBoolean()) { // Partitioned query.
-                            IgniteCache<?,?> cache = grid(g).cache("pu");
+                            final IgniteCache<?,?> cache = grid.cache("pu");
 
-                            SqlFieldsQuery qry = new 
SqlFieldsQuery(PARTITIONED_QRY);
+                            final SqlFieldsQuery qry = new 
SqlFieldsQuery(PARTITIONED_QRY);
 
                             boolean smallPageSize = rnd.nextBoolean();
 
                             if (smallPageSize)
                                 qry.setPageSize(3);
 
+                            final IgniteCache<Integer, Company> co = 
grid.cache("co");
+
                             try {
-                                assertEquals(pRes, cache.query(qry).getAll());
+                                runQuery(grid, new Runnable() {
+                                    @Override public void run() {
+                                        if (rnd.nextBoolean())
+                                            co.get(rnd.nextInt(COMPANY_CNT)); 
// Acquire a lock so the test runs with an open transaction.
+
+                                        assertEquals(pRes, 
cache.query(qry).getAll());
+                                    }
+                                });
                             } catch (CacheException e) {
                                 // Interruptions are expected here.
-                                if (e.getCause() instanceof 
IgniteInterruptedCheckedException)
+                                if (e.getCause() instanceof 
IgniteInterruptedCheckedException ||
+                                    e.getCause() instanceof 
InterruptedException ||
+                                    e.getCause() instanceof 
ClusterTopologyException ||
+                                    e.getCause() instanceof 
TransactionTimeoutException ||
+                                    e.getCause() instanceof 
TransactionException)
                                     continue;
 
                                 if (e.getCause() instanceof 
QueryCancelledException)
                                     fail("Retry is expected");
 
                                 if (!smallPageSize)
-                                    e.printStackTrace();
+                                    U.error(grid.log(), "On large page size 
must retry.", e);
 
                                 assertTrue("On large page size must retry.", 
smallPageSize);
 
@@ -286,13 +306,13 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends 
GridCommonAbstractTest
                                     continue;
 
                                 if (!failedOnRemoteFetch) {
-                                    e.printStackTrace();
+                                    U.error(grid.log(), "Must fail inside of 
GridResultPage.fetchNextPage or subclass.", e);
 
                                     fail("Must fail inside of 
GridResultPage.fetchNextPage or subclass.");
                                 }
                             }
                         } else { // Replicated query.
-                            IgniteCache<?, ?> cache = grid(g).cache("co");
+                            IgniteCache<?, ?> cache = grid.cache("co");
 
                             assertEquals(rRes, cache.query(new 
SqlFieldsQuery(REPLICATED_QRY)).getAll());
                         }
@@ -358,7 +378,14 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends 
GridCommonAbstractTest
 
         restartsDone.set(true);
 
-        fut2.get();
+        try {
+            fut2.get(20_000);
+        }
+        catch (IgniteFutureTimeoutCheckedException e) {
+            U.dumpThreads(log);
+
+            fail("Stopping restarts timeout.");
+        }
 
         info("Restarts stopped.");
 
@@ -380,12 +407,26 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends 
GridCommonAbstractTest
     }
 
     /**
+     * Run query closure.
+     *
+     * @param grid Grid.
+     * @param qryRunnable Query runnable.
+     */
+    protected void runQuery(IgniteEx grid, Runnable qryRunnable) {
+        qryRunnable.run();
+    }
+
+    /**
      *
      */
     private static class Person implements Serializable {
+        /** */
         @QuerySqlField(index = true)
         int id;
 
+        /**
+         * @param id Person ID.
+         */
         Person(int id) {
             this.id = id;
         }
@@ -395,12 +436,18 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends 
GridCommonAbstractTest
      *
      */
     private static class Purchase implements Serializable {
+        /** */
         @QuerySqlField(index = true)
         int personId;
 
+        /** */
         @QuerySqlField(index = true)
         int productId;
 
+        /**
+         * @param personId Person ID.
+         * @param productId Product ID.
+         */
         Purchase(int personId, int productId) {
             this.personId = personId;
             this.productId = productId;
@@ -411,9 +458,13 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends 
GridCommonAbstractTest
      *
      */
     private static class Company implements Serializable {
+        /** */
         @QuerySqlField(index = true)
         int id;
 
+        /**
+         * @param id ID.
+         */
         Company(int id) {
             this.id = id;
         }
@@ -423,12 +474,18 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends 
GridCommonAbstractTest
      *
      */
     private static class Product implements Serializable {
+        /** */
         @QuerySqlField(index = true)
         int id;
 
+        /** */
         @QuerySqlField(index = true)
         int companyId;
 
+        /**
+         * @param id ID.
+         * @param companyId Company ID.
+         */
         Product(int id, int companyId) {
             this.id = id;
             this.companyId = companyId;

http://git-wip-us.apache.org/repos/asf/ignite/blob/caad1e91/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartTxSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartTxSelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartTxSelfTest.java
new file mode 100644
index 0000000..ae06c42
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartTxSelfTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.transactions.Transaction;
+
+import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Test for distributed queries with node restarts inside transactions.
+ */
+public class IgniteCacheQueryNodeRestartTxSelfTest extends 
IgniteCacheQueryNodeRestartSelfTest2 {
+    /** {@inheritDoc} */
+    @Override protected void runQuery(IgniteEx grid, Runnable qryRunnable) {
+        try(Transaction tx = grid.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+            qryRunnable.run();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/caad1e91/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
index 91e4478..abe06ec 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
@@ -31,6 +31,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQ
 import 
org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartDistributedJoinSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartSelfTest2;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartTxSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest;
 import 
org.apache.ignite.internal.processors.cache.index.DynamicColumnsConcurrentAtomicPartitionedSelfTest;
 import 
org.apache.ignite.internal.processors.cache.index.DynamicColumnsConcurrentAtomicReplicatedSelfTest;
@@ -85,6 +86,7 @@ public class IgniteCacheQuerySelfTestSuite2 extends TestSuite 
{
         suite.addTestSuite(IgniteCacheQueryNodeFailTest.class);
         suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class);
         suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class);
+        suite.addTestSuite(IgniteCacheQueryNodeRestartTxSelfTest.class);
         suite.addTestSuite(IgniteCacheSqlQueryMultiThreadedSelfTest.class);
         
suite.addTestSuite(IgniteCachePartitionedQueryMultiThreadedSelfTest.class);
         suite.addTestSuite(CacheScanPartitionQueryFallbackSelfTest.class);

Reply via email to