http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/cache/query/QueryCancelledException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/QueryCancelledException.java b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryCancelledException.java index 5f5ffdc..eef5f52 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/QueryCancelledException.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryCancelledException.java @@ -26,10 +26,13 @@ public class QueryCancelledException extends IgniteCheckedException { /** */ private static final long serialVersionUID = 0L; + /** Error message. */ + public static final String ERR_MSG = "The query was cancelled while executing."; + /** * Default constructor. */ public QueryCancelledException() { - super("The query was cancelled while executing."); + super(ERR_MSG); } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java index 110cc74..3ce4319 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java @@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.odbc.SqlStateCode; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcOrderedBatchExecuteRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcOrderedBatchExecuteResult; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery; +import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryCancelRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResponse; @@ -82,7 +83,7 @@ public class JdbcThinConnection implements Connection { private String schema; /** Closed flag. */ - private boolean closed; + private volatile boolean closed; /** Current transaction isolation. */ private int txIsolation; @@ -174,9 +175,10 @@ public class JdbcThinConnection implements Connection { /** * @param sql Statement. * @param cmd Parsed form of {@code sql}. + * @param stmt Jdbc thin statement. * @throws SQLException if failed. 
*/ - void executeNative(String sql, SqlCommand cmd) throws SQLException { + void executeNative(String sql, SqlCommand cmd, JdbcThinStatement stmt) throws SQLException { if (cmd instanceof SqlSetStreamingCommand) { SqlSetStreamingCommand cmd0 = (SqlSetStreamingCommand)cmd; @@ -196,10 +198,12 @@ public class JdbcThinConnection implements Connection { + cliIo.igniteVersion() + ']', SqlStateCode.INTERNAL_ERROR); } + streamState = new StreamState((SqlSetStreamingCommand)cmd); + sendRequest(new JdbcQueryExecuteRequest(JdbcStatementType.ANY_STATEMENT_TYPE, - schema, 1, 1, autoCommit, sql, null)); + schema, 1, 1, autoCommit, sql, null), stmt); - streamState = new StreamState((SqlSetStreamingCommand)cmd); + streamState.start(); } } else @@ -739,10 +743,22 @@ public class JdbcThinConnection implements Connection { * @throws SQLException On any error. */ <R extends JdbcResult> R sendRequest(JdbcRequest req) throws SQLException { + return sendRequest(req, null); + } + + /** + * Send request for execution via {@link #cliIo}. + * @param req Request. + * @param stmt Jdbc thin statement. + * @return Server response. + * @throws SQLException On any error. + */ + @SuppressWarnings("unchecked") + <R extends JdbcResult> R sendRequest(JdbcRequest req, JdbcThinStatement stmt) throws SQLException { ensureConnected(); try { - JdbcResponse res = cliIo.sendRequest(req); + JdbcResponse res = cliIo.sendRequest(req, stmt); if (res.status() != ClientListenerResponse.STATUS_SUCCESS) throw new SQLException(res.error(), IgniteQueryErrorCode.codeToSqlState(res.status()), res.status()); @@ -765,6 +781,23 @@ public class JdbcThinConnection implements Connection { * @param req Request. * @throws SQLException On any error. 
*/ + void sendQueryCancelRequest(JdbcQueryCancelRequest req) throws SQLException { + ensureConnected(); + + try { + cliIo.sendCancelRequest(req); + } + catch (Exception e) { + throw new SQLException("Failed to communicate with Ignite cluster.", SqlStateCode.CONNECTION_FAILURE, e); + } + } + + /** + * Send request for execution via {@link #cliIo}. Response is waited at the separate thread + * (see {@link StreamState#asyncRespReaderThread}). + * @param req Request. + * @throws SQLException On any error. + */ private void sendRequestNotWaitResponse(JdbcOrderedBatchExecuteRequest req) throws SQLException { ensureConnected(); @@ -877,7 +910,12 @@ public class JdbcThinConnection implements Connection { streamBatchSize = cmd.batchSize(); asyncRespReaderThread = new Thread(this::readResponses); + } + /** + * Start reader. + */ + void start() { asyncRespReaderThread.start(); } @@ -1031,4 +1069,11 @@ public class JdbcThinConnection implements Connection { } } } + + /** + * @return True if query cancellation supported, false otherwise. + */ + boolean isQueryCancellationSupported() { + return cliIo.isQueryCancellationSupported(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java index 4e2fef2..e4569dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java @@ -80,8 +80,8 @@ public class JdbcThinResultSet implements ResultSet { /** Statement. */ private final JdbcThinStatement stmt; - /** Query ID. */ - private final Long qryId; + /** Cursor ID. 
*/ + private final Long cursorId; /** Metadata. */ private List<JdbcColumnMeta> meta; @@ -140,7 +140,7 @@ public class JdbcThinResultSet implements ResultSet { JdbcThinResultSet(List<List<Object>> fields, List<JdbcColumnMeta> meta) { stmt = null; fetchSize = 0; - qryId = -1L; + cursorId = -1L; finished = true; isQuery = true; updCnt = -1; @@ -160,7 +160,7 @@ public class JdbcThinResultSet implements ResultSet { * Creates new result set. * * @param stmt Statement. - * @param qryId Query ID. + * @param cursorId Cursor ID. * @param fetchSize Fetch size. * @param finished Finished flag. * @param rows Rows. @@ -169,13 +169,13 @@ public class JdbcThinResultSet implements ResultSet { * @param updCnt Update count. * @param closeStmt Close statement on the result set close. */ - JdbcThinResultSet(JdbcThinStatement stmt, long qryId, int fetchSize, boolean finished, + JdbcThinResultSet(JdbcThinStatement stmt, long cursorId, int fetchSize, boolean finished, List<List<Object>> rows, boolean isQuery, boolean autoClose, long updCnt, boolean closeStmt) { assert stmt != null; assert fetchSize > 0; this.stmt = stmt; - this.qryId = qryId; + this.cursorId = cursorId; this.fetchSize = fetchSize; this.finished = finished; this.isQuery = isQuery; @@ -194,10 +194,10 @@ public class JdbcThinResultSet implements ResultSet { /** {@inheritDoc} */ @Override public boolean next() throws SQLException { - ensureNotClosed(); + ensureAlive(); if ((rowsIter == null || !rowsIter.hasNext()) && !finished) { - JdbcQueryFetchResult res = stmt.conn.sendRequest(new JdbcQueryFetchRequest(qryId, fetchSize)); + JdbcQueryFetchResult res = stmt.conn.sendRequest(new JdbcQueryFetchRequest(cursorId, fetchSize), stmt); rows = res.items(); finished = res.last(); @@ -240,8 +240,8 @@ public class JdbcThinResultSet implements ResultSet { return; try { - if (!finished || (isQuery && !autoClose)) - stmt.conn.sendRequest(new JdbcQueryCloseRequest(qryId)); + if (!(stmt != null && stmt.isCancelled()) && (!finished || 
(isQuery && !autoClose))) + stmt.conn.sendRequest(new JdbcQueryCloseRequest(cursorId), stmt); } finally { closed = true; @@ -718,8 +718,11 @@ public class JdbcThinResultSet implements ResultSet { @Override public ResultSetMetaData getMetaData() throws SQLException { ensureNotClosed(); - if (jdbcMeta == null) + if (jdbcMeta == null) { + ensureNotCancelled(); + jdbcMeta = new JdbcThinResultSetMetadata(meta()); + } return jdbcMeta; } @@ -1835,7 +1838,7 @@ public class JdbcThinResultSet implements ResultSet { * @throws SQLException In case of error. */ private Object getValue(int colIdx) throws SQLException { - ensureNotClosed(); + ensureAlive(); ensureHasCurrentRow(); try { @@ -1861,6 +1864,27 @@ public class JdbcThinResultSet implements ResultSet { } /** + * Ensures that result set is not cancelled. + * + * @throws SQLException If result set is cancelled. + */ + private void ensureNotCancelled() throws SQLException { + if (stmt != null && stmt.isCancelled()) + throw new SQLException("The query was cancelled while executing.", SqlStateCode.QUERY_CANCELLED); + } + + /** + * Ensures that result set is not closed or cancelled. + * + * @throws SQLException If result set is closed or cancelled. + */ + private void ensureAlive() throws SQLException { + ensureNotClosed(); + + ensureNotCancelled(); + } + + /** * Ensures that result set is positioned on a row. * * @throws SQLException If result set is not positioned on a row. 
@@ -1879,11 +1903,11 @@ public class JdbcThinResultSet implements ResultSet { throw new SQLException("Server cursor is already closed.", SqlStateCode.INVALID_CURSOR_STATE); if (!metaInit) { - JdbcQueryMetadataResult res = stmt.conn.sendRequest(new JdbcQueryMetadataRequest(qryId)); + JdbcQueryMetadataResult res = stmt.conn.sendRequest(new JdbcQueryMetadataRequest(cursorId), stmt); - meta = res.meta(); + meta = res.meta(); - metaInit = true; + metaInit = true; } return meta; http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java index be25201..500e632 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java @@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteResult; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadAckResult; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery; +import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryCancelRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteMultipleStatementsResult; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteResult; @@ -66,13 +67,13 @@ public class JdbcThinStatement implements Statement { private static final int DFLT_PAGE_SIZE = SqlQuery.DFLT_PAGE_SIZE; /** JDBC Connection implementation. 
*/ - protected JdbcThinConnection conn; + protected final JdbcThinConnection conn; /** Schema name. */ private final String schema; /** Closed flag. */ - private boolean closed; + private volatile boolean closed; /** Rows limit. */ private int maxRows; @@ -83,7 +84,7 @@ public class JdbcThinStatement implements Statement { /** Fetch size. */ private int pageSize = DFLT_PAGE_SIZE; - /** Result set holdability*/ + /** Result set holdability. */ private final int resHoldability; /** Batch size to keep track of number of items to return as fake update counters for executeBatch. */ @@ -96,11 +97,20 @@ public class JdbcThinStatement implements Statement { private boolean closeOnCompletion; /** Result sets. */ - protected List<JdbcThinResultSet> resultSets; + protected volatile List<JdbcThinResultSet> resultSets; /** Current result index. */ protected int curRes; + /** Current request Id. */ + private long currReqId; + + /** Cancelled flag. */ + private volatile boolean cancelled; + + /** Cancellation mutex. */ + final Object cancellationMux = new Object(); + /** * Creates new statement. * @@ -185,7 +195,7 @@ public class JdbcThinStatement implements Statement { nativeCmd = tryParseNative(sql); if (nativeCmd != null) { - conn.executeNative(sql, nativeCmd); + conn.executeNative(sql, nativeCmd, this); resultSets = Collections.singletonList(resultSetForUpdate(0)); @@ -207,8 +217,10 @@ public class JdbcThinStatement implements Statement { return; } - JdbcResult res0 = conn.sendRequest(new JdbcQueryExecuteRequest(stmtType, schema, pageSize, - maxRows, conn.getAutoCommit(), sql, args == null ? null : args.toArray(new Object[args.size()]))); + JdbcQueryExecuteRequest req = new JdbcQueryExecuteRequest(stmtType, schema, pageSize, + maxRows, conn.getAutoCommit(), sql, args == null ? 
null : args.toArray(new Object[args.size()])); + + JdbcResult res0 = conn.sendRequest(req, this); assert res0 != null; @@ -218,7 +230,7 @@ public class JdbcThinStatement implements Statement { if (res0 instanceof JdbcQueryExecuteResult) { JdbcQueryExecuteResult res = (JdbcQueryExecuteResult)res0; - resultSets = Collections.singletonList(new JdbcThinResultSet(this, res.getQueryId(), pageSize, + resultSets = Collections.singletonList(new JdbcThinResultSet(this, res.cursorId(), pageSize, res.last(), res.items(), res.isQuery(), conn.autoCloseServerCursor(), res.updateCount(), closeOnCompletion)); } @@ -231,21 +243,19 @@ public class JdbcThinStatement implements Statement { boolean firstRes = true; - for(JdbcResultInfo rsInfo : resInfos) { + for (JdbcResultInfo rsInfo : resInfos) { if (!rsInfo.isQuery()) resultSets.add(resultSetForUpdate(rsInfo.updateCount())); else { if (firstRes) { firstRes = false; - resultSets.add(new JdbcThinResultSet(this, rsInfo.queryId(), pageSize, - res.isLast(), res.items(), true, - conn.autoCloseServerCursor(), -1, closeOnCompletion)); + resultSets.add(new JdbcThinResultSet(this, rsInfo.cursorId(), pageSize, res.isLast(), + res.items(), true, conn.autoCloseServerCursor(), -1, closeOnCompletion)); } else { - resultSets.add(new JdbcThinResultSet(this, rsInfo.queryId(), pageSize, - false, null, true, - conn.autoCloseServerCursor(), -1, closeOnCompletion)); + resultSets.add(new JdbcThinResultSet(this, rsInfo.cursorId(), pageSize, false, + null, true, conn.autoCloseServerCursor(), -1, closeOnCompletion)); } } } @@ -300,27 +310,30 @@ public class JdbcThinStatement implements Statement { continue; JdbcResult res = conn.sendRequest(new JdbcBulkLoadBatchRequest( - cmdRes.queryId(), + cmdRes.cursorId(), batchNum++, JdbcBulkLoadBatchRequest.CMD_CONTINUE, - readBytes == buf.length ? buf : Arrays.copyOf(buf, readBytes))); + readBytes == buf.length ? 
buf : Arrays.copyOf(buf, readBytes)), + this); if (!(res instanceof JdbcQueryExecuteResult)) throw new SQLException("Unknown response sent by the server: " + res); } return conn.sendRequest(new JdbcBulkLoadBatchRequest( - cmdRes.queryId(), + cmdRes.cursorId(), batchNum++, - JdbcBulkLoadBatchRequest.CMD_FINISHED_EOF)); + JdbcBulkLoadBatchRequest.CMD_FINISHED_EOF), + this); } } catch (Exception e) { try { conn.sendRequest(new JdbcBulkLoadBatchRequest( - cmdRes.queryId(), + cmdRes.cursorId(), batchNum, - JdbcBulkLoadBatchRequest.CMD_FINISHED_ERROR)); + JdbcBulkLoadBatchRequest.CMD_FINISHED_ERROR), + this); } catch (SQLException e1) { throw new SQLException("Cannot send finalization request: " + e1.getMessage(), e); @@ -370,6 +383,19 @@ public class JdbcThinStatement implements Statement { resultSets = null; curRes = 0; } + + synchronized (cancellationMux) { + currReqId = 0; + + cancelled = false; + } + } + + /** + * @return Returns true if statement was cancelled, false otherwise. + */ + boolean isCancelled() { + return cancelled; } /** @@ -445,6 +471,27 @@ public class JdbcThinStatement implements Statement { /** {@inheritDoc} */ @Override public void cancel() throws SQLException { ensureNotClosed(); + + if (!isQueryCancellationSupported()) + throw new SQLFeatureNotSupportedException("Cancel method is not supported."); + + long reqId; + + synchronized (cancellationMux) { + if (isCancelled()) + return; + + if (conn.isStream()) + throw new SQLFeatureNotSupportedException("Cancel method is not allowed in streaming mode."); + + reqId = currReqId; + + if (reqId != 0) + cancelled = true; + } + + if (reqId != 0) + conn.sendQueryCancelRequest(new JdbcQueryCancelRequest(reqId)); } /** {@inheritDoc} */ @@ -514,7 +561,7 @@ public class JdbcThinStatement implements Statement { * @throws SQLException If failed. 
*/ private JdbcThinResultSet nextResultSet() throws SQLException { - ensureNotClosed(); + ensureAlive(); if (resultSets == null || curRes >= resultSets.size()) return null; @@ -524,7 +571,7 @@ public class JdbcThinStatement implements Statement { /** {@inheritDoc} */ @Override public boolean getMoreResults() throws SQLException { - ensureNotClosed(); + ensureAlive(); return getMoreResults(CLOSE_CURRENT_RESULT); } @@ -645,9 +692,11 @@ public class JdbcThinStatement implements Statement { if (F.isEmpty(batch)) return new int[0]; + JdbcBatchExecuteRequest req = new JdbcBatchExecuteRequest(conn.getSchema(), batch, + conn.getAutoCommit(), false); + try { - JdbcBatchExecuteResult res = conn.sendRequest(new JdbcBatchExecuteRequest(conn.getSchema(), batch, - conn.getAutoCommit(), false)); + JdbcBatchExecuteResult res = conn.sendRequest(req, this); if (res.errorCode() != ClientListenerResponse.STATUS_SUCCESS) { throw new BatchUpdateException(res.errorMessage(), IgniteQueryErrorCode.codeToSqlState(res.errorCode()), @@ -672,7 +721,7 @@ public class JdbcThinStatement implements Statement { /** {@inheritDoc} */ @Override public boolean getMoreResults(int curr) throws SQLException { - ensureNotClosed(); + ensureAlive(); if (resultSets != null) { assert curRes <= resultSets.size() : "Invalid results state: [resultsCount=" + resultSets.size() + @@ -704,7 +753,7 @@ public class JdbcThinStatement implements Statement { /** {@inheritDoc} */ @Override public ResultSet getGeneratedKeys() throws SQLException { - ensureNotClosed(); + ensureAlive(); throw new SQLFeatureNotSupportedException("Auto-generated columns are not supported."); } @@ -728,6 +777,7 @@ public class JdbcThinStatement implements Statement { /** {@inheritDoc} */ @Override public int executeUpdate(String sql, int[] colIndexes) throws SQLException { ensureNotClosed(); + throw new SQLFeatureNotSupportedException("Auto-generated columns are not supported."); } @@ -850,16 +900,28 @@ public class JdbcThinStatement implements 
Statement { } /** - * Ensures that statement is not closed. + * Ensures that statement not closed. * * @throws SQLException If statement is closed. */ - protected void ensureNotClosed() throws SQLException { + void ensureNotClosed() throws SQLException { if (isClosed()) throw new SQLException("Statement is closed."); } /** + * Ensures that statement neither closed nor canceled. + * + * @throws SQLException If statement is closed or canceled. + */ + void ensureAlive() throws SQLException { + ensureNotClosed(); + + if (cancelled) + throw new SQLException("The query was cancelled while executing.", SqlStateCode.QUERY_CANCELLED); + } + + /** * Used by statement on closeOnCompletion mode. * @throws SQLException On error. */ @@ -879,4 +941,27 @@ public class JdbcThinStatement implements Statement { if (allRsClosed) close(); } + + /** + * @param currReqId Sets current request Id. + */ + void currentRequestId(long currReqId) { + synchronized (cancellationMux) { + this.currReqId = currReqId; + } + } + + /** + * @return Cancellation mutex. + */ + Object cancellationMutex() { + return cancellationMux; + } + + /** + * @return True if query cancellation supported, false otherwise. 
+ */ + private boolean isQueryCancellationSupported() { + return conn.isQueryCancellationSupported(); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java index 102075e..11dc221 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java @@ -28,12 +28,13 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicLong; - import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.internal.binary.BinaryReaderExImpl; import org.apache.ignite.internal.binary.BinaryWriterExImpl; import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream; import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream; +import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.odbc.ClientListenerNioListener; import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion; import org.apache.ignite.internal.processors.odbc.ClientListenerRequest; @@ -41,7 +42,9 @@ import org.apache.ignite.internal.processors.odbc.SqlStateCode; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcOrderedBatchExecuteRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery; +import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryCancelRequest; import 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryCloseRequest; +import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryFetchRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryMetadataRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest; @@ -74,8 +77,11 @@ public class JdbcThinTcpIo { /** Version 2.7.0. */ private static final ClientListenerProtocolVersion VER_2_7_0 = ClientListenerProtocolVersion.create(2, 7, 0); + /** Version 2.8.0. */ + private static final ClientListenerProtocolVersion VER_2_8_0 = ClientListenerProtocolVersion.create(2, 8, 0); + /** Current version. */ - public static final ClientListenerProtocolVersion CURRENT_VER = VER_2_7_0; + public static final ClientListenerProtocolVersion CURRENT_VER = VER_2_8_0; /** Initial output stream capacity for handshake. */ private static final int HANDSHAKE_MSG_SIZE = 13; @@ -122,6 +128,9 @@ public class JdbcThinTcpIo { /** Mutex. */ private final Object mux = new Object(); + /** Connection mutex. */ + private final Object connMux = new Object(); + /** Current protocol version used to connection to Ignite. 
*/ private ClientListenerProtocolVersion srvProtocolVer; @@ -377,7 +386,8 @@ public class JdbcThinTcpIo { + ", url=" + connProps.getUrl() + ']', SqlStateCode.CONNECTION_REJECTED); } - if (VER_2_5_0.equals(srvProtoVer0) + if (VER_2_7_0.equals(srvProtoVer0) + || VER_2_5_0.equals(srvProtoVer0) || VER_2_4_0.equals(srvProtoVer0) || VER_2_3_0.equals(srvProtoVer0) || VER_2_1_5.equals(srvProtoVer0)) @@ -464,14 +474,7 @@ public class JdbcThinTcpIo { + CURRENT_VER + ", remoteNodeVer=" + igniteVer + ']', SqlStateCode.INTERNAL_ERROR); } - int cap = guessCapacity(req); - - BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(cap), - null, null); - - req.writeBinary(writer, srvProtocolVer); - - send(writer.array()); + sendRequestRaw(req); } finally { synchronized (mux) { @@ -482,11 +485,12 @@ public class JdbcThinTcpIo { /** * @param req Request. + * @param stmt Statement. * @return Server response. * @throws IOException In case of IO error. * @throws SQLException On concurrent access to JDBC connection. 
*/ - JdbcResponse sendRequest(JdbcRequest req) throws SQLException, IOException { + JdbcResponse sendRequest(JdbcRequest req, JdbcThinStatement stmt) throws SQLException, IOException { synchronized (mux) { if (ownThread != null) { throw new SQLException("Concurrent access to JDBC connection is not allowed" @@ -498,15 +502,30 @@ public class JdbcThinTcpIo { } try { - int cap = guessCapacity(req); + if (stmt != null) { + synchronized (stmt.cancellationMutex()) { + if (stmt.isCancelled()) { + if (req instanceof JdbcQueryCloseRequest) + return new JdbcResponse(null); - BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(cap), null, null); + return new JdbcResponse(IgniteQueryErrorCode.QUERY_CANCELED, QueryCancelledException.ERR_MSG); + } - req.writeBinary(writer, srvProtocolVer); + sendRequestRaw(req); - send(writer.array()); + if (req instanceof JdbcQueryExecuteRequest || req instanceof JdbcBatchExecuteRequest) + stmt.currentRequestId(req.requestId()); + } + } + else + sendRequestRaw(req); + + JdbcResponse resp = readResponse(); - return readResponse(); + if (stmt != null && stmt.isCancelled()) + return new JdbcResponse(IgniteQueryErrorCode.QUERY_CANCELED, QueryCancelledException.ERR_MSG); + else + return resp; } finally { synchronized (mux) { @@ -516,11 +535,22 @@ public class JdbcThinTcpIo { } /** + * Sends cancel request. + * + * @param cancellationReq contains request id to be cancelled + * @throws IOException In case of IO error. + */ + void sendCancelRequest(JdbcQueryCancelRequest cancellationReq) throws IOException { + sendRequestRaw(cancellationReq); + } + + /** * @return Server response. * @throws IOException In case of IO error. 
*/ JdbcResponse readResponse() throws IOException { - BinaryReaderExImpl reader = new BinaryReaderExImpl(null, new BinaryHeapInputStream(read()), null, null, false); + BinaryReaderExImpl reader = new BinaryReaderExImpl(null, new BinaryHeapInputStream(read()), null, + null, false); JdbcResponse res = new JdbcResponse(); @@ -529,7 +559,6 @@ public class JdbcThinTcpIo { return res; } - /** * Try to guess request capacity. * @@ -560,6 +589,23 @@ public class JdbcThinTcpIo { } /** + * @param req Request. + * @throws IOException In case of IO error. + */ + private void sendRequestRaw(JdbcRequest req) throws IOException { + int cap = guessCapacity(req); + + BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(cap), + null, null); + + req.writeBinary(writer, srvProtocolVer); + + synchronized (connMux) { + send(writer.array()); + } + } + + /** * @param req JDBC request bytes. * @throws IOException On error. */ @@ -652,6 +698,15 @@ public class JdbcThinTcpIo { } /** + * @return True if query cancellation supported, false otherwise. + */ + boolean isQueryCancellationSupported() { + assert srvProtocolVer != null; + + return srvProtocolVer.compareTo(VER_2_8_0) >= 0; + } + + /** * @return Current server index. 
*/ public int serverIndex() { http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java index 39f1b60..6fdac22 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java @@ -80,6 +80,9 @@ public final class IgniteQueryErrorCode { /** Conversion failure. */ public static final int CONVERSION_FAILED = 3013; + /** Query canceled. */ + public static final int QUERY_CANCELED = 3014; + /* 4xxx - cache related runtime errors */ /** Attempt to INSERT a key that is already in cache. 
*/ @@ -193,6 +196,9 @@ public final class IgniteQueryErrorCode { case TRANSACTION_SERIALIZATION_ERROR: return SqlStateCode.SERIALIZATION_FAILURE; + case QUERY_CANCELED: + return SqlStateCode.QUERY_CANCELLED; + default: return SqlStateCode.INTERNAL_ERROR; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerMessageParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerMessageParser.java index ab80f47..c3782ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerMessageParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerMessageParser.java @@ -27,7 +27,7 @@ public interface ClientListenerMessageParser { * @param msg Message. * @return Request. */ - public ClientListenerRequest decode(byte[] msg); + ClientListenerRequest decode(byte[] msg); /** * Encode response to byte array. @@ -35,5 +35,21 @@ public interface ClientListenerMessageParser { * @param resp Response. * @return Message. */ - public byte[] encode(ClientListenerResponse resp); + byte[] encode(ClientListenerResponse resp); + + /** + * Decode command type. Allows to recognize the command (message type) without decoding the entire message. + * + * @param msg Message. + * @return Command type. + */ + int decodeCommandType(byte[] msg); + + /** + * Decode request Id. Allows to recognize the request Id, if any, without decoding the entire message. + * + * @param msg Message. + * @return Request Id. 
+ */ + long decodeRequestId(byte[] msg); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java index debef42..defde3d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java @@ -62,7 +62,7 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte public static final int MAX_HANDSHAKE_MSG_SIZE = 128; /** Connection-related metadata key. */ - static final int CONN_CTX_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); + public static final int CONN_CTX_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); /** Next connection id. 
*/ private static AtomicInteger nextConnId = new AtomicInteger(1); @@ -143,6 +143,13 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte req = parser.decode(msg); } catch (Exception e) { + try { + handler.unregisterRequest(parser.decodeRequestId(msg)); + } + catch (Exception e1) { + U.error(log, "Failed to unregister request.", e1); + } + U.error(log, "Failed to parse client request.", e); ses.close(); @@ -194,6 +201,8 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte } } catch (Exception e) { + handler.unregisterRequest(req.requestId()); + U.error(log, "Failed to process client request [req=" + req + ']', e); ses.send(parser.encode(handler.handleException(e, req))); http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java index c8c0260..5870f40 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; import javax.cache.configuration.Factory; import javax.management.JMException; import javax.management.ObjectName; @@ -67,6 +68,9 @@ public class ClientListenerProcessor extends GridProcessorAdapter { /** Client listener port. */ public static final String CLIENT_LISTENER_PORT = "clientListenerPort"; + /** Cancel counter. For testing purposes only. 
*/ + public static final AtomicLong CANCEL_COUNTER = new AtomicLong(0); + /** Default number of selectors. */ private static final int DFLT_SELECTOR_CNT = Math.min(4, Runtime.getRuntime().availableProcessors()); @@ -253,6 +257,46 @@ public class ClientListenerProcessor extends GridProcessorAdapter { throws IgniteCheckedException { proceedSessionOpened(ses); } + + @Override public void onMessageReceived(GridNioSession ses, Object msg) throws IgniteCheckedException { + ClientListenerConnectionContext connCtx = ses.meta(ClientListenerNioListener.CONN_CTX_META_KEY); + + if (connCtx != null && connCtx.parser() != null) { + byte[] inMsg; + + int cmdType; + + long reqId; + + try { + inMsg = (byte[])msg; + + cmdType = connCtx.parser().decodeCommandType(inMsg); + + reqId = connCtx.parser().decodeRequestId(inMsg); + } + catch (Exception e) { + U.error(log, "Failed to parse client request.", e); + + ses.close(); + + return; + } + + if (connCtx.handler().isCancellationCommand(cmdType)) { + proceedMessageReceived(ses, msg); + + CANCEL_COUNTER.incrementAndGet(); + } + else { + connCtx.handler().registerRequest(reqId, cmdType); + + super.onMessageReceived(ses, msg); + } + } + else + super.onMessageReceived(ses, msg); + } }; GridNioFilter codecFilter = new GridNioCodecFilter(new ClientListenerBufferedParser(), log, false); http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerRequestHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerRequestHandler.java index cebde08..6e901e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerRequestHandler.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerRequestHandler.java @@ -29,7 +29,7 @@ public interface ClientListenerRequestHandler { * @param req Request. * @return Response. */ - public ClientListenerResponse handle(ClientListenerRequest req); + ClientListenerResponse handle(ClientListenerRequest req); /** * Handle exception. @@ -38,12 +38,33 @@ public interface ClientListenerRequestHandler { * @param req Request. * @return Error response. */ - public ClientListenerResponse handleException(Exception e, ClientListenerRequest req); + ClientListenerResponse handleException(Exception e, ClientListenerRequest req); /** * Write successful handshake response. * * @param writer Binary writer. */ - public void writeHandshake(BinaryWriterExImpl writer); + void writeHandshake(BinaryWriterExImpl writer); + + /** + * Detect whether given command is a cancellation command. + * + * @param cmdId Command Id + * @return true if given command is cancellation one, false otherwise. + */ + boolean isCancellationCommand(int cmdId); + + /** + * Registers request for further cancellation if any. + * @param reqId Request Id. + * @param cmdType Command Type. + */ + void registerRequest(long reqId, int cmdType); + + /** + * Try to unregister request. + * @param reqId Request Id. 
+ */ + void unregisterRequest(long reqId); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlStateCode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlStateCode.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlStateCode.java index 4778425..bc7fbf3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlStateCode.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlStateCode.java @@ -72,4 +72,7 @@ public final class SqlStateCode { /** Internal error. */ public static final String INTERNAL_ERROR = "50000"; // Generic value for custom "50" class. + + /** Query canceled. */ + public static final String QUERY_CANCELLED = "57014"; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java index 8a90f5b..42b1936 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java @@ -72,7 +72,8 @@ public class JdbcBatchExecuteRequest extends JdbcRequest { * @param autoCommit Client auto commit flag state. * @param lastStreamBatch {@code true} in case the request is the last batch at the stream. 
*/ - public JdbcBatchExecuteRequest(String schemaName, List<JdbcQuery> queries, boolean autoCommit, boolean lastStreamBatch) { + public JdbcBatchExecuteRequest(String schemaName, List<JdbcQuery> queries, boolean autoCommit, + boolean lastStreamBatch) { super(BATCH_EXEC); assert lastStreamBatch || !F.isEmpty(queries); @@ -92,7 +93,8 @@ public class JdbcBatchExecuteRequest extends JdbcRequest { * @param autoCommit Client auto commit flag state. * @param lastStreamBatch {@code true} in case the request is the last batch at the stream. */ - protected JdbcBatchExecuteRequest(byte type, String schemaName, List<JdbcQuery> queries, boolean autoCommit, boolean lastStreamBatch) { + protected JdbcBatchExecuteRequest(byte type, String schemaName, List<JdbcQuery> queries, boolean autoCommit, + boolean lastStreamBatch) { super(type); assert lastStreamBatch || !F.isEmpty(queries); http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadAckResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadAckResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadAckResult.java index b0750fd..0d97009 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadAckResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadAckResult.java @@ -33,8 +33,8 @@ import org.apache.ignite.internal.util.typedef.internal.S; * @see SqlBulkLoadCommand */ public class JdbcBulkLoadAckResult extends JdbcResult { - /** Query ID for matching this command on server in further {@link JdbcBulkLoadBatchRequest} commands. */ - private long qryId; + /** Cursor ID for matching this command on server in further {@link JdbcBulkLoadBatchRequest} commands. 
*/ + private long cursorId; /** * Bulk load parameters, which are parsed on the server side and sent to client to specify @@ -46,30 +46,30 @@ public class JdbcBulkLoadAckResult extends JdbcResult { public JdbcBulkLoadAckResult() { super(BULK_LOAD_ACK); - qryId = 0; + cursorId = 0; params = null; } /** * Constructs a request from server (in form of reply) to send files from client to server. * - * @param qryId Query ID to send in further {@link JdbcBulkLoadBatchRequest}s. + * @param cursorId Cursor ID to send in further {@link JdbcBulkLoadBatchRequest}s. * @param params Various parameters for sending batches from client side. */ - public JdbcBulkLoadAckResult(long qryId, BulkLoadAckClientParameters params) { + public JdbcBulkLoadAckResult(long cursorId, BulkLoadAckClientParameters params) { super(BULK_LOAD_ACK); - this.qryId = qryId; + this.cursorId = cursorId; this.params = params; } /** - * Returns the query ID. + * Returns the cursor ID. * - * @return Query ID. + * @return Cursor ID. */ - public long queryId() { - return qryId; + public long cursorId() { + return cursorId; } /** @@ -86,7 +86,7 @@ public class JdbcBulkLoadAckResult extends JdbcResult { ClientListenerProtocolVersion ver) throws BinaryObjectException { super.writeBinary(writer, ver); - writer.writeLong(qryId); + writer.writeLong(cursorId); writer.writeString(params.localFileName()); writer.writeInt(params.packetSize()); } @@ -96,7 +96,7 @@ public class JdbcBulkLoadAckResult extends JdbcResult { ClientListenerProtocolVersion ver) throws BinaryObjectException { super.readBinary(reader, ver); - qryId = reader.readLong(); + cursorId = reader.readLong(); String locFileName = reader.readString(); int batchSize = reader.readInt(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadBatchRequest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadBatchRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadBatchRequest.java index 388cd49..ad7690f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadBatchRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadBatchRequest.java @@ -48,8 +48,8 @@ public class JdbcBulkLoadBatchRequest extends JdbcRequest { */ public static final int CMD_FINISHED_EOF = 2; - /** QueryID of the original COPY command request. */ - private long qryId; + /** CursorId of the original COPY command request. */ + private long cursorId; /** Batch index starting from 0. */ private int batchIdx; @@ -66,7 +66,7 @@ public class JdbcBulkLoadBatchRequest extends JdbcRequest { public JdbcBulkLoadBatchRequest() { super(BULK_LOAD_BATCH); - qryId = -1; + cursorId = -1; batchIdx = -1; cmd = CMD_UNKNOWN; data = null; @@ -76,27 +76,27 @@ public class JdbcBulkLoadBatchRequest extends JdbcRequest { * Creates the request with specified parameters and zero-length data. * Typically used with {@link #CMD_FINISHED_ERROR} and {@link #CMD_FINISHED_EOF}. * - * @param qryId The query ID from the {@link JdbcBulkLoadAckResult}. + * @param cursorId The cursor ID from the {@link JdbcBulkLoadAckResult}. * @param batchIdx Index of the current batch starting with 0. * @param cmd The command ({@link #CMD_CONTINUE}, {@link #CMD_FINISHED_EOF}, or {@link #CMD_FINISHED_ERROR}). */ @SuppressWarnings("ZeroLengthArrayAllocation") - public JdbcBulkLoadBatchRequest(long qryId, int batchIdx, int cmd) { - this(qryId, batchIdx, cmd, new byte[0]); + public JdbcBulkLoadBatchRequest(long cursorId, int batchIdx, int cmd) { + this(cursorId, batchIdx, cmd, new byte[0]); } /** * Creates the request with the specified parameters. * - * @param qryId The query ID from the {@link JdbcBulkLoadAckResult}. 
+ * @param cursorId The cursor ID from the {@link JdbcBulkLoadAckResult}. * @param batchIdx Index of the current batch starting with 0. * @param cmd The command ({@link #CMD_CONTINUE}, {@link #CMD_FINISHED_EOF}, or {@link #CMD_FINISHED_ERROR}). * @param data The data block (zero length is acceptable). */ - public JdbcBulkLoadBatchRequest(long qryId, int batchIdx, int cmd, @NotNull byte[] data) { + public JdbcBulkLoadBatchRequest(long cursorId, int batchIdx, int cmd, @NotNull byte[] data) { super(BULK_LOAD_BATCH); - this.qryId = qryId; + this.cursorId = cursorId; this.batchIdx = batchIdx; assert isCmdValid(cmd) : "Invalid command value: " + cmd; @@ -106,12 +106,12 @@ public class JdbcBulkLoadBatchRequest extends JdbcRequest { } /** - * Returns the original query ID. + * Returns the original cursor ID. * - * @return The original query ID. + * @return The original cursor ID. */ - public long queryId() { - return qryId; + public long cursorId() { + return cursorId; } /** @@ -146,7 +146,7 @@ public class JdbcBulkLoadBatchRequest extends JdbcRequest { ClientListenerProtocolVersion ver) throws BinaryObjectException { super.writeBinary(writer, ver); - writer.writeLong(qryId); + writer.writeLong(cursorId); writer.writeInt(batchIdx); writer.writeInt(cmd); writer.writeByteArray(data); @@ -157,7 +157,7 @@ public class JdbcBulkLoadBatchRequest extends JdbcRequest { ClientListenerProtocolVersion ver) throws BinaryObjectException { super.readBinary(reader, ver); - qryId = reader.readLong(); + cursorId = reader.readLong(); batchIdx = reader.readInt(); int c = reader.readInt(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadProcessor.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadProcessor.java index 9757791..2018c0f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadProcessor.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.odbc.jdbc; +import java.io.IOException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteIllegalStateException; import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor; @@ -71,7 +72,7 @@ import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchR * {@link JdbcBulkLoadBatchRequest#CMD_FINISHED_ERROR} and the processing * is aborted on the both sides. */ -public class JdbcBulkLoadProcessor { +public class JdbcBulkLoadProcessor extends JdbcCursor { /** A core processor that handles incoming data packets. */ private final BulkLoadProcessor processor; @@ -82,8 +83,11 @@ public class JdbcBulkLoadProcessor { * Creates a JDBC-specific adapter for bulk load processor. * * @param processor Bulk load processor from the core to delegate calls to. + * @param reqId Id of the request that created given processor. */ - public JdbcBulkLoadProcessor(BulkLoadProcessor processor) { + public JdbcBulkLoadProcessor(BulkLoadProcessor processor, long reqId) { + super(reqId); + this.processor = processor; nextBatchIdx = 0; } @@ -98,7 +102,7 @@ public class JdbcBulkLoadProcessor { */ public void processBatch(JdbcBulkLoadBatchRequest req) throws IgniteCheckedException { - if (nextBatchIdx != req.batchIdx()) + if (nextBatchIdx != req.batchIdx() && req.cmd() != CMD_FINISHED_ERROR) throw new IgniteSQLException("Batch #" + (nextBatchIdx + 1) + " is missing. Received #" + req.batchIdx() + " instead."); @@ -127,10 +131,15 @@ public class JdbcBulkLoadProcessor { * Closes the underlying objects. 
* Currently we don't handle normal termination vs. abort. */ - public void close() throws Exception { - processor.close(); + @Override public void close() throws IOException { + try { + processor.close(); - nextBatchIdx = -1; + nextBatchIdx = -1; + } + catch (Exception e) { + throw new IOException("Unable to close processor: " + e.getMessage(), e); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java index 8ba0790..bd6b328 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java @@ -57,8 +57,11 @@ public class JdbcConnectionContext extends ClientListenerAbstractConnectionConte /** Version 2.7.0: adds maximum length for columns feature.*/ static final ClientListenerProtocolVersion VER_2_7_0 = ClientListenerProtocolVersion.create(2, 7, 0); + /** Version 2.8.0: adds query id in order to implement cancel feature.*/ + static final ClientListenerProtocolVersion VER_2_8_0 = ClientListenerProtocolVersion.create(2, 8, 0); + /** Current version. */ - private static final ClientListenerProtocolVersion CURRENT_VER = VER_2_7_0; + private static final ClientListenerProtocolVersion CURRENT_VER = VER_2_8_0; /** Supported versions. 
*/ private static final Set<ClientListenerProtocolVersion> SUPPORTED_VERS = new HashSet<>(); @@ -83,6 +86,7 @@ public class JdbcConnectionContext extends ClientListenerAbstractConnectionConte static { SUPPORTED_VERS.add(CURRENT_VER); + SUPPORTED_VERS.add(VER_2_8_0); SUPPORTED_VERS.add(VER_2_7_0); SUPPORTED_VERS.add(VER_2_5_0); SUPPORTED_VERS.add(VER_2_4_0); http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcCursor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcCursor.java new file mode 100644 index 0000000..8c15714 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcCursor.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.jdbc; + +import java.io.Closeable; +import java.util.concurrent.atomic.AtomicLong; + +/** + * JDBC Cursor. 
+ */ +public abstract class JdbcCursor implements Closeable { + /** Cursor Id generator. */ + private static final AtomicLong CURSOR_ID_GENERATOR = new AtomicLong(); + + /** Cursor Id. */ + private final long cursorId; + + /** Id of the request that created given cursor. */ + private final long reqId; + + /** + * Constructor. + * + * @param reqId Id of the request that created given cursor. + */ + protected JdbcCursor(long reqId) { + cursorId = CURSOR_ID_GENERATOR.getAndIncrement(); + + this.reqId = reqId; + } + + /** + * @return Cursor Id. + */ + public long cursorId() { + return cursorId; + } + + /** + * @return Id of the request that created given cursor. + */ + public long requestId() { + return reqId; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java index 1718c00..66abf6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java @@ -91,4 +91,19 @@ public class JdbcMessageParser implements ClientListenerMessageParser { res.writeBinary(writer, ver); return writer.array(); - }} + } + + /** {@inheritDoc} */ + @Override public int decodeCommandType(byte[] msg) { + assert msg != null; + + return JdbcRequest.readType(msg); + } + + /** {@inheritDoc} */ + @Override public long decodeRequestId(byte[] msg) { + assert msg != null; + + return JdbcRequest.readRequestId(msg); + } +} 
http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCancelRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCancelRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCancelRequest.java new file mode 100644 index 0000000..7d6e854 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCancelRequest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.jdbc; + +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.internal.binary.BinaryReaderExImpl; +import org.apache.ignite.internal.binary.BinaryWriterExImpl; +import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * JDBC query cancel request. + */ +public class JdbcQueryCancelRequest extends JdbcRequest { + + /** Id of a request to be cancelled. 
*/ + private long reqIdToCancel; + + /** + */ + public JdbcQueryCancelRequest() { + super(QRY_CANCEL); + } + + /** + * @param reqIdToCancel Id of a request to be cancelled. + */ + public JdbcQueryCancelRequest(long reqIdToCancel) { + super(QRY_CANCEL); + + this.reqIdToCancel = reqIdToCancel; + } + + /** + * @return Id of a request to be cancelled. + */ + public long requestIdToBeCancelled() { + return reqIdToCancel; + } + + /** {@inheritDoc} */ + @Override public void writeBinary(BinaryWriterExImpl writer, + ClientListenerProtocolVersion ver) throws BinaryObjectException { + super.writeBinary(writer, ver); + + writer.writeLong(reqIdToCancel); + } + + /** {@inheritDoc} */ + @Override public void readBinary(BinaryReaderExImpl reader, + ClientListenerProtocolVersion ver) throws BinaryObjectException { + super.readBinary(reader, ver); + + reqIdToCancel = reader.readLong(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(JdbcQueryCancelRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCloseRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCloseRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCloseRequest.java index 5c631c3..02eb774 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCloseRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCloseRequest.java @@ -27,8 +27,8 @@ import org.apache.ignite.internal.util.typedef.internal.S; * JDBC query close request. */ public class JdbcQueryCloseRequest extends JdbcRequest { - /** Query ID. */ - private long queryId; + /** Cursor ID. 
*/ + private long cursorId; /** */ @@ -37,19 +37,19 @@ public class JdbcQueryCloseRequest extends JdbcRequest { } /** - * @param queryId Query ID. + * @param cursorId Cursor ID. */ - public JdbcQueryCloseRequest(long queryId) { + public JdbcQueryCloseRequest(long cursorId) { super(QRY_CLOSE); - this.queryId = queryId; + this.cursorId = cursorId; } /** - * @return Query ID. + * @return Cursor ID. */ - public long queryId() { - return queryId; + public long cursorId() { + return cursorId; } /** {@inheritDoc} */ @@ -57,7 +57,7 @@ public class JdbcQueryCloseRequest extends JdbcRequest { ClientListenerProtocolVersion ver) throws BinaryObjectException { super.writeBinary(writer, ver); - writer.writeLong(queryId); + writer.writeLong(cursorId); } /** {@inheritDoc} */ @@ -65,7 +65,7 @@ public class JdbcQueryCloseRequest extends JdbcRequest { ClientListenerProtocolVersion ver) throws BinaryObjectException { super.readBinary(reader, ver); - queryId = reader.readLong(); + cursorId = reader.readLong(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCursor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCursor.java index 830daea..cd566ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCursor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCursor.java @@ -27,10 +27,7 @@ import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; /** * SQL listener query fetch result. */ -class JdbcQueryCursor { - /** Query ID. */ - private final long queryId; - +class JdbcQueryCursor extends JdbcCursor { /** Fetch size. 
*/ private int pageSize; @@ -44,20 +41,26 @@ class JdbcQueryCursor { private final QueryCursorImpl<List<Object>> cur; /** Query results iterator. */ - private final Iterator<List<Object>> iter; + private Iterator<List<Object>> iter; /** - * @param queryId Query ID. * @param pageSize Fetch size. * @param maxRows Max rows. * @param cur Query cursor. + * @param reqId Id of the request that created given cursor. */ - JdbcQueryCursor(long queryId, int pageSize, int maxRows, QueryCursorImpl<List<Object>> cur) { - this.queryId = queryId; + JdbcQueryCursor(int pageSize, int maxRows, QueryCursorImpl<List<Object>> cur, long reqId) { + super(reqId); + this.pageSize = pageSize; this.maxRows = maxRows; this.cur = cur; + } + /** + * Open iterator; + */ + void openIterator(){ iter = cur.iterator(); } @@ -105,16 +108,9 @@ class JdbcQueryCursor { } /** - * @return Query ID. - */ - public long queryId() { - return queryId; - } - - /** * Close the cursor. */ - public void close() { + @Override public void close() { cur.close(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryDescriptor.java new file mode 100644 index 0000000..c9621f4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryDescriptor.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.jdbc; + +import org.apache.ignite.internal.processors.query.GridQueryCancel; + +/** + * JDBC query descriptor used for appropriate query cancellation. + */ +public class JdbcQueryDescriptor { + /** Hook for cancellation. */ + private final GridQueryCancel cancelHook; + + /** Canceled flag. */ + private boolean canceled; + + /** Usage count of given descriptor. */ + private int usageCnt; + + /** Execution started flag. */ + private boolean executionStarted; + + /** + * Constructor. + */ + public JdbcQueryDescriptor() { + cancelHook = new GridQueryCancel(); + } + + /** + * @return Hook for cancellation. + */ + public GridQueryCancel cancelHook() { + return cancelHook; + } + + /** + * @return True if descriptor was marked as canceled. + */ + public boolean isCanceled() { + return canceled; + } + + /** + * Marks descriptor as canceled. + */ + public void markCancelled() { + canceled = true; + } + + /** + * Increments usage count. + */ + public void incrementUsageCount() { + usageCnt++; + + executionStarted = true; + } + + /** + * Decrements usage count. + */ + public void decrementUsageCount() { + usageCnt--; + } + + /** + * @return Usage count. + */ + public int usageCount() { + return usageCnt; + } + + /** + * @return True if execution was started, false otherwise. 
+ */ + public boolean isExecutionStarted() { + return executionStarted; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryExecuteResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryExecuteResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryExecuteResult.java index 342e8ef..0b91555 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryExecuteResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryExecuteResult.java @@ -28,8 +28,8 @@ import org.apache.ignite.internal.util.typedef.internal.S; * JDBC query execute result. */ public class JdbcQueryExecuteResult extends JdbcResult { - /** Query ID. */ - private long queryId; + /** Cursor ID. */ + private long cursorId; /** Query result rows. */ private List<List<Object>> items; @@ -44,44 +44,44 @@ public class JdbcQueryExecuteResult extends JdbcResult { private long updateCnt; /** - * Condtructor. + * Constructor. */ JdbcQueryExecuteResult() { super(QRY_EXEC); } /** - * @param queryId Query ID. + * @param cursorId Cursor ID. * @param items Query result rows. * @param last Flag indicates the query has no unfetched results. */ - JdbcQueryExecuteResult(long queryId, List<List<Object>> items, boolean last) { + JdbcQueryExecuteResult(long cursorId, List<List<Object>> items, boolean last) { super(QRY_EXEC); - this.queryId = queryId; + this.cursorId = cursorId; this.items = items; this.last = last; this.isQuery = true; } /** - * @param queryId Query ID. + * @param cursorId Cursor ID. * @param updateCnt Update count for DML queries. 
*/ - public JdbcQueryExecuteResult(long queryId, long updateCnt) { + public JdbcQueryExecuteResult(long cursorId, long updateCnt) { super(QRY_EXEC); - this.queryId = queryId; + this.cursorId = cursorId; this.last = true; this.isQuery = false; this.updateCnt = updateCnt; } /** - * @return Query ID. + * @return Cursor ID. */ - public long getQueryId() { - return queryId; + public long cursorId() { + return cursorId; } /** @@ -117,7 +117,7 @@ public class JdbcQueryExecuteResult extends JdbcResult { ClientListenerProtocolVersion ver) throws BinaryObjectException { super.writeBinary(writer, ver); - writer.writeLong(queryId); + writer.writeLong(cursorId); writer.writeBoolean(isQuery); if (isQuery) { @@ -137,7 +137,7 @@ public class JdbcQueryExecuteResult extends JdbcResult { ClientListenerProtocolVersion ver) throws BinaryObjectException { super.readBinary(reader, ver); - queryId = reader.readLong(); + cursorId = reader.readLong(); isQuery = reader.readBoolean(); if (isQuery) { http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryFetchRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryFetchRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryFetchRequest.java index 59ed9a8..79d61d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryFetchRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryFetchRequest.java @@ -27,8 +27,8 @@ import org.apache.ignite.internal.util.typedef.internal.S; * JDBC query fetch request. */ public class JdbcQueryFetchRequest extends JdbcRequest { - /** Query ID. */ - private long queryId; + /** Cursor ID. */ + private long cursorId; /** Fetch size. 
*/ private int pageSize; @@ -41,21 +41,21 @@ public class JdbcQueryFetchRequest extends JdbcRequest { } /** - * @param queryId Query ID. + * @param cursorId Cursor ID. * @param pageSize Fetch size. */ - public JdbcQueryFetchRequest(long queryId, int pageSize) { + public JdbcQueryFetchRequest(long cursorId, int pageSize) { super(QRY_FETCH); - this.queryId = queryId; + this.cursorId = cursorId; this.pageSize = pageSize; } /** - * @return Query ID. + * @return Cursor ID. */ - public long queryId() { - return queryId; + public long cursorId() { + return cursorId; } /** @@ -70,7 +70,7 @@ public class JdbcQueryFetchRequest extends JdbcRequest { ClientListenerProtocolVersion ver) throws BinaryObjectException { super.writeBinary(writer, ver); - writer.writeLong(queryId); + writer.writeLong(cursorId); writer.writeInt(pageSize); } @@ -79,9 +79,9 @@ public class JdbcQueryFetchRequest extends JdbcRequest { ClientListenerProtocolVersion ver) throws BinaryObjectException { super.readBinary(reader, ver); - queryId = reader.readLong(); + cursorId = reader.readLong(); pageSize = reader.readInt(); - } + } /** {@inheritDoc} */ @Override public String toString() { http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryMetadataRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryMetadataRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryMetadataRequest.java index f30ecfd..08ed43e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryMetadataRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryMetadataRequest.java @@ -27,8 +27,8 @@ import org.apache.ignite.internal.util.typedef.internal.S; * JDBC query metadata request. 
*/ public class JdbcQueryMetadataRequest extends JdbcRequest { - /** Query ID. */ - private long qryId; + /** Cursor ID. */ + private long cursorId; /** * Constructor. @@ -38,19 +38,19 @@ public class JdbcQueryMetadataRequest extends JdbcRequest { } /** - * @param qryId Query ID. + * @param cursorId Cursor ID. */ - public JdbcQueryMetadataRequest(long qryId) { + public JdbcQueryMetadataRequest(long cursorId) { super(QRY_META); - this.qryId = qryId; + this.cursorId = cursorId; } /** - * @return Query ID. + * @return Cursor ID. */ - public long queryId() { - return qryId; + public long cursorId() { + return cursorId; } /** {@inheritDoc} */ @@ -58,7 +58,7 @@ public class JdbcQueryMetadataRequest extends JdbcRequest { ClientListenerProtocolVersion ver) throws BinaryObjectException { super.writeBinary(writer, ver); - writer.writeLong(qryId); + writer.writeLong(cursorId); } /** {@inheritDoc} */ @@ -66,7 +66,7 @@ public class JdbcQueryMetadataRequest extends JdbcRequest { ClientListenerProtocolVersion ver) throws BinaryObjectException { super.readBinary(reader, ver); - qryId = reader.readLong(); + cursorId = reader.readLong(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java index 0674edf..f611c8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java @@ -17,13 +17,18 @@ package org.apache.ignite.internal.processors.odbc.jdbc; +import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteException; import 
org.apache.ignite.binary.BinaryObjectException; import org.apache.ignite.internal.binary.BinaryReaderExImpl; import org.apache.ignite.internal.binary.BinaryWriterExImpl; +import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream; +import org.apache.ignite.internal.binary.streams.BinaryInputStream; import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion; import org.apache.ignite.internal.processors.odbc.ClientListenerRequestNoId; +import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_8_0; + /** * JDBC request. */ @@ -67,26 +72,47 @@ public class JdbcRequest extends ClientListenerRequestNoId implements JdbcRawBin /** Ordered batch request. */ static final byte BATCH_EXEC_ORDERED = 14; + /** Execute cancel request. */ + static final byte QRY_CANCEL = 15; + + /** Request Id generator. */ + private static final AtomicLong REQ_ID_GENERATOR = new AtomicLong(); + /** Request type. */ private byte type; + /** Request id. */ + private long reqId; + /** * @param type Command type. */ public JdbcRequest(byte type) { this.type = type; + + reqId = REQ_ID_GENERATOR.incrementAndGet(); } /** {@inheritDoc} */ @Override public void writeBinary(BinaryWriterExImpl writer, ClientListenerProtocolVersion ver) throws BinaryObjectException { writer.writeByte(type); + + if (ver.compareTo(VER_2_8_0) >= 0) + writer.writeLong(reqId); } /** {@inheritDoc} */ @Override public void readBinary(BinaryReaderExImpl reader, ClientListenerProtocolVersion ver) throws BinaryObjectException { - // No-op. 
+ + if (ver.compareTo(VER_2_8_0) >= 0) + reqId = reader.readLong(); + } + + /** {@inheritDoc} */ + @Override public long requestId() { + return reqId; } /** @@ -174,6 +200,11 @@ public class JdbcRequest extends ClientListenerRequestNoId implements JdbcRawBin break; + case QRY_CANCEL: + req = new JdbcQueryCancelRequest(); + + break; + default: throw new IgniteException("Unknown SQL listener request ID: [request ID=" + reqType + ']'); } @@ -182,4 +213,28 @@ public class JdbcRequest extends ClientListenerRequestNoId implements JdbcRawBin return req; } + + /** + * Reads JdbcRequest command type. + * + * @param msg Jdbc request as byte array. + * @return Command type. + */ + public static byte readType(byte[] msg) { + return msg[0]; + } + + /** + * Reads JdbcRequest Id. + * + * @param msg Jdbc request as byte array. + * @return Request Id. + */ + public static long readRequestId(byte[] msg) { + BinaryInputStream stream = new BinaryHeapInputStream(msg); + + stream.position(1); + + return stream.readLong(); + } }