Repository: ignite
Updated Branches:
  refs/heads/master a31c0a2e3 -> 7ceecc43f


http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
index 90ad604..b48c827 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
@@ -23,6 +23,7 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
@@ -31,12 +32,12 @@ import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
 import javax.cache.configuration.Factory;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.query.BulkLoadContextCursor;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteVersionUtils;
@@ -54,6 +55,7 @@ import 
org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler;
 import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
 import org.apache.ignite.internal.processors.odbc.ClientListenerResponseSender;
 import 
org.apache.ignite.internal.processors.odbc.odbc.OdbcQueryGetColumnsMetaRequest;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
 import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
@@ -76,6 +78,7 @@ import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchR
 import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_3_0;
 import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_4_0;
 import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_7_0;
+import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_8_0;
 import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BATCH_EXEC;
 import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BATCH_EXEC_ORDERED;
 import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BULK_LOAD_BATCH;
@@ -85,6 +88,7 @@ import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_P
 import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_PRIMARY_KEYS;
 import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_SCHEMAS;
 import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_TABLES;
+import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_CANCEL;
 import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_CLOSE;
 import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_EXEC;
 import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_FETCH;
@@ -94,8 +98,9 @@ import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_ME
  * JDBC request handler.
  */
 public class JdbcRequestHandler implements ClientListenerRequestHandler {
-    /** Query ID sequence. */
-    private static final AtomicLong QRY_ID_GEN = new AtomicLong();
+    /** Jdbc query cancelled response. */
+    private static final JdbcResponse JDBC_QUERY_CANCELLED_RESPONSE =
+        new JdbcResponse(IgniteQueryErrorCode.QUERY_CANCELED, 
QueryCancelledException.ERR_MSG);
 
     /** Kernel context. */
     private final GridKernalContext ctx;
@@ -115,11 +120,8 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
     /** Maximum allowed cursors. */
     private final int maxCursors;
 
-    /** Current queries cursors. */
-    private final ConcurrentHashMap<Long, JdbcQueryCursor> qryCursors = new 
ConcurrentHashMap<>();
-
-    /** Current bulk load processors. */
-    private final ConcurrentHashMap<Long, JdbcBulkLoadProcessor> 
bulkLoadRequests = new ConcurrentHashMap<>();
+    /** Current JDBC cursors. */
+    private final ConcurrentHashMap<Long, JdbcCursor> jdbcCursors = new 
ConcurrentHashMap<>();
 
     /** Ordered batches queue. */
     private final PriorityQueue<JdbcOrderedBatchExecuteRequest> 
orderedBatchesQueue = new PriorityQueue<>();
@@ -127,6 +129,9 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
     /** Ordered batches mutex. */
     private final Object orderedBatchesMux = new Object();
 
+    /** Request mutex. */
+    private final Object reqMux = new Object();
+
     /** Response sender. */
     private final ClientListenerResponseSender sender;
 
@@ -137,11 +142,14 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
     private final NestedTxMode nestedTxMode;
 
     /** Protocol version. */
-    private ClientListenerProtocolVersion protocolVer;
+    private final ClientListenerProtocolVersion protocolVer;
 
     /** Authentication context */
     private AuthorizationContext actx;
 
+    /** Register that keeps non-cancelled requests. */
+    private Map<Long, JdbcQueryDescriptor> reqRegister = new HashMap<>();
+
     /**
      * Constructor.
      * @param ctx Context.
@@ -218,6 +226,32 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean isCancellationCommand(int cmdId) {
+        return cmdId == JdbcRequest.QRY_CANCEL;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void registerRequest(long reqId, int cmdType) {
+        assert reqId != 0;
+
+        synchronized (reqMux) {
+            if (isCancellationSupported() && (cmdType == QRY_EXEC || cmdType 
== BATCH_EXEC ||
+                cmdType == BATCH_EXEC_ORDERED))
+                reqRegister.put(reqId, new JdbcQueryDescriptor());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void unregisterRequest(long reqId) {
+        assert reqId != 0;
+
+        synchronized (reqMux) {
+            if (isCancellationSupported())
+                reqRegister.remove(reqId);
+        }
+    }
+
     /**
      * Start worker, if it's present.
      */
@@ -279,6 +313,9 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
 
                 case BULK_LOAD_BATCH:
                     return 
processBulkLoadFileBatch((JdbcBulkLoadBatchRequest)req);
+
+                case QRY_CANCEL:
+                    return cancelQuery((JdbcQueryCancelRequest)req);
             }
 
             return new JdbcResponse(IgniteQueryErrorCode.UNSUPPORTED_OPERATION,
@@ -347,11 +384,16 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
      * @return Response to send to the client.
      */
     private ClientListenerResponse 
processBulkLoadFileBatch(JdbcBulkLoadBatchRequest req) {
-        JdbcBulkLoadProcessor processor = bulkLoadRequests.get(req.queryId());
-
         if (ctx == null)
             return new JdbcResponse(IgniteQueryErrorCode.UNEXPECTED_OPERATION, 
"Unknown query ID: "
-                + req.queryId() + ". Bulk load session may have been reclaimed 
due to timeout.");
+                + req.cursorId() + ". Bulk load session may have been 
reclaimed due to timeout.");
+
+        JdbcBulkLoadProcessor processor = 
(JdbcBulkLoadProcessor)jdbcCursors.get(req.cursorId());
+
+        if (!prepareQueryCancellationMeta(processor))
+            return JDBC_QUERY_CANCELLED_RESPONSE;
+
+        boolean unregisterReq = false;
 
         try {
             processor.processBatch(req);
@@ -359,10 +401,12 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
             switch (req.cmd()) {
                 case CMD_FINISHED_ERROR:
                 case CMD_FINISHED_EOF:
-                    bulkLoadRequests.remove(req.queryId());
+                    jdbcCursors.remove(req.cursorId());
 
                     processor.close();
 
+                    unregisterReq = true;
+
                     break;
 
                 case CMD_CONTINUE:
@@ -371,12 +415,18 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
                 default:
                     throw new IllegalArgumentException();
             }
-
-            return new JdbcResponse(new JdbcQueryExecuteResult(req.queryId(), 
processor.updateCnt()));
+            return new JdbcResponse(new JdbcQueryExecuteResult(req.cursorId(), 
processor.updateCnt()));
         }
         catch (Exception e) {
             U.error(null, "Error processing file batch", e);
-            return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Server 
error: " + e);
+
+            if (X.cause(e, QueryCancelledException.class) != null)
+                return exceptionToResult(new QueryCancelledException());
+            else
+                return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Server 
error: " + e);
+        }
+        finally {
+            cleanupQueryCancellationMeta(unregisterReq, processor.requestId());
         }
     }
 
@@ -415,19 +465,14 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
             }
         }
 
