IGNITE-5439: JDBC Thin: query cancel support. This closes #4252. This closes 
#5441.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7ceecc43
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7ceecc43
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7ceecc43

Branch: refs/heads/master
Commit: 7ceecc43f9abc507e4e21830882cc7ca4e7b13c7
Parents: a31c0a2
Author: alapin <lapin1...@gmail.com>
Authored: Fri Dec 28 15:26:37 2018 +0300
Committer: devozerov <voze...@gridgain.com>
Committed: Fri Dec 28 15:26:37 2018 +0300

----------------------------------------------------------------------
 .../jdbc/suite/IgniteJdbcDriverTestSuite.java   |     2 +
 .../thin/JdbcThinStatementCancelSelfTest.java   |   769 +
 .../jdbc/thin/JdbcThinStatementSelfTest.java    |    48 -
 ...ThinTransactionsAbstractComplexSelfTest.java |     2 +-
 .../src/test/resources/bulkload20_000.csv       | 20000 +++++++++++++++++
 .../cache/query/QueryCancelledException.java    |     5 +-
 .../internal/jdbc/thin/JdbcThinConnection.java  |    55 +-
 .../internal/jdbc/thin/JdbcThinResultSet.java   |    54 +-
 .../internal/jdbc/thin/JdbcThinStatement.java   |   143 +-
 .../internal/jdbc/thin/JdbcThinTcpIo.java       |    93 +-
 .../cache/query/IgniteQueryErrorCode.java       |     6 +
 .../odbc/ClientListenerMessageParser.java       |    20 +-
 .../odbc/ClientListenerNioListener.java         |    11 +-
 .../odbc/ClientListenerProcessor.java           |    44 +
 .../odbc/ClientListenerRequestHandler.java      |    27 +-
 .../internal/processors/odbc/SqlStateCode.java  |     3 +
 .../odbc/jdbc/JdbcBatchExecuteRequest.java      |     6 +-
 .../odbc/jdbc/JdbcBulkLoadAckResult.java        |    24 +-
 .../odbc/jdbc/JdbcBulkLoadBatchRequest.java     |    30 +-
 .../odbc/jdbc/JdbcBulkLoadProcessor.java        |    21 +-
 .../odbc/jdbc/JdbcConnectionContext.java        |     6 +-
 .../processors/odbc/jdbc/JdbcCursor.java        |    60 +
 .../processors/odbc/jdbc/JdbcMessageParser.java |    17 +-
 .../odbc/jdbc/JdbcQueryCancelRequest.java       |    76 +
 .../odbc/jdbc/JdbcQueryCloseRequest.java        |    20 +-
 .../processors/odbc/jdbc/JdbcQueryCursor.java   |    28 +-
 .../odbc/jdbc/JdbcQueryDescriptor.java          |    95 +
 .../odbc/jdbc/JdbcQueryExecuteResult.java       |    28 +-
 .../odbc/jdbc/JdbcQueryFetchRequest.java        |    22 +-
 .../odbc/jdbc/JdbcQueryMetadataRequest.java     |    20 +-
 .../processors/odbc/jdbc/JdbcRequest.java       |    57 +-
 .../odbc/jdbc/JdbcRequestHandler.java           |   544 +-
 .../processors/odbc/jdbc/JdbcResultInfo.java    |    20 +-
 .../odbc/odbc/OdbcConnectionContext.java        |     7 +-
 .../processors/odbc/odbc/OdbcMessageParser.java |    13 +
 .../odbc/odbc/OdbcRequestHandler.java           |    15 +
 .../platform/client/ClientMessageParser.java    |    14 +
 .../platform/client/ClientRequestHandler.java   |    16 +
 .../processors/query/GridQueryProcessor.java    |    58 +-
 39 files changed, 22127 insertions(+), 352 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
 
b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
index c4f8065..89ff29b 100644
--- 
a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
+++ 
b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
@@ -73,6 +73,7 @@ import 
org.apache.ignite.jdbc.thin.JdbcThinPreparedStatementSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinResultSetSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinSchemaCaseTest;
 import org.apache.ignite.jdbc.thin.JdbcThinSelectAfterAlterTable;
+import org.apache.ignite.jdbc.thin.JdbcThinStatementCancelSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinStatementSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinStreamingNotOrderedSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinStreamingOrderedSelfTest;
@@ -163,6 +164,7 @@ import org.junit.runners.Suite;
     JdbcThinMetadataSelfTest.class,
     JdbcThinMetadataPrimaryKeysSelfTest.class,
     JdbcThinErrorsSelfTest.class,
