This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 8ed8576 IGNITE-12665: SQL: Fix potential race on MapResult close.
8ed8576 is described below
commit 8ed8576544f5c4dbe74fda02de420dfa49aba355
Author: Andrey V. Mashenkov <[email protected]>
AuthorDate: Thu Feb 13 12:35:22 2020 +0300
IGNITE-12665: SQL: Fix potential race on MapResult close.
---
.../query/h2/twostep/GridMapQueryExecutor.java | 109 +++++++++++----------
.../query/h2/twostep/MapQueryResult.java | 31 +++---
.../query/h2/twostep/MapQueryResults.java | 45 ++++++---
3 files changed, 98 insertions(+), 87 deletions(-)
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 222ce26..8b2c157 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -74,6 +74,7 @@ import org.h2.api.ErrorCode;
import org.h2.jdbc.JdbcResultSet;
import org.h2.jdbc.JdbcSQLException;
import org.h2.value.Value;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
@@ -212,29 +213,27 @@ public class GridMapQueryExecutor {
final int segment = i;
ctx.closure().callLocal(
- new Callable<Void>() {
- @Override public Void call() {
- onQueryRequest0(node,
- req.requestId(),
- segment,
- req.schemaName(),
- req.queries(),
- cacheIds,
- req.topologyVersion(),
- partsMap,
- parts,
- req.pageSize(),
- distributedJoins,
- enforceJoinOrder,
- false,
- req.timeout(),
- params,
- lazy,
- req.mvccSnapshot(),
- dataPageScanEnabled);
-
- return null;
- }
+ (Callable<Void>)() -> {
+ onQueryRequest0(node,
+ req.requestId(),
+ segment,
+ req.schemaName(),
+ req.queries(),
+ cacheIds,
+ req.topologyVersion(),
+ partsMap,
+ parts,
+ req.pageSize(),
+ distributedJoins,
+ enforceJoinOrder,
+ false,
+ req.timeout(),
+ params,
+ lazy,
+ req.mvccSnapshot(),
+ dataPageScanEnabled);
+
+ return null;
},
QUERY_POOL);
}
@@ -389,14 +388,10 @@ public class GridMapQueryExecutor {
qryResults.addResult(qryIdx, res);
- ResultSet rs = null;
-
try {
res.lock();
- MapH2QueryInfo qryInfo = null;
-
- // If we are not the target node for this replicated
query, just ignore it.
+ // Ensure we are on the target node for this replicated
query.
if (qry.node() == null || (segmentId == 0 &&
qry.node().equals(ctx.localNodeId()))) {
String sql = qry.query();
Collection<Object> params0 =
F.asList(qry.parameters(params));
@@ -412,9 +407,9 @@ public class GridMapQueryExecutor {
H2Utils.bindParameters(stmt, params0);
- qryInfo = new MapH2QueryInfo(stmt, qry.query(), node,
reqId, segmentId);
+ MapH2QueryInfo qryInfo = new MapH2QueryInfo(stmt,
qry.query(), node, reqId, segmentId);
- rs = h2.executeSqlQueryWithTimer(
+ ResultSet rs = h2.executeSqlQueryWithTimer(
stmt,
connWrp.connection(),
sql,
@@ -441,24 +436,33 @@ public class GridMapQueryExecutor {
}
assert rs instanceof JdbcResultSet : rs.getClass();
- }
- res.openResult(rs);
+ if (qryResults.cancelled()) {
+ rs.close();
- if (qryResults.cancelled())
- throw new QueryCancelledException();
+ throw new QueryCancelledException();
+ }
- final GridQueryNextPageResponse msg = prepareNextPage(
- nodeRess,
- node,
- qryResults,
- qryIdx,
- segmentId,
- pageSize,
- dataPageScanEnabled
- );
+ res.openResult(rs);
+
+ final GridQueryNextPageResponse msg = prepareNextPage(
+ nodeRess,
+ node,
+ qryResults,
+ qryIdx,
+ segmentId,
+ pageSize,
+ dataPageScanEnabled
+ );
+
+ if(msg != null)
+ sendNextPage(node, msg);
+ }
+ else {
+ assert !qry.isPartitioned();
- sendNextPage(node, msg);
+ qryResults.closeResult(qryIdx);
+ }
qryIdx++;
}
@@ -779,7 +783,8 @@ public class GridMapQueryExecutor {
req.pageSize(),
dataPageScanEnabled);
- sendNextPage(node, msg);
+ if(msg != null)
+ sendNextPage(node, msg);
}
finally {
qryCtxRegistry.clearThreadLocal();
@@ -874,14 +879,14 @@ public class GridMapQueryExecutor {
* @param node Node.
* @param msg Message to send.
*/
- private void sendNextPage(ClusterNode node, GridQueryNextPageResponse msg)
{
+ private void sendNextPage(@NotNull ClusterNode node, @NotNull
GridQueryNextPageResponse msg) {
+ assert msg != null;
+
try {
- if (msg != null) {
- if (node.isLocal())
- h2.reduceQueryExecutor().onNextPage(node, msg);
- else
- ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg,
QUERY_POOL);
- }
+ if (node.isLocal())
+ h2.reduceQueryExecutor().onNextPage(node, msg);
+ else
+ ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg,
QUERY_POOL);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send message.", e);
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
index c14cf7f..cd6a844 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
@@ -43,6 +43,7 @@ import org.h2.jdbc.JdbcResultSet;
import org.h2.result.LazyResult;
import org.h2.result.ResultInterface;
import org.h2.value.Value;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
@@ -130,7 +131,7 @@ class MapQueryResult {
}
/** */
- void openResult(ResultSet rs) {
+ void openResult(@NotNull ResultSet rs) {
res = new Result(rs);
}
@@ -337,28 +338,18 @@ class MapQueryResult {
*
* @param rs H2 result set.
*/
- Result(ResultSet rs) {
- if (rs != null) {
- this.rs = rs;
+ Result(@NotNull ResultSet rs) {
+ this.rs = rs;
- try {
- res = (ResultInterface)RESULT_FIELD.get(rs);
- }
- catch (IllegalAccessException e) {
- throw new IllegalStateException(e); // Must not happen.
- }
-
- rowCnt = (res instanceof LazyResult) ? -1 : res.getRowCount();
- cols = res.getVisibleColumnCount();
+ try {
+ res = (ResultInterface)RESULT_FIELD.get(rs);
}
- else {
- this.rs = null;
- this.res = null;
- this.cols = -1;
- this.rowCnt = -1;
-
- closed = true;
+ catch (IllegalAccessException e) {
+ throw new IllegalStateException(e); // Must not happen.
}
+
+ rowCnt = (res instanceof LazyResult) ? -1 : res.getRowCount();
+ cols = res.getVisibleColumnCount();
}
/** */
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
index f42ea61..6ef3d1b 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
@@ -52,6 +52,9 @@ class MapQueryResults {
/** Query context. */
private final QueryContext qctx;
+ /** Number of query results that have not been closed yet. */
+ private int active;
+
/**
* Constructor.
*
@@ -70,6 +73,7 @@ class MapQueryResults {
this.lazy = lazy;
this.qctx = qctx;
+ active = qrys;
results = new AtomicReferenceArray<>(qrys);
cancels = new GridQueryCancel[qrys];
@@ -108,15 +112,8 @@ class MapQueryResults {
/**
* @return {@code true} if all results are closed.
*/
- boolean isAllClosed() {
- for (int i = 0; i < results.length(); i++) {
- MapQueryResult res = results.get(i);
-
- if (res == null || !res.closed())
- return false;
- }
-
- return true;
+ synchronized boolean isAllClosed() {
+ return active == 0;
}
/**
@@ -152,7 +149,9 @@ class MapQueryResults {
void closeResult(int idx) {
MapQueryResult res = results.get(idx);
- if (res != null && !res.closed()) {
+ if (res != null) {
+ boolean lastClosed = false;
+
try {
// Session isn't set for lazy=false queries.
// Also session == null when result already closed.
@@ -160,25 +159,41 @@ class MapQueryResults {
res.lockTables();
synchronized (this) {
- res.close();
+ if (!res.closed()) {
+ res.close();
+
+ // The statement of the closed result must not be
canceled
+ // because statement & connection may be reused.
+ cancels[idx] = null;
+
+ active--;
- // The statement of the closed result must not be canceled
- // because statement & connection may be reused.
- cancels[idx] = null;
+ lastClosed = active == 0;
+ }
}
}
finally {
res.unlock();
}
+
+ if (lastClosed)
+ onAllClosed();
}
}
/**
- *
+ * Close map results.
*/
public void close() {
for (int i = 0; i < results.length(); i++)
closeResult(i);
+ }
+
+ /**
+ * Callback invoked once all map results have been closed.
+ */
+ private void onAllClosed() {
+ assert active == 0;
if (lazy)
releaseQueryContext();