This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ed8576  IGNITE-12665: SQL: Fix potential race on MapResult close.
8ed8576 is described below

commit 8ed8576544f5c4dbe74fda02de420dfa49aba355
Author: Andrey V. Mashenkov <[email protected]>
AuthorDate: Thu Feb 13 12:35:22 2020 +0300

    IGNITE-12665: SQL: Fix potential race on MapResult close.
---
 .../query/h2/twostep/GridMapQueryExecutor.java     | 109 +++++++++++----------
 .../query/h2/twostep/MapQueryResult.java           |  31 +++---
 .../query/h2/twostep/MapQueryResults.java          |  45 ++++++---
 3 files changed, 98 insertions(+), 87 deletions(-)

diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 222ce26..8b2c157 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -74,6 +74,7 @@ import org.h2.api.ErrorCode;
 import org.h2.jdbc.JdbcResultSet;
 import org.h2.jdbc.JdbcSQLException;
 import org.h2.value.Value;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
@@ -212,29 +213,27 @@ public class GridMapQueryExecutor {
             final int segment = i;
 
             ctx.closure().callLocal(
-                new Callable<Void>() {
-                    @Override public Void call() {
-                        onQueryRequest0(node,
-                            req.requestId(),
-                            segment,
-                            req.schemaName(),
-                            req.queries(),
-                            cacheIds,
-                            req.topologyVersion(),
-                            partsMap,
-                            parts,
-                            req.pageSize(),
-                            distributedJoins,
-                            enforceJoinOrder,
-                            false,
-                            req.timeout(),
-                            params,
-                            lazy,
-                            req.mvccSnapshot(),
-                            dataPageScanEnabled);
-
-                        return null;
-                    }
+                (Callable<Void>)() -> {
+                    onQueryRequest0(node,
+                        req.requestId(),
+                        segment,
+                        req.schemaName(),
+                        req.queries(),
+                        cacheIds,
+                        req.topologyVersion(),
+                        partsMap,
+                        parts,
+                        req.pageSize(),
+                        distributedJoins,
+                        enforceJoinOrder,
+                        false,
+                        req.timeout(),
+                        params,
+                        lazy,
+                        req.mvccSnapshot(),
+                        dataPageScanEnabled);
+
+                    return null;
                 },
                 QUERY_POOL);
         }
@@ -389,14 +388,10 @@ public class GridMapQueryExecutor {
 
                 qryResults.addResult(qryIdx, res);
 
-                ResultSet rs = null;
-
                 try {
                     res.lock();
 
-                    MapH2QueryInfo qryInfo = null;
-
-                    // If we are not the target node for this replicated query, just ignore it.
+                    // Ensure we are on the target node for this replicated query.
                     if (qry.node() == null || (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) {
                         String sql = qry.query();
                         Collection<Object> params0 = F.asList(qry.parameters(params));
@@ -412,9 +407,9 @@ public class GridMapQueryExecutor {
 
                         H2Utils.bindParameters(stmt, params0);
 
-                        qryInfo = new MapH2QueryInfo(stmt, qry.query(), node, reqId, segmentId);
+                        MapH2QueryInfo qryInfo = new MapH2QueryInfo(stmt, qry.query(), node, reqId, segmentId);
 
-                        rs = h2.executeSqlQueryWithTimer(
+                        ResultSet rs = h2.executeSqlQueryWithTimer(
                             stmt,
                             connWrp.connection(),
                             sql,
@@ -441,24 +436,33 @@ public class GridMapQueryExecutor {
                         }
 
                         assert rs instanceof JdbcResultSet : rs.getClass();
-                    }
 
-                    res.openResult(rs);
+                        if (qryResults.cancelled()) {
+                            rs.close();
 
-                    if (qryResults.cancelled())
-                        throw new QueryCancelledException();
+                            throw new QueryCancelledException();
+                        }
 
-                    final GridQueryNextPageResponse msg = prepareNextPage(
-                        nodeRess,
-                        node,
-                        qryResults,
-                        qryIdx,
-                        segmentId,
-                        pageSize,
-                        dataPageScanEnabled
-                    );
+                        res.openResult(rs);
+
+                        final GridQueryNextPageResponse msg = prepareNextPage(
+                            nodeRess,
+                            node,
+                            qryResults,
+                            qryIdx,
+                            segmentId,
+                            pageSize,
+                            dataPageScanEnabled
+                        );
+
+                        if(msg != null)
+                            sendNextPage(node, msg);
+                    }
+                    else {
+                        assert !qry.isPartitioned();
 
-                    sendNextPage(node, msg);
+                        qryResults.closeResult(qryIdx);
+                    }
 
                     qryIdx++;
                 }
@@ -779,7 +783,8 @@ public class GridMapQueryExecutor {
                         req.pageSize(),
                         dataPageScanEnabled);
 
-                    sendNextPage(node, msg);
+                    if(msg != null)
+                        sendNextPage(node, msg);
                 }
                 finally {
                     qryCtxRegistry.clearThreadLocal();
@@ -874,14 +879,14 @@ public class GridMapQueryExecutor {
      * @param node Node.
      * @param msg Message to send.
      */
-    private void sendNextPage(ClusterNode node, GridQueryNextPageResponse msg) {
+    private void sendNextPage(@NotNull ClusterNode node, @NotNull GridQueryNextPageResponse msg) {
+        assert msg != null;
+
         try {
-            if (msg != null) {
-                if (node.isLocal())
-                    h2.reduceQueryExecutor().onNextPage(node, msg);
-                else
-                    ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL);
-            }
+            if (node.isLocal())
+                h2.reduceQueryExecutor().onNextPage(node, msg);
+            else
+                ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL);
         }
         catch (IgniteCheckedException e) {
             U.error(log, "Failed to send message.", e);
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
index c14cf7f..cd6a844 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
@@ -43,6 +43,7 @@ import org.h2.jdbc.JdbcResultSet;
 import org.h2.result.LazyResult;
 import org.h2.result.ResultInterface;
 import org.h2.value.Value;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
@@ -130,7 +131,7 @@ class MapQueryResult {
     }
 
     /** */
-    void openResult(ResultSet rs) {
+    void openResult(@NotNull ResultSet rs) {
         res = new Result(rs);
     }
 
@@ -337,28 +338,18 @@ class MapQueryResult {
          *
          * @param rs H2 result set.
          */
-        Result(ResultSet rs) {
-            if (rs != null) {
-                this.rs = rs;
+        Result(@NotNull ResultSet rs) {
+            this.rs = rs;
 
-                try {
-                    res = (ResultInterface)RESULT_FIELD.get(rs);
-                }
-                catch (IllegalAccessException e) {
-                    throw new IllegalStateException(e); // Must not happen.
-                }
-
-                rowCnt = (res instanceof LazyResult) ? -1 : res.getRowCount();
-                cols = res.getVisibleColumnCount();
+            try {
+                res = (ResultInterface)RESULT_FIELD.get(rs);
             }
-            else {
-                this.rs = null;
-                this.res = null;
-                this.cols = -1;
-                this.rowCnt = -1;
-
-                closed = true;
+            catch (IllegalAccessException e) {
+                throw new IllegalStateException(e); // Must not happen.
             }
+
+            rowCnt = (res instanceof LazyResult) ? -1 : res.getRowCount();
+            cols = res.getVisibleColumnCount();
         }
 
         /** */
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
index f42ea61..6ef3d1b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
@@ -52,6 +52,9 @@ class MapQueryResults {
     /** Query context. */
     private final QueryContext qctx;
 
+    /** Active queries. */
+    private int active;
+
     /**
      * Constructor.
      *
@@ -70,6 +73,7 @@ class MapQueryResults {
         this.lazy = lazy;
         this.qctx = qctx;
 
+        active = qrys;
         results = new AtomicReferenceArray<>(qrys);
         cancels = new GridQueryCancel[qrys];
 
@@ -108,15 +112,8 @@ class MapQueryResults {
     /**
      * @return {@code true} If all results are closed.
      */
-    boolean isAllClosed() {
-        for (int i = 0; i < results.length(); i++) {
-            MapQueryResult res = results.get(i);
-
-            if (res == null || !res.closed())
-                return false;
-        }
-
-        return true;
+    synchronized boolean isAllClosed() {
+        return active == 0;
     }
 
     /**
@@ -152,7 +149,9 @@ class MapQueryResults {
     void closeResult(int idx) {
         MapQueryResult res = results.get(idx);
 
-        if (res != null && !res.closed()) {
+        if (res != null) {
+            boolean lastClosed = false;
+
             try {
                 // Session isn't set for lazy=false queries.
                 // Also session == null when result already closed.
@@ -160,25 +159,41 @@ class MapQueryResults {
                 res.lockTables();
 
                 synchronized (this) {
-                    res.close();
+                    if (!res.closed()) {
+                        res.close();
+
+                        // The statement of the closed result must not be canceled
+                        // because statement & connection may be reused.
+                        cancels[idx] = null;
+
+                        active--;
 
-                    // The statement of the closed result must not be canceled
-                    // because statement & connection may be reused.
-                    cancels[idx] = null;
+                        lastClosed = active == 0;
+                    }
                 }
             }
             finally {
                 res.unlock();
             }
+
+            if (lastClosed)
+                onAllClosed();
         }
     }
 
     /**
-     *
+     * Close map results.
      */
     public void close() {
         for (int i = 0; i < results.length(); i++)
             closeResult(i);
+    }
+
+    /**
+     * All map results closed callback.
+     */
+    private void onAllClosed() {
+        assert active == 0;
 
         if (lazy)
             releaseQueryContext();

Reply via email to