Repository: ignite Updated Branches: refs/heads/master a31c0a2e3 -> 7ceecc43f
http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java index 90ad604..b48c827 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java @@ -23,6 +23,7 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashSet; @@ -31,12 +32,12 @@ import java.util.Map; import java.util.PriorityQueue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; import javax.cache.configuration.Factory; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.query.BulkLoadContextCursor; import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteVersionUtils; @@ -54,6 +55,7 @@ import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler; import org.apache.ignite.internal.processors.odbc.ClientListenerResponse; import org.apache.ignite.internal.processors.odbc.ClientListenerResponseSender; import org.apache.ignite.internal.processors.odbc.odbc.OdbcQueryGetColumnsMetaRequest; +import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor; import org.apache.ignite.internal.processors.query.GridQueryProperty; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; @@ -76,6 +78,7 @@ import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchR import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_3_0; import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_4_0; import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_7_0; +import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_8_0; import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BATCH_EXEC; import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BATCH_EXEC_ORDERED; import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BULK_LOAD_BATCH; @@ -85,6 +88,7 @@ import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_P import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_PRIMARY_KEYS; import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_SCHEMAS; import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_TABLES; +import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_CANCEL; import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_CLOSE; import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_EXEC; import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_FETCH; @@ -94,8 +98,9 @@ import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_ME * JDBC request handler. */ public class JdbcRequestHandler implements ClientListenerRequestHandler { - /** Query ID sequence. */ - private static final AtomicLong QRY_ID_GEN = new AtomicLong(); + /** Jdbc query cancelled response. */ + private static final JdbcResponse JDBC_QUERY_CANCELLED_RESPONSE = + new JdbcResponse(IgniteQueryErrorCode.QUERY_CANCELED, QueryCancelledException.ERR_MSG); /** Kernel context. */ private final GridKernalContext ctx; @@ -115,11 +120,8 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { /** Maximum allowed cursors. */ private final int maxCursors; - /** Current queries cursors. */ - private final ConcurrentHashMap<Long, JdbcQueryCursor> qryCursors = new ConcurrentHashMap<>(); - - /** Current bulk load processors. */ - private final ConcurrentHashMap<Long, JdbcBulkLoadProcessor> bulkLoadRequests = new ConcurrentHashMap<>(); + /** Current JDBC cursors. */ + private final ConcurrentHashMap<Long, JdbcCursor> jdbcCursors = new ConcurrentHashMap<>(); /** Ordered batches queue. */ private final PriorityQueue<JdbcOrderedBatchExecuteRequest> orderedBatchesQueue = new PriorityQueue<>(); @@ -127,6 +129,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { /** Ordered batches mutex. */ private final Object orderedBatchesMux = new Object(); + /** Request mutex. */ + private final Object reqMux = new Object(); + /** Response sender. */ private final ClientListenerResponseSender sender; @@ -137,11 +142,14 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { private final NestedTxMode nestedTxMode; /** Protocol version. */ - private ClientListenerProtocolVersion protocolVer; + private final ClientListenerProtocolVersion protocolVer; /** Authentication context */ private AuthorizationContext actx; + /** Register that keeps non-cancelled requests. */ + private Map<Long, JdbcQueryDescriptor> reqRegister = new HashMap<>(); + /** * Constructor. * @param ctx Context. @@ -218,6 +226,32 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { } } + /** {@inheritDoc} */ + @Override public boolean isCancellationCommand(int cmdId) { + return cmdId == JdbcRequest.QRY_CANCEL; + } + + /** {@inheritDoc} */ + @Override public void registerRequest(long reqId, int cmdType) { + assert reqId != 0; + + synchronized (reqMux) { + if (isCancellationSupported() && (cmdType == QRY_EXEC || cmdType == BATCH_EXEC || + cmdType == BATCH_EXEC_ORDERED)) + reqRegister.put(reqId, new JdbcQueryDescriptor()); + } + } + + /** {@inheritDoc} */ + @Override public void unregisterRequest(long reqId) { + assert reqId != 0; + + synchronized (reqMux) { + if (isCancellationSupported()) + reqRegister.remove(reqId); + } + } + /** * Start worker, if it's present. */ @@ -279,6 +313,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { case BULK_LOAD_BATCH: return processBulkLoadFileBatch((JdbcBulkLoadBatchRequest)req); + + case QRY_CANCEL: + return cancelQuery((JdbcQueryCancelRequest)req); } return new JdbcResponse(IgniteQueryErrorCode.UNSUPPORTED_OPERATION, @@ -347,11 +384,16 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { * @return Response to send to the client. */ private ClientListenerResponse processBulkLoadFileBatch(JdbcBulkLoadBatchRequest req) { - JdbcBulkLoadProcessor processor = bulkLoadRequests.get(req.queryId()); - if (ctx == null) return new JdbcResponse(IgniteQueryErrorCode.UNEXPECTED_OPERATION, "Unknown query ID: " - + req.queryId() + ". Bulk load session may have been reclaimed due to timeout."); + + req.cursorId() + ". Bulk load session may have been reclaimed due to timeout."); + + JdbcBulkLoadProcessor processor = (JdbcBulkLoadProcessor)jdbcCursors.get(req.cursorId()); + + if (!prepareQueryCancellationMeta(processor)) + return JDBC_QUERY_CANCELLED_RESPONSE; + + boolean unregisterReq = false; try { processor.processBatch(req); @@ -359,10 +401,12 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { switch (req.cmd()) { case CMD_FINISHED_ERROR: case CMD_FINISHED_EOF: - bulkLoadRequests.remove(req.queryId()); + jdbcCursors.remove(req.cursorId()); processor.close(); + unregisterReq = true; + break; case CMD_CONTINUE: @@ -371,12 +415,18 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { default: throw new IllegalArgumentException(); } - - return new JdbcResponse(new JdbcQueryExecuteResult(req.queryId(), processor.updateCnt())); + return new JdbcResponse(new JdbcQueryExecuteResult(req.cursorId(), processor.updateCnt())); } catch (Exception e) { U.error(null, "Error processing file batch", e); - return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Server error: " + e); + + if (X.cause(e, QueryCancelledException.class) != null) + return exceptionToResult(new QueryCancelledException()); + else + return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Server error: " + e); + } + finally { + cleanupQueryCancellationMeta(unregisterReq, processor.requestId()); } } @@ -415,19 +465,14 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { } } - for (JdbcQueryCursor cursor : qryCursors.values()) - cursor.close(); + for (JdbcCursor cursor : jdbcCursors.values()) + U.close(cursor, log); - for (JdbcBulkLoadProcessor processor : bulkLoadRequests.values()) { - try { - processor.close(); - } - catch (Exception e) { - U.error(null, "Error closing JDBC bulk load processor.", e); - } - } + jdbcCursors.clear(); - bulkLoadRequests.clear(); + synchronized (reqMux) { + reqRegister.clear(); + } U.close(cliCtx, log); } @@ -440,24 +485,40 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { */ @SuppressWarnings("unchecked") private JdbcResponse executeQuery(JdbcQueryExecuteRequest req) { - int cursorCnt = qryCursors.size(); + GridQueryCancel cancel = null; + + boolean unregisterReq = false; - if (maxCursors > 0 && cursorCnt >= maxCursors) - return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Too many open cursors (either close other " + - "open cursors or increase the limit through " + - "ClientConnectorConfiguration.maxOpenCursorsPerConnection) [maximum=" + maxCursors + - ", current=" + cursorCnt + ']'); + if (isCancellationSupported()) { + synchronized (reqMux) { + JdbcQueryDescriptor desc = reqRegister.get(req.requestId()); - long qryId = QRY_ID_GEN.getAndIncrement(); + // Query was already cancelled and unregistered. + if (desc == null) + return null; - assert !cliCtx.isStream(); + cancel = desc.cancelHook(); + + desc.incrementUsageCount(); + } + } try { + int cursorCnt = jdbcCursors.size(); + + if (maxCursors > 0 && cursorCnt >= maxCursors) + return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Too many open cursors (either close other " + + "open cursors or increase the limit through " + + "ClientConnectorConfiguration.maxOpenCursorsPerConnection) [maximum=" + maxCursors + + ", current=" + cursorCnt + ']'); + + assert !cliCtx.isStream(); + String sql = req.sqlQuery(); SqlFieldsQueryEx qry; - switch(req.expectedStatementType()) { + switch (req.expectedStatementType()) { case ANY_STATEMENT_TYPE: qry = new SqlFieldsQueryEx(sql, null); @@ -500,29 +561,36 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { qry.setSchema(schemaName); List<FieldsQueryCursor<List<?>>> results = ctx.query().querySqlFields(null, qry, cliCtx, true, - protocolVer.compareTo(VER_2_3_0) < 0); + protocolVer.compareTo(VER_2_3_0) < 0, cancel); FieldsQueryCursor<List<?>> fieldsCur = results.get(0); if (fieldsCur instanceof BulkLoadContextCursor) { - BulkLoadContextCursor blCur = (BulkLoadContextCursor) fieldsCur; + BulkLoadContextCursor blCur = (BulkLoadContextCursor)fieldsCur; BulkLoadProcessor blProcessor = blCur.bulkLoadProcessor(); BulkLoadAckClientParameters clientParams = blCur.clientParams(); - bulkLoadRequests.put(qryId, new JdbcBulkLoadProcessor(blProcessor)); + JdbcBulkLoadProcessor processor = new JdbcBulkLoadProcessor(blProcessor, req.requestId()); - return new JdbcResponse(new JdbcBulkLoadAckResult(qryId, clientParams)); + jdbcCursors.put(processor.cursorId(), processor); + + // responses for the same query on the client side + return new JdbcResponse(new JdbcBulkLoadAckResult(processor.cursorId(), clientParams)); } if (results.size() == 1) { - JdbcQueryCursor cur = new JdbcQueryCursor(qryId, req.pageSize(), req.maxRows(), - (QueryCursorImpl)fieldsCur); + JdbcQueryCursor cur = new JdbcQueryCursor(req.pageSize(), req.maxRows(), + (QueryCursorImpl)fieldsCur, req.requestId()); + + jdbcCursors.put(cur.cursorId(), cur); + + cur.openIterator(); JdbcQueryExecuteResult res; if (cur.isQuery()) - res = new JdbcQueryExecuteResult(qryId, cur.fetchRows(), !cur.hasNext()); + res = new JdbcQueryExecuteResult(cur.cursorId(), cur.fetchRows(), !cur.hasNext()); else { List<List<Object>> items = cur.fetchRows(); @@ -531,13 +599,16 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { "Invalid result set for not-SELECT query. [qry=" + sql + ", res=" + S.toString(List.class, items) + ']'; - res = new JdbcQueryExecuteResult(qryId, (Long)items.get(0).get(0)); + res = new JdbcQueryExecuteResult(cur.cursorId(), (Long)items.get(0).get(0)); } - if (res.last() && (!res.isQuery() || autoCloseCursors)) + if (res.last() && (!res.isQuery() || autoCloseCursors)) { + jdbcCursors.remove(cur.cursorId()); + + unregisterReq = true; + cur.close(); - else - qryCursors.put(qryId, cur); + } return new JdbcResponse(res); } @@ -552,13 +623,13 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { JdbcResultInfo jdbcRes; if (qryCur.isQuery()) { - jdbcRes = new JdbcResultInfo(true, -1, qryId); + JdbcQueryCursor cur = new JdbcQueryCursor(req.pageSize(), req.maxRows(), qryCur, req.requestId()); - JdbcQueryCursor cur = new JdbcQueryCursor(qryId, req.pageSize(), req.maxRows(), qryCur); + jdbcCursors.put(cur.cursorId(), cur); - qryCursors.put(qryId, cur); + jdbcRes = new JdbcResultInfo(true, -1, cur.cursorId()); - qryId = QRY_ID_GEN.getAndIncrement(); + cur.openIterator(); if (items == null) { items = cur.fetchRows(); @@ -575,11 +646,20 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { } } catch (Exception e) { - qryCursors.remove(qryId); + // Trying to close all cursors of current request. + clearCursors(req.requestId()); + + unregisterReq = true; U.error(log, "Failed to execute SQL query [reqId=" + req.requestId() + ", req=" + req + ']', e); - return exceptionToResult(e); + if (X.cause(e, QueryCancelledException.class) != null) + return exceptionToResult(new QueryCancelledException()); + else + return exceptionToResult(e); + } + finally { + cleanupQueryCancellationMeta(unregisterReq, req.requestId()); } } @@ -590,23 +670,59 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { * @return Response. */ private JdbcResponse closeQuery(JdbcQueryCloseRequest req) { + JdbcCursor cur = jdbcCursors.get(req.cursorId()); + + if (!prepareQueryCancellationMeta(cur)) + return new JdbcResponse(null); + try { - JdbcQueryCursor cur = qryCursors.remove(req.queryId()); + cur = jdbcCursors.remove(req.cursorId()); if (cur == null) return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, - "Failed to find query cursor with ID: " + req.queryId()); + "Failed to find query cursor with ID: " + req.cursorId()); cur.close(); return new JdbcResponse(null); } catch (Exception e) { - qryCursors.remove(req.queryId()); + jdbcCursors.remove(req.cursorId()); - U.error(log, "Failed to close SQL query [reqId=" + req.requestId() + ", req=" + req.queryId() + ']', e); + U.error(log, "Failed to close SQL query [reqId=" + req.requestId() + ", req=" + req + ']', e); - return exceptionToResult(e); + if (X.cause(e, QueryCancelledException.class) != null) + return new JdbcResponse(null); + else + return exceptionToResult(e); + } + finally { + if (isCancellationSupported()) { + boolean clearCursors = false; + + synchronized (reqMux) { + assert cur != null; + + JdbcQueryDescriptor desc = reqRegister.get(cur.requestId()); + + if (desc != null) { + // Query was cancelled during execution. + if (desc.isCanceled()) { + clearCursors = true; + + unregisterRequest(req.requestId()); + } + else { + tryUnregisterRequest(cur.requestId()); + + desc.decrementUsageCount(); + } + } + } + + if (clearCursors) + clearCursors(cur.requestId()); + } } } @@ -617,12 +733,17 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { * @return Response. */ private JdbcResponse fetchQuery(JdbcQueryFetchRequest req) { - try { - JdbcQueryCursor cur = qryCursors.get(req.queryId()); + final JdbcQueryCursor cur = (JdbcQueryCursor)jdbcCursors.get(req.cursorId()); + + if (!prepareQueryCancellationMeta(cur)) + return JDBC_QUERY_CANCELLED_RESPONSE; + boolean unregisterReq = false; + + try { if (cur == null) return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, - "Failed to find query cursor with ID: " + req.queryId()); + "Failed to find query cursor with ID: " + req.cursorId()); if (req.pageSize() <= 0) return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, @@ -633,7 +754,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { JdbcQueryFetchResult res = new JdbcQueryFetchResult(cur.fetchRows(), !cur.hasNext()); if (res.last() && (!cur.isQuery() || autoCloseCursors)) { - qryCursors.remove(req.queryId()); + jdbcCursors.remove(req.cursorId()); + + unregisterReq = true; cur.close(); } @@ -643,7 +766,15 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { catch (Exception e) { U.error(log, "Failed to fetch SQL query result [reqId=" + req.requestId() + ", req=" + req + ']', e); - return exceptionToResult(e); + if (X.cause(e, QueryCancelledException.class) != null) + return exceptionToResult(new QueryCancelledException()); + else + return exceptionToResult(e); + } + finally { + assert cur != null; + + cleanupQueryCancellationMeta(unregisterReq, cur.requestId()); } } @@ -652,14 +783,17 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { * @return Response. */ private JdbcResponse getQueryMeta(JdbcQueryMetadataRequest req) { - try { - JdbcQueryCursor cur = qryCursors.get(req.queryId()); + final JdbcQueryCursor cur = (JdbcQueryCursor)jdbcCursors.get(req.cursorId()); + + if (!prepareQueryCancellationMeta(cur)) + return JDBC_QUERY_CANCELLED_RESPONSE; + try { if (cur == null) return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, - "Failed to find query with ID: " + req.queryId()); + "Failed to find query cursor with ID: " + req.cursorId()); - JdbcQueryMetadataResult res = new JdbcQueryMetadataResult(req.queryId(), + JdbcQueryMetadataResult res = new JdbcQueryMetadataResult(req.cursorId(), cur.meta()); return new JdbcResponse(res); @@ -669,6 +803,11 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { return exceptionToResult(e); } + finally { + assert cur != null; + + cleanupQueryCancellationMeta(false, cur.requestId()); + } } /** @@ -676,55 +815,80 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { * @return Response. */ private ClientListenerResponse executeBatch(JdbcBatchExecuteRequest req) { - String schemaName = req.schemaName(); + GridQueryCancel cancel = null; - if (F.isEmpty(schemaName)) - schemaName = QueryUtils.DFLT_SCHEMA; + if (isCancellationSupported()) { + synchronized (reqMux) { + JdbcQueryDescriptor desc = reqRegister.get(req.requestId()); - int qryCnt = req.queries().size(); + // Query was already cancelled and unregisterd. + if (desc == null) + return null; - List<Integer> updCntsAcc = new ArrayList<>(qryCnt); + cancel = desc.cancelHook(); - // Send back only the first error. Others will be written to the log. - IgniteBiTuple<Integer, String> firstErr = new IgniteBiTuple<>(); + desc.incrementUsageCount(); + } + } - SqlFieldsQueryEx qry = null; + try { + String schemaName = req.schemaName(); - for (JdbcQuery q : req.queries()) { - if (q.sql() != null) { // If we have a new query string in the batch, - if (qry != null) // then execute the previous sub-batch and create a new SqlFieldsQueryEx. - executeBatchedQuery(qry, updCntsAcc, firstErr); + if (F.isEmpty(schemaName)) + schemaName = QueryUtils.DFLT_SCHEMA; - qry = new SqlFieldsQueryEx(q.sql(), false); + int qryCnt = req.queries().size(); - qry.setDistributedJoins(cliCtx.isDistributedJoins()); - qry.setEnforceJoinOrder(cliCtx.isEnforceJoinOrder()); - qry.setCollocated(cliCtx.isCollocated()); - qry.setReplicatedOnly(cliCtx.isReplicatedOnly()); - qry.setLazy(cliCtx.isLazy()); - qry.setNestedTxMode(nestedTxMode); - qry.setAutoCommit(req.autoCommit()); + List<Integer> updCntsAcc = new ArrayList<>(qryCnt); - qry.setSchema(schemaName); - } + // Send back only the first error. Others will be written to the log. + IgniteBiTuple<Integer, String> firstErr = new IgniteBiTuple<>(); - assert qry != null; + SqlFieldsQueryEx qry = null; - qry.addBatchedArgs(q.args()); - } + for (JdbcQuery q : req.queries()) { + if (q.sql() != null) { // If we have a new query string in the batch, + if (qry != null) // then execute the previous sub-batch and create a new SqlFieldsQueryEx. + executeBatchedQuery(qry, updCntsAcc, firstErr, cancel); - if (qry != null) - executeBatchedQuery(qry, updCntsAcc, firstErr); + qry = new SqlFieldsQueryEx(q.sql(), false); - if (req.isLastStreamBatch()) - cliCtx.disableStreaming(); + qry.setDistributedJoins(cliCtx.isDistributedJoins()); + qry.setEnforceJoinOrder(cliCtx.isEnforceJoinOrder()); + qry.setCollocated(cliCtx.isCollocated()); + qry.setReplicatedOnly(cliCtx.isReplicatedOnly()); + qry.setLazy(cliCtx.isLazy()); + qry.setNestedTxMode(nestedTxMode); + qry.setAutoCommit(req.autoCommit()); - int updCnts[] = U.toIntArray(updCntsAcc); + qry.setSchema(schemaName); + } - if (firstErr.isEmpty()) - return new JdbcResponse(new JdbcBatchExecuteResult(updCnts, ClientListenerResponse.STATUS_SUCCESS, null)); - else - return new JdbcResponse(new JdbcBatchExecuteResult(updCnts, firstErr.getKey(), firstErr.getValue())); + assert qry != null; + + qry.addBatchedArgs(q.args()); + } + + if (qry != null) + executeBatchedQuery(qry, updCntsAcc, firstErr, cancel); + + if (req.isLastStreamBatch()) + cliCtx.disableStreaming(); + + int updCnts[] = U.toIntArray(updCntsAcc); + + if (firstErr.isEmpty()) + return new JdbcResponse(new JdbcBatchExecuteResult(updCnts, ClientListenerResponse.STATUS_SUCCESS, + null)); + else + return new JdbcResponse(new JdbcBatchExecuteResult(updCnts, firstErr.getKey(), firstErr.getValue())); + } + catch (QueryCancelledException e) { + return exceptionToResult(e); + } + finally { + cleanupQueryCancellationMeta(true, req.requestId()); + } } /** @@ -733,10 +897,12 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { * @param qry Query. * @param updCntsAcc Per query rows updates counter. * @param firstErr First error data - code and message. + * @param cancel Hook for query cancellation. + * @throws QueryCancelledException If query was cancelled during execution. */ - @SuppressWarnings("ForLoopReplaceableByForEach") + @SuppressWarnings({"ForLoopReplaceableByForEach", "unchecked"}) private void executeBatchedQuery(SqlFieldsQueryEx qry, List<Integer> updCntsAcc, - IgniteBiTuple<Integer, String> firstErr) { + IgniteBiTuple<Integer, String> firstErr, GridQueryCancel cancel) throws QueryCancelledException { try { if (cliCtx.isStream()) { List<Long> cnt = ctx.query().streamBatchedUpdateQuery(qry.getSchema(), cliCtx, qry.getSql(), @@ -748,7 +914,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { return; } - List<FieldsQueryCursor<List<?>>> qryRes = ctx.query().querySqlFields(null, qry, cliCtx, true, true); + List<FieldsQueryCursor<List<?>>> qryRes = ctx.query().querySqlFields(null, qry, cliCtx, true, true, cancel); for (FieldsQueryCursor<List<?>> cur : qryRes) { if (cur instanceof BulkLoadContextCursor) @@ -770,7 +936,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { String msg; - if (e instanceof IgniteSQLException) { + if (X.cause(e, QueryCancelledException.class) != null) + throw new QueryCancelledException(); + else if (e instanceof IgniteSQLException) { BatchUpdateException batchCause = X.cause(e, BatchUpdateException.class); if (batchCause != null) { @@ -804,7 +972,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { if (firstErr.isEmpty()) firstErr.set(code, msg); else - U.error(log, "Failed to execute batch query [qry=" + qry +']', e); + U.error(log, "Failed to execute batch query [qry=" + qry + ']', e); } } @@ -997,7 +1165,6 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { fields.add(field); } - final String keyName = table.keyFieldName() == null ? "PK_" + table.schemaName() + "_" + table.tableName() : table.keyFieldName(); @@ -1069,8 +1236,10 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { * @return resulting {@link JdbcResponse}. */ private JdbcResponse exceptionToResult(Exception e) { + if (e instanceof QueryCancelledException) + return new JdbcResponse(IgniteQueryErrorCode.QUERY_CANCELED, e.getMessage()); if (e instanceof IgniteSQLException) - return new JdbcResponse(((IgniteSQLException) e).statusCode(), e.getMessage()); + return new JdbcResponse(((IgniteSQLException)e).statusCode(), e.getMessage()); else return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, e.getMessage()); } @@ -1112,4 +1281,163 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { } } } + + /** + * Cancels query with specified request id; + * + * @param req Query cancellation request; + * @return <code>QueryCancelledException</code> wrapped with <code>JdbcResponse</code> + */ + private JdbcResponse cancelQuery(JdbcQueryCancelRequest req) { + boolean clearCursors = false; + + GridQueryCancel cancelHook; + + synchronized (reqMux) { + JdbcQueryDescriptor desc = reqRegister.get(req.requestIdToBeCancelled()); + + // Query was already executed. + if (desc == null) + return null; + + // Query was registered, however execution didn't start yet. + else if (!desc.isExecutionStarted()) { + unregisterRequest(req.requestId()); + + return exceptionToResult(new QueryCancelledException()); + } + else { + cancelHook = desc.cancelHook(); + + desc.markCancelled(); + + if (desc.usageCount() == 0) { + clearCursors = true; + + unregisterRequest(req.requestIdToBeCancelled()); + } + } + } + + cancelHook.cancel(); + + if (clearCursors) + clearCursors(req.requestIdToBeCancelled()); + + return null; + } + + /** + * Checks whether query cancellation is supported whithin given version of protocal. + * + * @return True if supported, false otherwise. + */ + private boolean isCancellationSupported() { + return (protocolVer.compareTo(VER_2_8_0) >= 0); + } + + /** + * Unregisters request if there are no cursors binded to it. + * + * @param reqId Reuest to unregist. + */ + private void tryUnregisterRequest(long reqId) { + assert isCancellationSupported(); + + boolean unregisterReq = true; + + for (JdbcCursor cursor : jdbcCursors.values()) { + if (cursor.requestId() == reqId) { + unregisterReq = false; + + break; + } + } + + if (unregisterReq) + unregisterRequest(reqId); + } + + /** + * Tries to close all cursors of request with given id and removes them from jdbcCursors map. + * + * @param reqId Request ID. + */ + private void clearCursors(long reqId) { + for (Iterator<Map.Entry<Long, JdbcCursor>> it = jdbcCursors.entrySet().iterator(); it.hasNext(); ) { + Map.Entry<Long, JdbcCursor> entry = it.next(); + + JdbcCursor cursor = entry.getValue(); + + if (cursor.requestId() == reqId) { + try { + cursor.close(); + } + catch (Exception e) { + U.error(log, "Failed to close cursor [reqId=" + reqId + ", cursor=" + cursor + ']', e); + } + + it.remove(); + } + } + } + + /** + * Checks whether query was cancelled - returns null if true, otherwise increments query descriptor usage count. + * + * @param cur Jdbc Cursor. + * @return False, if query was already cancelled. + */ + private boolean prepareQueryCancellationMeta(JdbcCursor cur) { + if (isCancellationSupported()) { + // Nothing to do - cursor was already removed. + if (cur == null) + return false; + + synchronized (reqMux) { + JdbcQueryDescriptor desc = reqRegister.get(cur.requestId()); + + // Query was already cancelled and unregisterd. + if (desc == null) + return false; + + desc.incrementUsageCount(); + } + } + + return true; + } + + /** + * Cleanups cursors or processors and unregistered request if necessary. + * + * @param unregisterReq Flag, that detecs whether it's necessary to unregister request. + * @param reqId Request Id. + */ + private void cleanupQueryCancellationMeta(boolean unregisterReq, long reqId) { + if (isCancellationSupported()) { + boolean clearCursors = false; + + synchronized (reqMux) { + JdbcQueryDescriptor desc = reqRegister.get(reqId); + + if (desc != null) { + // Query was cancelled during execution. + if (desc.isCanceled()) { + clearCursors = true; + + unregisterReq = true; + } + else + desc.decrementUsageCount(); + + if (unregisterReq) + unregisterRequest(reqId); + } + } + + if (clearCursors) + clearCursors(reqId); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResultInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResultInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResultInfo.java index 5fab77a..ceb7964 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResultInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResultInfo.java @@ -33,8 +33,8 @@ public class JdbcResultInfo implements JdbcRawBinarylizable { /** Update count. */ private long updCnt; - /** Query ID. */ - private long qryId; + /** Cursor ID. */ + private long cursorId; /** * Default constructor is used for serialization. @@ -46,12 +46,12 @@ public class JdbcResultInfo implements JdbcRawBinarylizable { /** * @param isQuery Query flag. * @param updCnt Update count. - * @param qryId Query ID. + * @param cursorId Cursor ID. */ - public JdbcResultInfo(boolean isQuery, long updCnt, long qryId) { + public JdbcResultInfo(boolean isQuery, long updCnt, long cursorId) { this.isQuery = isQuery; this.updCnt = updCnt; - this.qryId = qryId; + this.cursorId = cursorId; } /** @@ -62,10 +62,10 @@ public class JdbcResultInfo implements JdbcRawBinarylizable { } /** - * @return Query ID. + * @return Cursor ID. */ - public long queryId() { - return qryId; + public long cursorId() { + return cursorId; } /** @@ -80,7 +80,7 @@ public class JdbcResultInfo implements JdbcRawBinarylizable { ClientListenerProtocolVersion ver) { writer.writeBoolean(isQuery); writer.writeLong(updCnt); - writer.writeLong(qryId); + writer.writeLong(cursorId); } /** {@inheritDoc} */ @@ -88,7 +88,7 @@ public class JdbcResultInfo implements JdbcRawBinarylizable { ClientListenerProtocolVersion ver) { isQuery = reader.readBoolean(); updCnt = reader.readLong(); - qryId = reader.readLong(); + cursorId = reader.readLong(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java index cf11270..5c63a4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.odbc.odbc; +import java.util.HashSet; +import java.util.Set; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridKernalContext; @@ -26,15 +28,12 @@ import org.apache.ignite.internal.processors.odbc.ClientListenerAbstractConnecti import org.apache.ignite.internal.processors.odbc.ClientListenerMessageParser; import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion; import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler; -import org.apache.ignite.internal.processors.query.NestedTxMode; import org.apache.ignite.internal.processors.odbc.ClientListenerResponse; import org.apache.ignite.internal.processors.odbc.ClientListenerResponseSender; +import org.apache.ignite.internal.processors.query.NestedTxMode; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.nio.GridNioSession; -import java.util.HashSet; -import java.util.Set; - /** * ODBC Connection Context. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java index 0e9b48a..0e66c93 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java @@ -409,6 +409,19 @@ public class OdbcMessageParser implements ClientListenerMessageParser { return writer.array(); } + /** {@inheritDoc} */ + @Override public int decodeCommandType(byte[] msg) { + assert msg != null; + + return msg[0]; + } + + + /** {@inheritDoc} */ + @Override public long decodeRequestId(byte[] msg) { + return 0; + } + /** * @param writer Writer to use. * @param affectedRows Affected rows. http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java index 10cfd6d..edb8a31 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java @@ -300,6 +300,21 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { } } + /** {@inheritDoc} */ + @Override public boolean isCancellationCommand(int cmdId) { + return false; + } + + /** {@inheritDoc} */ + @Override public void registerRequest(long reqId, int cmdType) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void unregisterRequest(long reqId) { + // No-op. + } + /** * Make query considering handler configuration. * @param schema Schema. http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java index c65e64a..0e81c20 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java @@ -388,4 +388,18 @@ public class ClientMessageParser implements ClientListenerMessageParser { return outStream.arrayCopy(); } + + /** {@inheritDoc} */ + @Override public int decodeCommandType(byte[] msg) { + assert msg != null; + + BinaryInputStream inStream = new BinaryHeapInputStream(msg); + + return inStream.readShort(); + } + + /** {@inheritDoc} */ + @Override public long decodeRequestId(byte[] msg) { + return 0; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequestHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequestHandler.java index 8fe4e5d..1b59c48 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequestHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequestHandler.java @@ -75,4 +75,20 @@ public class ClientRequestHandler implements ClientListenerRequestHandler { @Override public void writeHandshake(BinaryWriterExImpl writer) { writer.writeBoolean(true); } + + /** {@inheritDoc} */ + @Override public boolean isCancellationCommand(int cmdId) { + return false; + } + + + /** {@inheritDoc} */ + @Override public void registerRequest(long reqId, int cmdType) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void unregisterRequest(long reqId) { + // No-op. + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index c1edbbb..94ad17d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -2127,7 +2127,39 @@ public class GridQueryProcessor extends GridProcessorAdapter { cliCtx, keepBinary, failOnMultipleStmts, - GridCacheQueryType.SQL_FIELDS + GridCacheQueryType.SQL_FIELDS, + null + ); + } + + /** + * Query SQL fields. + * + * @param cctx Cache context. + * @param qry Query. + * @param cliCtx Client context. + * @param keepBinary Keep binary flag. + * @param failOnMultipleStmts If {@code true} the method must throws exception when query contains + * more then one SQL statement. + * @param cancel Hook for query cancellation. + * @return Cursor. + */ + public List<FieldsQueryCursor<List<?>>> querySqlFields( + @Nullable final GridCacheContext<?, ?> cctx, + final SqlFieldsQuery qry, + final SqlClientContext cliCtx, + final boolean keepBinary, + final boolean failOnMultipleStmts, + @Nullable final GridQueryCancel cancel + ) { + return querySqlFields( + cctx, + qry, + cliCtx, + keepBinary, + failOnMultipleStmts, + GridCacheQueryType.SQL_FIELDS, + cancel ); } @@ -2141,6 +2173,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param failOnMultipleStmts If {@code true} the method must throws exception when query contains * more then one SQL statement. * @param qryType Real query type. + * @param cancel Hook for query cancellation. * @return Cursor. */ public List<FieldsQueryCursor<List<?>>> querySqlFields( @@ -2149,7 +2182,8 @@ public class GridQueryProcessor extends GridProcessorAdapter { final SqlClientContext cliCtx, final boolean keepBinary, final boolean failOnMultipleStmts, - GridCacheQueryType qryType + GridCacheQueryType qryType, + @Nullable final GridQueryCancel cancel ) { // Validate. checkxEnabled(); @@ -2167,10 +2201,19 @@ public class GridQueryProcessor extends GridProcessorAdapter { IgniteOutClosureX<List<FieldsQueryCursor<List<?>>>> clo = new IgniteOutClosureX<List<FieldsQueryCursor<List<?>>>>() { @Override public List<FieldsQueryCursor<List<?>>> applyx() { - GridQueryCancel cancel = new GridQueryCancel(); - - List<FieldsQueryCursor<List<?>>> res = idx.querySqlFields(schemaName, qry, cliCtx, - keepBinary, failOnMultipleStmts, null, cancel, true); + GridQueryCancel cancel0 = cancel != null ? cancel : new GridQueryCancel(); + + List<FieldsQueryCursor<List<?>>> res = + idx.querySqlFields( + schemaName, + qry, + cliCtx, + keepBinary, + failOnMultipleStmts, + null, + cancel0, + true + ); if (cctx != null) sendQueryExecutedEvent(qry.getSql(), qry.getArgs(), cctx, qryType); @@ -2298,7 +2341,8 @@ public class GridQueryProcessor extends GridProcessorAdapter { null, keepBinary, true, - GridCacheQueryType.SQL + GridCacheQueryType.SQL, + null ).get(0); // Convert.