+    JdbcThinStatementCancelSelfTest.class,
 
     JdbcThinInsertStatementSelfTest.class,
     JdbcThinUpdateStatementSelfTest.class,

http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementCancelSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementCancelSelfTest.java
 
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementCancelSelfTest.java
new file mode 100644
index 0000000..b811498
--- /dev/null
+++ 
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementCancelSelfTest.java
@@ -0,0 +1,769 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ClientConnectorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath;
+
+/**
+ * Statement cancel test.
+ */
+@SuppressWarnings({"ThrowableNotThrown", "AssertWithSideEffects"})
+@RunWith(JUnit4.class)
+public class JdbcThinStatementCancelSelfTest extends JdbcThinAbstractSelfTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** URL. */
+    private static final String URL = "jdbc:ignite:thin://127.0.0.1/";
+
+    /** A CSV file with one record. */
+    private static final String BULKLOAD_20_000_LINE_CSV_FILE =
+        
Objects.requireNonNull(resolveIgnitePath("/modules/clients/src/test/resources/bulkload20_000.csv")).
+            getAbsolutePath();
+
+    /** Max table rows. */
+    private static final int MAX_ROWS = 10000;
+
+    /** Server thread pull size. */
+    private static final int SERVER_THREAD_POOL_SIZE = 4;
+
+    /** Cancellation processing timeout. */
+    public static final int TIMEOUT = 5000;
+
+    /** Nodes count. */
+    private static final byte NODES_COUNT = 3;
+
+    /** Timeout for checking async result. */
+    public static final int CHECK_RESULT_TIMEOUT = 1_000;
+
+    /** Connection. */
+    private Connection conn;
+
+    /** Statement. */
+    private Statement stmt;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        CacheConfiguration<?,?> cache = defaultCacheConfiguration();
+
+        cache.setCacheMode(PARTITIONED);
+        cache.setBackups(1);
+        cache.setWriteSynchronizationMode(FULL_SYNC);
+        cache.setSqlFunctionClasses(TestSQLFunctions.class);
+        cache.setIndexedTypes(Integer.class, Integer.class, Long.class, 
Long.class, String.class,
+            JdbcThinAbstractDmlStatementSelfTest.Person.class);
+
+        cfg.setCacheConfiguration(cache);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+
+        cfg.setClientConnectorConfiguration(new ClientConnectorConfiguration().
+            setThreadPoolSize(SERVER_THREAD_POOL_SIZE));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(NODES_COUNT);
+
+        for (int i = 0; i < MAX_ROWS; ++i)
+            grid(0).cache(DEFAULT_CACHE_NAME).put(i, i);
+
+        for (int i = 0; i < MAX_ROWS; ++i)
+            grid(0).cache(DEFAULT_CACHE_NAME).put((long)i, (long)i);
+    }
+
+    /**
+     * Called before execution of every test method in class.
+     *
+     * @throws Exception If failed.
+     */
+    @Before
+    public void before() throws Exception {
+        TestSQLFunctions.init();
+
+        conn = DriverManager.getConnection(URL);
+
+        conn.setSchema('"' + DEFAULT_CACHE_NAME + '"');
+
+        stmt = conn.createStatement();
+
+        assert stmt != null;
+        assert !stmt.isClosed();
+    }
+
+    /**
+     * Called after execution of every test method in class.
+     *
+     * @throws Exception If failed.
+     */
+    @After
+    public void after() throws Exception {
+        if (stmt != null && !stmt.isClosed()) {
+            stmt.close();
+
+            assert stmt.isClosed();
+        }
+
+        conn.close();
+
+        assert stmt.isClosed();
+        assert conn.isClosed();
+    }
+
+    /**
+     * Trying to cancel stament without query. In given case cancel is noop, 
so no exception expected.
+     */
+    @Test
+    public void testCancelingStmtWithoutQuery() {
+        try {
+            stmt.cancel();
+        }
+        catch (Exception e) {
+            log.error("Unexpected exception.", e);
+
+            fail("Unexpected exception");
+        }
+    }
+
+    /**
+     * Trying to retrieve result set of a canceled query.
+     * SQLException with message "The query was cancelled while executing." 
expected.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testResultSetRetrievalInCanceledStatement() throws Exception {
+        stmt.execute("SELECT 1; SELECT 2; SELECT 3;");
+
+        assertNotNull(stmt.getResultSet());
+
+        stmt.cancel();
+
+        GridTestUtils.assertThrows(log, () -> {
+            stmt.getResultSet();
+
+            return null;
+        }, SQLException.class, "The query was cancelled while executing.");
+    }
+
+    /**
+     * Trying to cancel already cancelled query.
+     * No exceptions exceped.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCancelCanceledQuery() throws Exception {
+        stmt.execute("SELECT 1;");
+
+        assertNotNull(stmt.getResultSet());
+
+        stmt.cancel();
+
+        stmt.cancel();
+
+        GridTestUtils.assertThrows(log, () -> {
+            stmt.getResultSet();
+
+            return null;
+        }, SQLException.class, "The query was cancelled while executing.");
+    }
+
+    /**
+     * Trying to cancel closed query.
+     * SQLException with message "Statement is closed." expected.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCancelClosedStmt() throws Exception {
+        stmt.close();
+
+        GridTestUtils.assertThrows(log, () -> {
+            stmt.cancel();
+
+            return null;
+        }, SQLException.class, "Statement is closed.");
+    }
+
+    /**
+     * Trying to call <code>resultSet.next()</code> on a canceled query.
+     * SQLException with message "The query was cancelled while executing." 
expected.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testResultSetNextAfterCanceling() throws Exception {
+        stmt.setFetchSize(10);
+
+        ResultSet rs = stmt.executeQuery("select * from Integer");
+
+        assert rs.next();
+
+        stmt.cancel();
+
+        GridTestUtils.assertThrows(log, () -> {
+            rs.next();
+
+            return null;
+        }, SQLException.class, "The query was cancelled while executing.");
+    }
+
+    /**
+     * Ensure that it's possible to execute new query on cancelled statement.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCancelAnotherStmt() throws Exception {
+        stmt.setFetchSize(10);
+
+        ResultSet rs = stmt.executeQuery("select * from Integer");
+
+        assert rs.next();
+
+        stmt.cancel();
+
+        ResultSet rs2 = stmt.executeQuery("select * from Integer order by 
_val");
+
+        assert rs2.next() : "The other cursor mustn't be closed";
+    }
+
+    /**
+     * Ensure that stament cancel doesn't effect another statement workflow, 
created by the same connection.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCancelAnotherStmtResultSet() throws Exception {
+        try (Statement anotherStmt = conn.createStatement()) {
+            ResultSet rs1 = stmt.executeQuery("select * from Integer WHERE 
_key % 2 = 0");
+
+            ResultSet rs2 = anotherStmt.executeQuery("select * from Integer  
WHERE _key % 2 <> 0");
+
+            stmt.cancel();
+
+            GridTestUtils.assertThrows(log, () -> {
+                rs1.next();
+
+                return null;
+            }, SQLException.class, "The query was cancelled while executing.");
+
+            assert rs2.next() : "The other cursor mustn't be closed";
+        }
+    }
+
+    /**
+     * Trying to cancel long running query. No exceptions expected.
+     * In order to guarantee correct concurrent processing of query itself and 
it's cancellation request
+     * two latches and some other stuff is used.
+     * For more details see <code>TestSQLFunctions#awaitLatchCancelled()</code>
+     * and 
<code>JdbcThinStatementCancelSelfTest#cancel(java.sql.Statement)</code>.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCancelQuery() throws Exception {
+        IgniteInternalFuture cancelRes = cancel(stmt);
+
+        GridTestUtils.assertThrows(log, () -> {
+            stmt.executeQuery("select * from Integer where _key in " +
+                "(select _key from Integer where awaitLatchCancelled() = 0) 
and shouldNotBeCalledInCaseOfCancellation()");
+
+            return null;
+        }, SQLException.class, "The query was cancelled while executing.");
+
+        // Ensures that there were no exceptions within async cancellation 
process.
+        cancelRes.get(CHECK_RESULT_TIMEOUT);
+    }
+
+    /**
+     * Trying close canceling query. No exceptions expected.
+     * In order to guarantee correct concurrent processing of query itself and 
it's cancellation request
+     * two latches and some other stuff is used.
+     * For more details see <code>TestSQLFunctions#awaitLatchCancelled()</code>
+     * and 
<code>JdbcThinStatementCancelSelfTest#cancel(java.sql.Statement)</code>.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCloseCancelingQuery() throws Exception {
+        IgniteInternalFuture res = GridTestUtils.runAsync(() -> {
+            try {
+                TestSQLFunctions.cancelLatch.await();
+
+                long cancelCntrBeforeCancel = 
ClientListenerProcessor.CANCEL_COUNTER.get();
+
+                stmt.cancel();
+
+                try {
+                    GridTestUtils.waitForCondition(
+                        () -> ClientListenerProcessor.CANCEL_COUNTER.get() == 
cancelCntrBeforeCancel + 1, TIMEOUT);
+                }
+                catch (IgniteInterruptedCheckedException ignored) {
+                    // No-op.
+                }
+
+                assertEquals(cancelCntrBeforeCancel + 1, 
ClientListenerProcessor.CANCEL_COUNTER.get());
+
+                // Nothing expected here, cause query was already marked as 
canceled.
+                stmt.close();
+
+                TestSQLFunctions.reqLatch.countDown();
+            }
+            catch (Exception e) {
+                log.error("Unexpected exception.", e);
+
+                fail("Unexpected exception");
+            }
+        });
+
+        GridTestUtils.assertThrows(log, () -> {
+            stmt.executeQuery("select * from Integer where _key in " +
+                "(select _key from Integer where awaitLatchCancelled() = 0) 
and shouldNotBeCalledInCaseOfCancellation()");
+
+            return null;
+        }, SQLException.class, "The query was cancelled while executing.");
+
+        // Ensures that there were no exceptions within async cancellation 
process.
+        res.get(CHECK_RESULT_TIMEOUT);
+    }
+
+    /**
+     * Trying to cancel long running multiple statments query. No exceptions 
expected.
+     * In order to guarantee correct concurrent processing of query itself and 
it's cancellation request
+     * two latches and some other stuff is used.
+     * For more details see <code>TestSQLFunctions#awaitLatchCancelled()</code>
+     * and 
<code>JdbcThinStatementCancelSelfTest#cancel(java.sql.Statement)</code>.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCancelMultipleStatementsQuery() throws Exception {
+        try (Statement anotherStatment = conn.createStatement()) {
+            anotherStatment.setFetchSize(1);
+
+            ResultSet rs = anotherStatment.executeQuery("select * from 
Integer");
+
+            assert rs.next();
+
+            IgniteInternalFuture cancelRes = cancel(stmt);
+
+            GridTestUtils.assertThrows(log, () -> {
+                // Executes multiple long running query
+                stmt.execute(
+                    "select 100 from Integer;"
+                        + "select _key from Integer where 
awaitLatchCancelled() = 0;"
+                        + "select 100 from Integer I1 join Integer I2;"
+                        + "select * from Integer where 
shouldNotBeCalledInCaseOfCancellation()");
+                return null;
+            }, SQLException.class, "The query was cancelled while executing");
+
+            assert rs.next() : "The other cursor mustn't be closed";
+
+            // Ensures that there were no exceptions within async cancellation 
process.
+            cancelRes.get(CHECK_RESULT_TIMEOUT);
+        }
+    }
+
+    /**
+     * Trying to cancel long running batch query. No exceptions expected.
+     * In order to guarantee correct concurrent processing of query itself and 
it's cancellation request
+     * two latches and some other stuff is used.
+     * For more details see <code>TestSQLFunctions#awaitLatchCancelled()</code>
+     * and 
<code>JdbcThinStatementCancelSelfTest#cancel(java.sql.Statement)</code>.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCancelBatchQuery() throws Exception {
+        try (Statement stmt2 = conn.createStatement()) {
+            stmt2.setFetchSize(1);
+
+            ResultSet rs = stmt2.executeQuery("SELECT * from Integer");
+
+            assert rs.next();
+
+            IgniteInternalFuture cancelRes = cancel(stmt);
+
+            GridTestUtils.assertThrows(log, () -> {
+                stmt.addBatch("update Long set _val = _val + 1 where _key < 
sleep_func (30)");
+                stmt.addBatch("update Long set _val = _val + 1 where 
awaitLatchCancelled() = 0");
+                stmt.addBatch("update Long set _val = _val + 1 where _key < 
sleep_func (30)");
+                stmt.addBatch("update Long set _val = _val + 1 where 
shouldNotBeCalledInCaseOfCancellation()");
+
+                stmt.executeBatch();
+                return null;
+            }, java.sql.SQLException.class, "The query was cancelled while 
executing");
+
+            assert rs.next() : "The other cursor mustn't be closed";
+
+            // Ensures that there were no exceptions within async cancellation 
process.
+            cancelRes.get(CHECK_RESULT_TIMEOUT);
+        }
+    }
+
+    /**
+     * Trying to cancel long running query in situation that there's no worker 
for cancel query,
+     * cause server thread pool is full. No exceptions expected.
+     * In order to guarantee correct concurrent processing of query itself and 
it's cancellation request
+     * thress latches and some other stuff is used.
+     * For more details see 
<code>TestSQLFunctions#awaitLatchCancelled()</code>,
+     * <code>TestSQLFunctions#awaitQuerySuspensionLatch()</code>
+     * and 
<code>JdbcThinStatementCancelSelfTest#cancel(java.sql.Statement)</code>.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCancelAgainstFullServerThreadPool() throws Exception {
+        List<Statement> statements = Collections.synchronizedList(new 
ArrayList<>());
+        List<Connection> connections = Collections.synchronizedList(new 
ArrayList<>());
+
+        // Prepares connections and statemens in order to use them for filling 
thread pool with pseuso-infine quries.
+        for (int i = 0; i < SERVER_THREAD_POOL_SIZE; i++) {
+            Connection yaConn = DriverManager.getConnection(URL);
+
+            yaConn.setSchema('"' + DEFAULT_CACHE_NAME + '"');
+
+            connections.add(yaConn);
+
+            Statement yaStmt = yaConn.createStatement();
+
+            statements.add(yaStmt);
+        }
+
+        try {
+            IgniteInternalFuture cancelRes = 
cancel(statements.get(SERVER_THREAD_POOL_SIZE - 1));
+
+            // Completely fills server thread pool.
+            IgniteInternalFuture<Long> fillPoolRes = 
fillServerThreadPool(statements, SERVER_THREAD_POOL_SIZE - 1);
+
+            GridTestUtils.assertThrows(log, () -> {
+                statements.get(SERVER_THREAD_POOL_SIZE - 1).executeQuery(
+                    "select * from Integer where _key in " +
+                        "(select _key from Integer where awaitLatchCancelled() 
= 0) and" +
+                        " shouldNotBeCalledInCaseOfCancellation()");
+
+                return null;
+            }, SQLException.class, "The query was cancelled while executing.");
+
+            // Releases queries in thread pool.
+            TestSQLFunctions.suspendQryLatch.countDown();
+
+            // Ensures that there were no exceptions within async cancellation 
process.
+            cancelRes.get(CHECK_RESULT_TIMEOUT);
+
+            // Ensures that there were no exceptions within async thread pool 
filling process.
+            fillPoolRes.get(CHECK_RESULT_TIMEOUT);
+        }
+        finally {
+            for (Statement statement : statements)
+                statement.close();
+
+            for (Connection connection : connections)
+                connection.close();
+        }
+    }
+
+    /**
+     * Trying to cancel fetch query in situation that there's no worker for 
cancel query,
+     * cause server thread pool is full. No exceptions expected.
+     * In order to guarantee correct concurrent processing of query itself and 
it's cancellation request
+     * thress latches and some other stuff is used.
+     * For more details see 
<code>TestSQLFunctions#awaitLatchCancelled()</code>,
+     * <code>TestSQLFunctions#awaitQuerySuspensionLatch()</code>
+     * and 
<code>JdbcThinStatementCancelSelfTest#cancel(java.sql.Statement)</code>.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCancelFetchAgainstFullServerThreadPool() throws Exception {
+        stmt.setFetchSize(1);
+
+        ResultSet rs = stmt.executeQuery("SELECT * from Integer");
+
+        rs.next();
+
+        List<Statement> statements = Collections.synchronizedList(new 
ArrayList<>());
+        List<Connection> connections = Collections.synchronizedList(new 
ArrayList<>());
+
+        // Prepares connections and statemens in order to use them for filling 
thread pool with pseuso-infine quries.
+        for (int i = 0; i < SERVER_THREAD_POOL_SIZE; i++) {
+            Connection yaConn = DriverManager.getConnection(URL);
+
+            yaConn.setSchema('"' + DEFAULT_CACHE_NAME + '"');
+
+            connections.add(yaConn);
+
+            Statement yaStmt = yaConn.createStatement();
+
+            statements.add(yaStmt);
+        }
+
+        try {
+            // Completely fills server thread pool.
+            IgniteInternalFuture<Long> fillPoolRes = 
fillServerThreadPool(statements,
+                SERVER_THREAD_POOL_SIZE - 1);
+
+            IgniteInternalFuture fetchRes = GridTestUtils.runAsync(() -> {
+                GridTestUtils.assertThrows(log, () -> {
+                    rs.next();
+
+                    return null;
+                }, SQLException.class, "The query was cancelled while 
executing.");
+            });
+
+            stmt.cancel();
+
+            // Ensures that there were no exceptions within async data 
fetching process.
+            fetchRes.get(CHECK_RESULT_TIMEOUT);
+
+            // Releases queries in thread pool.
+            TestSQLFunctions.suspendQryLatch.countDown();
+
+            // Ensure that there were no exceptions within async thread pool 
filling process.
+            fillPoolRes.get(CHECK_RESULT_TIMEOUT);
+        }
+        finally {
+            for (Statement statement : statements)
+                statement.close();
+
+            for (Connection connection : connections)
+                connection.close();
+        }
+    }
+
+    /**
+     * Trying to cancel long running file upload. No exceptions expected.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCancellingLongRunningFileUpload() throws Exception {
+        IgniteInternalFuture cancelRes = GridTestUtils.runAsync(() -> {
+            try {
+                Thread.sleep(200);
+
+                stmt.cancel();
+            }
+            catch (Exception e) {
+                log.error("Unexpected exception.", e);
+
+                fail("Unexpected exception");
+            }
+        });
+
+        GridTestUtils.assertThrows(log, () -> {
+            stmt.executeUpdate(
+                "copy from '" + BULKLOAD_20_000_LINE_CSV_FILE + "' into 
Person" +
+                    " (_key, age, firstName, lastName)" +
+                    " format csv");
+
+            return null;
+        }, SQLException.class, "The query was cancelled while executing.");
+
+        // Ensure that there were no exceptions within async cancellation 
process.
+        cancelRes.get(CHECK_RESULT_TIMEOUT);
+    }
+
+    /**
+     * Cancels current query, actual cancel will wait <code>cancelLatch</code> 
to be releaseds.
+     *
+     * @return <code>IgniteInternalFuture</code> to check whether exception 
was thrown.
+     */
+    private IgniteInternalFuture cancel(Statement stmt) {
+        return GridTestUtils.runAsync(() -> {
+            try {
+                TestSQLFunctions.cancelLatch.await();
+
+                long cancelCntrBeforeCancel = 
ClientListenerProcessor.CANCEL_COUNTER.get();
+
+                stmt.cancel();
+
+                try {
+                    GridTestUtils.waitForCondition(
+                        () -> ClientListenerProcessor.CANCEL_COUNTER.get() == 
cancelCntrBeforeCancel + 1, TIMEOUT);
+                }
+                catch (IgniteInterruptedCheckedException ignored) {
+                    // No-op.
+                }
+
+                assertEquals(cancelCntrBeforeCancel + 1, 
ClientListenerProcessor.CANCEL_COUNTER.get());
+
+                TestSQLFunctions.reqLatch.countDown();
+            }
+            catch (Exception e) {
+                log.error("Unexpected exception.", e);
+
+                fail("Unexpected exception");
+            }
+        });
+    }
+
+    /**
+     * Fills Server Thread Pool with <code>qryCnt</code> queries. Given 
queries will wait for
+     * <code>suspendQryLatch</code> to be released.
+     *
+     * @param statements Statements.
+     * @param qryCnt Number of queries to execute.
+     * @return <code>IgniteInternalFuture</code> in order to check whether 
exception was thrown or not.
+     */
+    private IgniteInternalFuture<Long> fillServerThreadPool(List<Statement> 
statements, int qryCnt) {
+        AtomicInteger idx = new AtomicInteger(0);
+
+        return GridTestUtils.runMultiThreadedAsync(() -> {
+            try {
+                statements.get(idx.getAndIncrement()).executeQuery(
+                    "select * from Integer where 
awaitQuerySuspensionLatch();");
+            }
+            catch (SQLException e) {
+                log.error("Unexpected exception.", e);
+
+                fail("Unexpected exception");
+            }
+        }, qryCnt, "ThreadName");
+    }
+
+    /**
+     * Utility class with custom SQL functions.
+     */
+    public static class TestSQLFunctions {
+        /** Request latch. */
+        static CountDownLatch reqLatch;
+
+        /** Cancel latch. */
+        static CountDownLatch cancelLatch;
+
+        /** Suspend query latch. */
+        static CountDownLatch suspendQryLatch;
+
+        /**
+         * Recreate latches.
+         */
+        static void init() {
+            reqLatch = new CountDownLatch(1);
+
+            cancelLatch = new CountDownLatch(1);
+
+            suspendQryLatch = new CountDownLatch(1);
+        }
+
+        /**
+         * Releases cancelLatch that leeds to sending cancel Query and waits 
until cancel Query is fully processed.
+         *
+         * @return 0;
+         */
+        @QuerySqlFunction
+        public static long awaitLatchCancelled() {
+            try {
+                cancelLatch.countDown();
+                reqLatch.await();
+            }
+            catch (Exception ignored) {
+                // No-op.
+            }
+
+            return 0;
+        }
+
+        /**
+         * Waits latch release.
+         *
+         * @return 0;
+         */
+        @QuerySqlFunction
+        public static long awaitQuerySuspensionLatch() {
+            try {
+                suspendQryLatch.await();
+            }
+            catch (Exception ignored) {
+                // No-op.
+            }
+
+            return 0;
+        }
+
+        /**
+         * If called fails with corresponding message.
+         *
+         * @return 0;
+         */
+        @QuerySqlFunction
+        public static long shouldNotBeCalledInCaseOfCancellation() {
+            fail("Query wasn't actually cancelled.");
+
+            return 0;
+        }
+
+        /**
+         *
+         * @param v amount of milliseconds to sleep
+         * @return amount of milliseconds to sleep
+         */
+        @QuerySqlFunction
+        public static int sleep_func(int v) {
+            try {
+                Thread.sleep(v);
+            }
+            catch (InterruptedException ignored) {
+                // No-op
+            }
+            return v;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementSelfTest.java
 
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementSelfTest.java
index 2fe32d1..a58137b 100644
--- 
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementSelfTest.java
+++ 
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementSelfTest.java
@@ -31,7 +31,6 @@ import 
org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.runner.RunWith;
@@ -1064,53 +1063,6 @@ public class JdbcThinStatementSelfTest extends 
JdbcThinAbstractSelfTest {
      * @throws Exception If failed.
      */
     @org.junit.Test
-    public void testCancel() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-5439";);
-
-        GridTestUtils.assertThrows(log,
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    stmt.execute("select sleep_func(3)");
-
-                    return null;
-                }
-            },
-            SQLException.class,
-            "The query is canceled");
-
-        IgniteInternalFuture f = GridTestUtils.runAsync(new Runnable() {
-            @Override public void run() {
-                try {
-                    stmt.cancel();
-                }
-                catch (SQLException e) {
-                    log.error("Unexpected exception", e);
-
-                    fail("Unexpected exception.");
-                }
-            }
-        });
-
-        f.get();
-
-        stmt.close();
-
-        GridTestUtils.assertThrows(log,
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    stmt.cancel();
-
-                    return null;
-                }
-            },
-            SQLException.class,
-            "Statement is closed");
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @org.junit.Test
     public void testStatementTypeMismatchSelectForCachedQuery() throws 
Exception {
         // Put query to cache.
         stmt.executeQuery("select 1;");

http://git-wip-us.apache.org/repos/asf/ignite/blob/7ceecc43/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsAbstractComplexSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsAbstractComplexSelfTest.java
 
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsAbstractComplexSelfTest.java
index ff9d6b3..d614fbc 100644
--- 
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsAbstractComplexSelfTest.java
+++ 
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinTransactionsAbstractComplexSelfTest.java
@@ -372,7 +372,7 @@ public abstract class 
JdbcThinTransactionsAbstractComplexSelfTest extends JdbcTh
      */
     @Test
     public void testColocatedJoinSelectAndInsertInTransaction() throws 
SQLException {
-        // We'd like to put some Google into cities with over 1K population 
which don't have it yet
+        // We'd like to put some Google into cities wgit checith over 1K 
population which don't have it yet
         executeInTransaction(new TransactionClosure() {
             @Override public void apply(Connection conn) {
                 List<Integer> ids = flat(execute(conn, "SELECT distinct 
City.id from City left join Company c on " +

Reply via email to