-        for (JdbcQueryCursor cursor : qryCursors.values())
-            cursor.close();
+        for (JdbcCursor cursor : jdbcCursors.values())
+            U.close(cursor, log);
 
-        for (JdbcBulkLoadProcessor processor : bulkLoadRequests.values()) {
-            try {
-                processor.close();
-            }
-            catch (Exception e) {
-                U.error(null, "Error closing JDBC bulk load processor.", e);
-            }
-        }
+        jdbcCursors.clear();
 
-        bulkLoadRequests.clear();
+        synchronized (reqMux) {
+            reqRegister.clear();
+        }
 
         U.close(cliCtx, log);
     }
@@ -440,24 +485,40 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
      */
     @SuppressWarnings("unchecked")
     private JdbcResponse executeQuery(JdbcQueryExecuteRequest req) {
-        int cursorCnt = qryCursors.size();
+        GridQueryCancel cancel = null;
+
+        boolean unregisterReq = false;
 
-        if (maxCursors > 0 && cursorCnt >= maxCursors)
-            return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Too many 
open cursors (either close other " +
-                "open cursors or increase the limit through " +
-                "ClientConnectorConfiguration.maxOpenCursorsPerConnection) 
[maximum=" + maxCursors +
-                ", current=" + cursorCnt + ']');
+        if (isCancellationSupported()) {
+            synchronized (reqMux) {
+                JdbcQueryDescriptor desc = reqRegister.get(req.requestId());
 
-        long qryId = QRY_ID_GEN.getAndIncrement();
+                // Query was already cancelled and unregistered.
+                if (desc == null)
+                    return null;
 
-        assert !cliCtx.isStream();
+                cancel = desc.cancelHook();
+
+                desc.incrementUsageCount();
+            }
+        }
 
         try {
+            int cursorCnt = jdbcCursors.size();
+
+            if (maxCursors > 0 && cursorCnt >= maxCursors)
+                return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Too 
many open cursors (either close other " +
+                    "open cursors or increase the limit through " +
+                    "ClientConnectorConfiguration.maxOpenCursorsPerConnection) 
[maximum=" + maxCursors +
+                    ", current=" + cursorCnt + ']');
+
+            assert !cliCtx.isStream();
+
             String sql = req.sqlQuery();
 
             SqlFieldsQueryEx qry;
 
-            switch(req.expectedStatementType()) {
+            switch (req.expectedStatementType()) {
                 case ANY_STATEMENT_TYPE:
                     qry = new SqlFieldsQueryEx(sql, null);
 
@@ -500,29 +561,36 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
             qry.setSchema(schemaName);
 
             List<FieldsQueryCursor<List<?>>> results = 
ctx.query().querySqlFields(null, qry, cliCtx, true,
-                protocolVer.compareTo(VER_2_3_0) < 0);
+                protocolVer.compareTo(VER_2_3_0) < 0, cancel);
 
             FieldsQueryCursor<List<?>> fieldsCur = results.get(0);
 
             if (fieldsCur instanceof BulkLoadContextCursor) {
-                BulkLoadContextCursor blCur = (BulkLoadContextCursor) 
fieldsCur;
+                BulkLoadContextCursor blCur = (BulkLoadContextCursor)fieldsCur;
 
                 BulkLoadProcessor blProcessor = blCur.bulkLoadProcessor();
                 BulkLoadAckClientParameters clientParams = 
blCur.clientParams();
 
-                bulkLoadRequests.put(qryId, new 
JdbcBulkLoadProcessor(blProcessor));
+                JdbcBulkLoadProcessor processor = new 
JdbcBulkLoadProcessor(blProcessor, req.requestId());
 
-                return new JdbcResponse(new JdbcBulkLoadAckResult(qryId, 
clientParams));
+                jdbcCursors.put(processor.cursorId(), processor);
+
+                // responses for the same query on the client side
+                return new JdbcResponse(new 
JdbcBulkLoadAckResult(processor.cursorId(), clientParams));
             }
 
             if (results.size() == 1) {
-                JdbcQueryCursor cur = new JdbcQueryCursor(qryId, 
req.pageSize(), req.maxRows(),
-                    (QueryCursorImpl)fieldsCur);
+                JdbcQueryCursor cur = new JdbcQueryCursor(req.pageSize(), 
req.maxRows(),
+                    (QueryCursorImpl)fieldsCur, req.requestId());
+
+                jdbcCursors.put(cur.cursorId(), cur);
+
+                cur.openIterator();
 
                 JdbcQueryExecuteResult res;
 
                 if (cur.isQuery())
-                    res = new JdbcQueryExecuteResult(qryId, cur.fetchRows(), 
!cur.hasNext());
+                    res = new JdbcQueryExecuteResult(cur.cursorId(), 
cur.fetchRows(), !cur.hasNext());
                 else {
                     List<List<Object>> items = cur.fetchRows();
 
@@ -531,13 +599,16 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
                         "Invalid result set for not-SELECT query. [qry=" + sql 
+
                             ", res=" + S.toString(List.class, items) + ']';
 
-                    res = new JdbcQueryExecuteResult(qryId, 
(Long)items.get(0).get(0));
+                    res = new JdbcQueryExecuteResult(cur.cursorId(), 
(Long)items.get(0).get(0));
                 }
 
-                if (res.last() && (!res.isQuery() || autoCloseCursors))
+                if (res.last() && (!res.isQuery() || autoCloseCursors)) {
+                    jdbcCursors.remove(cur.cursorId());
+
+                    unregisterReq = true;
+
                     cur.close();
-                else
-                    qryCursors.put(qryId, cur);
+                }
 
                 return new JdbcResponse(res);
             }
@@ -552,13 +623,13 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
                     JdbcResultInfo jdbcRes;
 
                     if (qryCur.isQuery()) {
-                        jdbcRes = new JdbcResultInfo(true, -1, qryId);
+                        JdbcQueryCursor cur = new 
JdbcQueryCursor(req.pageSize(), req.maxRows(), qryCur, req.requestId());
 
-                        JdbcQueryCursor cur = new JdbcQueryCursor(qryId, 
req.pageSize(), req.maxRows(), qryCur);
+                        jdbcCursors.put(cur.cursorId(), cur);
 
-                        qryCursors.put(qryId, cur);
+                        jdbcRes = new JdbcResultInfo(true, -1, cur.cursorId());
 
-                        qryId = QRY_ID_GEN.getAndIncrement();
+                        cur.openIterator();
 
                         if (items == null) {
                             items = cur.fetchRows();
@@ -575,11 +646,20 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
             }
         }
         catch (Exception e) {
-            qryCursors.remove(qryId);
+            // Trying to close all cursors of current request.
+            clearCursors(req.requestId());
+
+            unregisterReq = true;
 
             U.error(log, "Failed to execute SQL query [reqId=" + 
req.requestId() + ", req=" + req + ']', e);
 
-            return exceptionToResult(e);
+            if (X.cause(e, QueryCancelledException.class) != null)
+                return exceptionToResult(new QueryCancelledException());
+            else
+                return exceptionToResult(e);
+        }
+        finally {
+            cleanupQueryCancellationMeta(unregisterReq, req.requestId());
         }
     }
 
@@ -590,23 +670,59 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
      * @return Response.
      */
     private JdbcResponse closeQuery(JdbcQueryCloseRequest req) {
+        JdbcCursor cur = jdbcCursors.get(req.cursorId());
+
+        if (!prepareQueryCancellationMeta(cur))
+            return new JdbcResponse(null);
+
         try {
-            JdbcQueryCursor cur = qryCursors.remove(req.queryId());
+            cur = jdbcCursors.remove(req.cursorId());
 
             if (cur == null)
                 return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN,
-                    "Failed to find query cursor with ID: " + req.queryId());
+                    "Failed to find query cursor with ID: " + req.cursorId());
 
             cur.close();
 
             return new JdbcResponse(null);
         }
         catch (Exception e) {
-            qryCursors.remove(req.queryId());
+            jdbcCursors.remove(req.cursorId());
 
-            U.error(log, "Failed to close SQL query [reqId=" + req.requestId() 
+ ", req=" + req.queryId() + ']', e);
+            U.error(log, "Failed to close SQL query [reqId=" + req.requestId() 
+ ", req=" + req + ']', e);
 
-            return exceptionToResult(e);
+            if (X.cause(e, QueryCancelledException.class) != null)
+                return new JdbcResponse(null);
+            else
+                return exceptionToResult(e);
+        }
+        finally {
+            if (isCancellationSupported()) {
+                boolean clearCursors = false;
+
+                synchronized (reqMux) {
+                    assert cur != null;
+
+                    JdbcQueryDescriptor desc = 
reqRegister.get(cur.requestId());
+
+                    if (desc != null) {
+                        // Query was cancelled during execution.
+                        if (desc.isCanceled()) {
+                            clearCursors = true;
+
+                            unregisterRequest(req.requestId());
+                        }
+                        else {
+                            tryUnregisterRequest(cur.requestId());
+
+                            desc.decrementUsageCount();
+                        }
+                    }
+                }
+
+                if (clearCursors)
+                    clearCursors(cur.requestId());
+            }
         }
     }
 
@@ -617,12 +733,17 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
      * @return Response.
      */
     private JdbcResponse fetchQuery(JdbcQueryFetchRequest req) {
-        try {
-            JdbcQueryCursor cur = qryCursors.get(req.queryId());
+        final JdbcQueryCursor cur = 
(JdbcQueryCursor)jdbcCursors.get(req.cursorId());
+
+        if (!prepareQueryCancellationMeta(cur))
+            return JDBC_QUERY_CANCELLED_RESPONSE;
 
+        boolean unregisterReq = false;
+
+        try {
             if (cur == null)
                 return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN,
-                    "Failed to find query cursor with ID: " + req.queryId());
+                    "Failed to find query cursor with ID: " + req.cursorId());
 
             if (req.pageSize() <= 0)
                 return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN,
@@ -633,7 +754,9 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
             JdbcQueryFetchResult res = new 
JdbcQueryFetchResult(cur.fetchRows(), !cur.hasNext());
 
             if (res.last() && (!cur.isQuery() || autoCloseCursors)) {
-                qryCursors.remove(req.queryId());
+                jdbcCursors.remove(req.cursorId());
+
+                unregisterReq = true;
 
                 cur.close();
             }
@@ -643,7 +766,15 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
         catch (Exception e) {
             U.error(log, "Failed to fetch SQL query result [reqId=" + 
req.requestId() + ", req=" + req + ']', e);
 
-            return exceptionToResult(e);
+            if (X.cause(e, QueryCancelledException.class) != null)
+                return exceptionToResult(new QueryCancelledException());
+            else
+                return exceptionToResult(e);
+        }
+        finally {
+            assert cur != null;
+
+            cleanupQueryCancellationMeta(unregisterReq, cur.requestId());
         }
     }
 
@@ -652,14 +783,17 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
      * @return Response.
      */
     private JdbcResponse getQueryMeta(JdbcQueryMetadataRequest req) {
-        try {
-            JdbcQueryCursor cur = qryCursors.get(req.queryId());
+        final JdbcQueryCursor cur = 
(JdbcQueryCursor)jdbcCursors.get(req.cursorId());
+
+        if (!prepareQueryCancellationMeta(cur))
+            return JDBC_QUERY_CANCELLED_RESPONSE;
 
+        try {
             if (cur == null)
                 return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN,
-                    "Failed to find query with ID: " + req.queryId());
+                    "Failed to find query cursor with ID: " + req.cursorId());
 
-            JdbcQueryMetadataResult res = new 
JdbcQueryMetadataResult(req.queryId(),
+            JdbcQueryMetadataResult res = new 
JdbcQueryMetadataResult(req.cursorId(),
                 cur.meta());
 
             return new JdbcResponse(res);
@@ -669,6 +803,11 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
 
             return exceptionToResult(e);
         }
+        finally {
+            assert cur != null;
+
+            cleanupQueryCancellationMeta(false, cur.requestId());
+        }
     }
 
     /**
@@ -676,55 +815,80 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
      * @return Response.
      */
     private ClientListenerResponse executeBatch(JdbcBatchExecuteRequest req) {
-        String schemaName = req.schemaName();
+        GridQueryCancel cancel = null;
 
-        if (F.isEmpty(schemaName))
-            schemaName = QueryUtils.DFLT_SCHEMA;
+        if (isCancellationSupported()) {
+            synchronized (reqMux) {
+                JdbcQueryDescriptor desc = reqRegister.get(req.requestId());
 
-        int qryCnt = req.queries().size();
+                // Query was already cancelled and unregistered.
+                if (desc == null)
+                    return null;
 
-        List<Integer> updCntsAcc = new ArrayList<>(qryCnt);
+                cancel = desc.cancelHook();
 
-        // Send back only the first error. Others will be written to the log.
-        IgniteBiTuple<Integer, String> firstErr = new IgniteBiTuple<>();
+                desc.incrementUsageCount();
+            }
+        }
 
-        SqlFieldsQueryEx qry = null;
+        try {
+            String schemaName = req.schemaName();
 
-        for (JdbcQuery q : req.queries()) {
-            if (q.sql() != null) { // If we have a new query string in the 
batch,
-                if (qry != null) // then execute the previous sub-batch and 
create a new SqlFieldsQueryEx.
-                    executeBatchedQuery(qry, updCntsAcc, firstErr);
+            if (F.isEmpty(schemaName))
+                schemaName = QueryUtils.DFLT_SCHEMA;
 
-                qry = new SqlFieldsQueryEx(q.sql(), false);
+            int qryCnt = req.queries().size();
 
-                qry.setDistributedJoins(cliCtx.isDistributedJoins());
-                qry.setEnforceJoinOrder(cliCtx.isEnforceJoinOrder());
-                qry.setCollocated(cliCtx.isCollocated());
-                qry.setReplicatedOnly(cliCtx.isReplicatedOnly());
-                qry.setLazy(cliCtx.isLazy());
-                qry.setNestedTxMode(nestedTxMode);
-                qry.setAutoCommit(req.autoCommit());
+            List<Integer> updCntsAcc = new ArrayList<>(qryCnt);
 
-                qry.setSchema(schemaName);
-            }
+            // Send back only the first error. Others will be written to the 
log.
+            IgniteBiTuple<Integer, String> firstErr = new IgniteBiTuple<>();
 
-            assert qry != null;
+            SqlFieldsQueryEx qry = null;
 
-            qry.addBatchedArgs(q.args());
-        }
+            for (JdbcQuery q : req.queries()) {
+                if (q.sql() != null) { // If we have a new query string in the 
batch,
+                    if (qry != null) // then execute the previous sub-batch 
and create a new SqlFieldsQueryEx.
+                        executeBatchedQuery(qry, updCntsAcc, firstErr, cancel);
 
-        if (qry != null)
-            executeBatchedQuery(qry, updCntsAcc, firstErr);
+                    qry = new SqlFieldsQueryEx(q.sql(), false);
 
-        if (req.isLastStreamBatch())
-            cliCtx.disableStreaming();
+                    qry.setDistributedJoins(cliCtx.isDistributedJoins());
+                    qry.setEnforceJoinOrder(cliCtx.isEnforceJoinOrder());
+                    qry.setCollocated(cliCtx.isCollocated());
+                    qry.setReplicatedOnly(cliCtx.isReplicatedOnly());
+                    qry.setLazy(cliCtx.isLazy());
+                    qry.setNestedTxMode(nestedTxMode);
+                    qry.setAutoCommit(req.autoCommit());
 
-        int updCnts[] = U.toIntArray(updCntsAcc);
+                    qry.setSchema(schemaName);
+                }
 
-        if (firstErr.isEmpty())
-            return new JdbcResponse(new JdbcBatchExecuteResult(updCnts, 
ClientListenerResponse.STATUS_SUCCESS, null));
-        else
-            return new JdbcResponse(new JdbcBatchExecuteResult(updCnts, 
firstErr.getKey(), firstErr.getValue()));
+                assert qry != null;
+
+                qry.addBatchedArgs(q.args());
+            }
+
+            if (qry != null)
+                executeBatchedQuery(qry, updCntsAcc, firstErr, cancel);
+
+            if (req.isLastStreamBatch())
+                cliCtx.disableStreaming();
+
+            int updCnts[] = U.toIntArray(updCntsAcc);
+
+            if (firstErr.isEmpty())
+                return new JdbcResponse(new JdbcBatchExecuteResult(updCnts, 
ClientListenerResponse.STATUS_SUCCESS,
+                    null));
+            else
+                return new JdbcResponse(new JdbcBatchExecuteResult(updCnts, 
firstErr.getKey(), firstErr.getValue()));
+        }
+        catch (QueryCancelledException e) {
+            return exceptionToResult(e);
+        }
+        finally {
+            cleanupQueryCancellationMeta(true, req.requestId());
+        }
     }
 
     /**
@@ -733,10 +897,12 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
      * @param qry Query.
      * @param updCntsAcc Per query rows updates counter.
      * @param firstErr First error data - code and message.
+     * @param cancel Hook for query cancellation.
+     * @throws QueryCancelledException If query was cancelled during execution.
      */
-    @SuppressWarnings("ForLoopReplaceableByForEach")
+    @SuppressWarnings({"ForLoopReplaceableByForEach", "unchecked"})
     private void executeBatchedQuery(SqlFieldsQueryEx qry, List<Integer> 
updCntsAcc,
-        IgniteBiTuple<Integer, String> firstErr) {
+        IgniteBiTuple<Integer, String> firstErr, GridQueryCancel cancel) 
throws QueryCancelledException {
         try {
             if (cliCtx.isStream()) {
                 List<Long> cnt = 
ctx.query().streamBatchedUpdateQuery(qry.getSchema(), cliCtx, qry.getSql(),
@@ -748,7 +914,7 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
                 return;
             }
 
-            List<FieldsQueryCursor<List<?>>> qryRes = 
ctx.query().querySqlFields(null, qry, cliCtx, true, true);
+            List<FieldsQueryCursor<List<?>>> qryRes = 
ctx.query().querySqlFields(null, qry, cliCtx, true, true, cancel);
 
             for (FieldsQueryCursor<List<?>> cur : qryRes) {
                 if (cur instanceof BulkLoadContextCursor)
@@ -770,7 +936,9 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
 
             String msg;
 
-            if (e instanceof IgniteSQLException) {
+            if (X.cause(e, QueryCancelledException.class) != null)
+                throw new QueryCancelledException();
+            else if (e instanceof IgniteSQLException) {
                 BatchUpdateException batchCause = X.cause(e, 
BatchUpdateException.class);
 
                 if (batchCause != null) {
@@ -804,7 +972,7 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
             if (firstErr.isEmpty())
                 firstErr.set(code, msg);
             else
-                U.error(log, "Failed to execute batch query [qry=" + qry +']', 
e);
+                U.error(log, "Failed to execute batch query [qry=" + qry + 
']', e);
         }
     }
 
@@ -997,7 +1165,6 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
                             fields.add(field);
                     }
 
-
                     final String keyName = table.keyFieldName() == null ?
                         "PK_" + table.schemaName() + "_" + table.tableName() :
                         table.keyFieldName();
@@ -1069,8 +1236,10 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
      * @return resulting {@link JdbcResponse}.
      */
     private JdbcResponse exceptionToResult(Exception e) {
+        if (e instanceof QueryCancelledException)
+            return new JdbcResponse(IgniteQueryErrorCode.QUERY_CANCELED, 
e.getMessage());
         if (e instanceof IgniteSQLException)
-            return new JdbcResponse(((IgniteSQLException) e).statusCode(), 
e.getMessage());
+            return new JdbcResponse(((IgniteSQLException)e).statusCode(), 
e.getMessage());
         else
             return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, 
e.getMessage());
     }
@@ -1112,4 +1281,163 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
             }
         }
     }
+
+    /**
+     * Cancels the query registered under the request id carried by the given cancel request.
+     *
+     * @param req Query cancellation request.
+     * @return <code>QueryCancelledException</code> wrapped with <code>JdbcResponse</code> if the target
+     *      query was registered but its execution had not started yet, {@code null} otherwise.
+     */
+    private JdbcResponse cancelQuery(JdbcQueryCancelRequest req) {
+        boolean clearCursors = false;
+
+        GridQueryCancel cancelHook;
+
+        synchronized (reqMux) {
+            JdbcQueryDescriptor desc = 
reqRegister.get(req.requestIdToBeCancelled());
+
+            // No descriptor is registered under the target id (query was already executed): nothing to cancel.
+            if (desc == null)
+                return null;
+
+            // Query was registered, however execution didn't start yet.
+            else if (!desc.isExecutionStarted()) {
+                // NOTE(review): this unregisters the id of the cancel request itself, while the descriptor
+                // above was looked up by requestIdToBeCancelled() - confirm this is intended.
+                unregisterRequest(req.requestId());
+
+                return exceptionToResult(new QueryCancelledException());
+            }
+            else {
+                // Execution has started: remember the hook and flag the descriptor as cancelled.
+                cancelHook = desc.cancelHook();
+
+                desc.markCancelled();
+
+                // Usage count is zero: unregister the request here and close its cursors below.
+                if (desc.usageCount() == 0) {
+                    clearCursors = true;
+
+                    unregisterRequest(req.requestIdToBeCancelled());
+                }
+            }
+        }
+
+        // The hook is invoked after the reqMux monitor has been released.
+        cancelHook.cancel();
+
+        if (clearCursors)
+            clearCursors(req.requestIdToBeCancelled());
+
+        return null;
+    }
+
+    /**
+     * Checks whether query cancellation is supported within the current version of the protocol,
+     * i.e. whether the protocol version is not lower than VER_2_8_0.
+     *
+     * @return True if supported, false otherwise.
+     */
+    private boolean isCancellationSupported() {
+        return (protocolVer.compareTo(VER_2_8_0) >= 0);
+    }
+
+    /**
+     * Unregisters the request if there are no cursors bound to it.
+     *
+     * @param reqId Id of the request to unregister.
+     */
+    private void tryUnregisterRequest(long reqId) {
+        assert isCancellationSupported();
+
+        boolean unregisterReq = true;
+
+        // A single cursor that still references the request is enough to keep it registered.
+        for (JdbcCursor cursor : jdbcCursors.values()) {
+            if (cursor.requestId() == reqId) {
+                unregisterReq = false;
+
+                break;
+            }
+        }
+
+        if (unregisterReq)
+            unregisterRequest(reqId);
+    }
+
+    /**
+     * Tries to close all cursors of the request with the given id and removes them from the
+     * jdbcCursors map. A failure to close a cursor is logged; the cursor is removed from the map anyway.
+     *
+     * @param reqId Request ID.
+     */
+    private void clearCursors(long reqId) {
+        for (Iterator<Map.Entry<Long, JdbcCursor>> it = 
jdbcCursors.entrySet().iterator(); it.hasNext(); ) {
+            Map.Entry<Long, JdbcCursor> entry = it.next();
+
+            JdbcCursor cursor = entry.getValue();
+
+            if (cursor.requestId() == reqId) {
+                try {
+                    cursor.close();
+                }
+                catch (Exception e) {
+                    U.error(log, "Failed to close cursor [reqId=" + reqId + ", 
cursor=" + cursor + ']', e);
+                }
+
+                // Removal goes through the iterator and happens whether or not close() succeeded.
+                it.remove();
+            }
+        }
+    }
+
+    /**
+     * Checks whether the query the cursor belongs to was cancelled - returns false if so, otherwise
+     * increments the usage count of its query descriptor. When cancellation is not supported by the
+     * protocol version, no check is made and true is returned.
+     *
+     * @param cur Jdbc Cursor.
+     * @return False, if the cursor is {@code null} or no descriptor is registered for its request
+     *      (cancellation supported only); true otherwise.
+     */
+    private boolean prepareQueryCancellationMeta(JdbcCursor cur) {
+        if (isCancellationSupported()) {
+            // Nothing to do - cursor was already removed.
+            if (cur == null)
+                return false;
+
+            synchronized (reqMux) {
+                JdbcQueryDescriptor desc = reqRegister.get(cur.requestId());
+
+                // Query was already cancelled and unregistered.
+                if (desc == null)
+                    return false;
+
+                desc.incrementUsageCount();
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Cleans up cancellation bookkeeping for the request: decrements the descriptor usage count or,
+     * if the query was cancelled, unregisters the request and closes its cursors. Does nothing when
+     * cancellation is not supported or no descriptor is registered for the request.
+     *
+     * @param unregisterReq Flag that determines whether it's necessary to unregister the request.
+     * @param reqId Request Id.
+     */
+    private void cleanupQueryCancellationMeta(boolean unregisterReq, long 
reqId) {
+        if (isCancellationSupported()) {
+            boolean clearCursors = false;
+
+            synchronized (reqMux) {
+                JdbcQueryDescriptor desc = reqRegister.get(reqId);
+
+                if (desc != null) {
+                    // Query was cancelled during execution: force unregistration and cursor cleanup.
+                    if (desc.isCanceled()) {
+                        clearCursors = true;
+
+                        unregisterReq = true;
+                    }
+                    else
+                        desc.decrementUsageCount();
+
+                    if (unregisterReq)
+                        unregisterRequest(reqId);
+                }
+            }
+
+            // Cursors are closed after the reqMux monitor has been released.
+            if (clearCursors)
+                clearCursors(reqId);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResultInfo.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResultInfo.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResultInfo.java
index 5fab77a..ceb7964 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResultInfo.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResultInfo.java
@@ -33,8 +33,8 @@ public class JdbcResultInfo implements JdbcRawBinarylizable {
     /** Update count. */
     private long updCnt;
 
-    /** Query ID. */
-    private long qryId;
+    /** Cursor ID. */
+    private long cursorId;
 
     /**
      * Default constructor is used for serialization.
@@ -46,12 +46,12 @@ public class JdbcResultInfo implements JdbcRawBinarylizable 
{
     /**
      * @param isQuery Query flag.
      * @param updCnt Update count.
-     * @param qryId  Query ID.
+     * @param cursorId  Cursor ID.
      */
-    public JdbcResultInfo(boolean isQuery, long updCnt, long qryId) {
+    public JdbcResultInfo(boolean isQuery, long updCnt, long cursorId) {
         this.isQuery = isQuery;
         this.updCnt = updCnt;
-        this.qryId = qryId;
+        this.cursorId = cursorId;
     }
 
     /**
@@ -62,10 +62,10 @@ public class JdbcResultInfo implements JdbcRawBinarylizable 
{
     }
 
     /**
-     * @return Query ID.
+     * @return Cursor ID.
      */
-    public long queryId() {
-        return qryId;
+    public long cursorId() {
+        return cursorId;
     }
 
     /**
@@ -80,7 +80,7 @@ public class JdbcResultInfo implements JdbcRawBinarylizable {
         ClientListenerProtocolVersion ver) {
         writer.writeBoolean(isQuery);
         writer.writeLong(updCnt);
-        writer.writeLong(qryId);
+        writer.writeLong(cursorId);
     }
 
     /** {@inheritDoc} */
@@ -88,7 +88,7 @@ public class JdbcResultInfo implements JdbcRawBinarylizable {
         ClientListenerProtocolVersion ver) {
         isQuery = reader.readBoolean();
         updCnt = reader.readLong();
-        qryId = reader.readLong();
+        cursorId = reader.readLong();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
index cf11270..5c63a4a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.odbc.odbc;
 
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridKernalContext;
@@ -26,15 +28,12 @@ import 
org.apache.ignite.internal.processors.odbc.ClientListenerAbstractConnecti
 import org.apache.ignite.internal.processors.odbc.ClientListenerMessageParser;
 import 
org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
 import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler;
-import org.apache.ignite.internal.processors.query.NestedTxMode;
 import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
 import org.apache.ignite.internal.processors.odbc.ClientListenerResponseSender;
+import org.apache.ignite.internal.processors.query.NestedTxMode;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.nio.GridNioSession;
 
-import java.util.HashSet;
-import java.util.Set;
-
 /**
  * ODBC Connection Context.
  */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java
index 0e9b48a..0e66c93 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java
@@ -409,6 +409,19 @@ public class OdbcMessageParser implements 
ClientListenerMessageParser {
         return writer.array();
     }
 
+    /** {@inheritDoc} */
+    @Override public int decodeCommandType(byte[] msg) {
+        assert msg != null;
+
+        // The first byte of the message is returned as the command type.
+        return msg[0];
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public long decodeRequestId(byte[] msg) {
+        // The message is not inspected: this parser always reports 0 as the request id.
+        return 0;
+    }
+
     /**
      * @param writer Writer to use.
      * @param affectedRows Affected rows.

http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
index 10cfd6d..edb8a31 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
@@ -300,6 +300,21 @@ public class OdbcRequestHandler implements 
ClientListenerRequestHandler {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean isCancellationCommand(int cmdId) {
+        // This handler treats no command as a cancellation command.
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void registerRequest(long reqId, int cmdType) {
+        // No-op: both arguments are ignored by this handler.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void unregisterRequest(long reqId) {
+        // No-op: the request id is ignored by this handler.
+    }
+
     /**
      * Make query considering handler configuration.
      * @param schema Schema.

http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
index c65e64a..0e81c20 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
@@ -388,4 +388,18 @@ public class ClientMessageParser implements 
ClientListenerMessageParser {
 
         return outStream.arrayCopy();
     }
+
+    /** {@inheritDoc} */
+    @Override public int decodeCommandType(byte[] msg) {
+        assert msg != null;
+
+        BinaryInputStream inStream = new BinaryHeapInputStream(msg);
+
+        // The command type is the first short read from a fresh stream over the message.
+        return inStream.readShort();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long decodeRequestId(byte[] msg) {
+        // The message is not inspected: this parser always reports 0 as the request id.
+        return 0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequestHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequestHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequestHandler.java
index 8fe4e5d..1b59c48 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequestHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequestHandler.java
@@ -75,4 +75,20 @@ public class ClientRequestHandler implements 
ClientListenerRequestHandler {
     @Override public void writeHandshake(BinaryWriterExImpl writer) {
         writer.writeBoolean(true);
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean isCancellationCommand(int cmdId) {
+        // This handler treats no command as a cancellation command.
+        return false;
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public void registerRequest(long reqId, int cmdType) {
+        // No-op: both arguments are ignored by this handler.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void unregisterRequest(long reqId) {
+        // No-op: the request id is ignored by this handler.
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index c1edbbb..94ad17d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -2127,7 +2127,39 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
             cliCtx,
             keepBinary,
             failOnMultipleStmts,
-            GridCacheQueryType.SQL_FIELDS
+            GridCacheQueryType.SQL_FIELDS,
+            null
+        );
+    }
+
+    /**
+     * Query SQL fields. Delegates to the overload that also takes the real query type,
+     * passing {@code GridCacheQueryType.SQL_FIELDS} and the given cancellation hook.
+     *
+     * @param cctx Cache context, may be {@code null}.
+     * @param qry Query.
+     * @param cliCtx Client context.
+     * @param keepBinary Keep binary flag.
+     * @param failOnMultipleStmts If {@code true} the method must throw an exception when the query
+     *      contains more than one SQL statement.
+     * @param cancel Hook for query cancellation, may be {@code null}.
+     * @return Cursor.
+     */
+    public List<FieldsQueryCursor<List<?>>> querySqlFields(
+        @Nullable final GridCacheContext<?, ?> cctx,
+        final SqlFieldsQuery qry,
+        final SqlClientContext cliCtx,
+        final boolean keepBinary,
+        final boolean failOnMultipleStmts,
+        @Nullable final GridQueryCancel cancel
+    ) {
+        return querySqlFields(
+            cctx,
+            qry,
+            cliCtx,
+            keepBinary,
+            failOnMultipleStmts,
+            GridCacheQueryType.SQL_FIELDS,
+            cancel
         );
     }
 
@@ -2141,6 +2173,7 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
      * @param failOnMultipleStmts If {@code true} the method must throws 
exception when query contains
      *      more then one SQL statement.
      * @param qryType Real query type.
+     * @param cancel Hook for query cancellation.
      * @return Cursor.
      */
     public List<FieldsQueryCursor<List<?>>> querySqlFields(
@@ -2149,7 +2182,8 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
         final SqlClientContext cliCtx,
         final boolean keepBinary,
         final boolean failOnMultipleStmts,
-        GridCacheQueryType qryType
+        GridCacheQueryType qryType,
+        @Nullable final GridQueryCancel cancel
     ) {
         // Validate.
         checkxEnabled();
@@ -2167,10 +2201,19 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
             IgniteOutClosureX<List<FieldsQueryCursor<List<?>>>> clo =
                 new IgniteOutClosureX<List<FieldsQueryCursor<List<?>>>>() {
                     @Override public List<FieldsQueryCursor<List<?>>> applyx() 
{
-                        GridQueryCancel cancel = new GridQueryCancel();
-
-                        List<FieldsQueryCursor<List<?>>> res = 
idx.querySqlFields(schemaName, qry, cliCtx,
-                            keepBinary, failOnMultipleStmts, null, cancel, 
true);
+                        GridQueryCancel cancel0 = cancel != null ? cancel : 
new GridQueryCancel();
+
+                        List<FieldsQueryCursor<List<?>>> res =
+                            idx.querySqlFields(
+                                schemaName,
+                                qry,
+                                cliCtx,
+                                keepBinary,
+                                failOnMultipleStmts,
+                                null,
+                                cancel0,
+                                true
+                            );
 
                         if (cctx != null)
                             sendQueryExecutedEvent(qry.getSql(), 
qry.getArgs(), cctx, qryType);
@@ -2298,7 +2341,8 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
             null,
             keepBinary,
             true,
-            GridCacheQueryType.SQL
+            GridCacheQueryType.SQL,
+            null
         ).get(0);
 
         // Convert.

Reply via email to