IGNITE-5397: JDBC thin driver: avoid sending JdbcQueryCloseRequest message whenever possible. This closes #2095.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ecba1acc Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ecba1acc Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ecba1acc Branch: refs/heads/ignite-5272 Commit: ecba1acc80ec395d312d7e70be53fdafb3ff6fc5 Parents: 5120a24 Author: Sergey Kalashnikov <[email protected]> Authored: Wed Jun 7 15:29:28 2017 +0300 Committer: devozerov <[email protected]> Committed: Wed Jun 7 15:29:28 2017 +0300 ---------------------------------------------------------------------- .../jdbc/suite/IgniteJdbcDriverTestSuite.java | 2 + .../thin/JdbcThinAutoCloseServerCursorTest.java | 359 +++++++++++++++++++ .../jdbc/thin/JdbcThinConnectionSelfTest.java | 36 ++ .../internal/jdbc/thin/JdbcThinConnection.java | 4 +- .../internal/jdbc/thin/JdbcThinResultSet.java | 18 +- .../internal/jdbc/thin/JdbcThinStatement.java | 2 +- .../internal/jdbc/thin/JdbcThinTcpIo.java | 17 +- .../internal/jdbc/thin/JdbcThinUtils.java | 7 +- .../processors/odbc/SqlListenerNioListener.java | 3 +- .../processors/odbc/jdbc/JdbcQueryCursor.java | 9 +- .../odbc/jdbc/JdbcRequestHandler.java | 21 +- 11 files changed, 462 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ecba1acc/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java index 121b8df..9ca3582 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java @@ -31,6 +31,7 @@ import 
org.apache.ignite.jdbc.JdbcPojoQuerySelfTest; import org.apache.ignite.jdbc.JdbcPreparedStatementSelfTest; import org.apache.ignite.jdbc.JdbcResultSetSelfTest; import org.apache.ignite.jdbc.JdbcStatementSelfTest; +import org.apache.ignite.jdbc.thin.JdbcThinAutoCloseServerCursorTest; import org.apache.ignite.jdbc.thin.JdbcThinComplexQuerySelfTest; import org.apache.ignite.jdbc.thin.JdbcThinDeleteStatementSelfTest; import org.apache.ignite.jdbc.thin.JdbcThinDynamicIndexAtomicPartitionedNearSelfTest; @@ -116,6 +117,7 @@ public class IgniteJdbcDriverTestSuite extends TestSuite { suite.addTest(new TestSuite(JdbcThinUpdateStatementSelfTest.class)); suite.addTest(new TestSuite(JdbcThinMergeStatementSelfTest.class)); suite.addTest(new TestSuite(JdbcThinDeleteStatementSelfTest.class)); + suite.addTest(new TestSuite(JdbcThinAutoCloseServerCursorTest.class)); // New thin JDBC driver, DDL tests suite.addTest(new TestSuite(JdbcThinDynamicIndexAtomicPartitionedNearSelfTest.class)); http://git-wip-us.apache.org/repos/asf/ignite/blob/ecba1acc/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAutoCloseServerCursorTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAutoCloseServerCursorTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAutoCloseServerCursorTest.java new file mode 100644 index 0000000..eff504b --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAutoCloseServerCursorTest.java @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.jdbc.thin; + +import java.io.Serializable; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.Callable; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.jdbc.thin.JdbcThinUtils; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; + +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Tests an optional optimization that server cursor is closed automatically + * when last result set page is transmitted. + */ +public class JdbcThinAutoCloseServerCursorTest extends JdbcThinAbstractSelfTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Cache name. */ + private static final String CACHE_NAME = "cache"; + + /** URL. */ + private static final String URL = "jdbc:ignite:thin://127.0.0.1/?" 
+ + JdbcThinUtils.PARAM_AUTO_CLOSE_SERVER_CURSOR + "=true"; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + CacheConfiguration cache = defaultCacheConfiguration(); + + cache.setName(CACHE_NAME); + cache.setCacheMode(PARTITIONED); + cache.setBackups(1); + cache.setWriteSynchronizationMode(FULL_SYNC); + cache.setIndexedTypes(Integer.class, Person.class); + + cfg.setCacheConfiguration(cache); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(3); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + IgniteCache<Integer, Person> cache = grid(0).cache(CACHE_NAME); + + cache.clear(); + } + + /** + * Ensure that server cursor is implicitly closed on last page. + * + * @throws Exception If failed. 
+ */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void testQuery() throws Exception { + IgniteCache<Integer, Person> cache = grid(0).cache(CACHE_NAME); + + Person persons[] = new Person[] { + new Person(1, "John", 25), + new Person(2, "Mary", 23) + }; + + for (Person person: persons) + cache.put(person.id, person); + + try (Connection conn = DriverManager.getConnection(URL)) { + conn.setSchema(CACHE_NAME); + + String sqlText = "select * from Person"; + + try (Statement stmt = conn.createStatement()) { + // Ensure that result set fits into one page + stmt.setFetchSize(2); + + try (ResultSet rs = stmt.executeQuery(sqlText)) { + // Attempt to get query metadata when server cursor is already closed + GridTestUtils.assertThrows(log(), new Callable<Object>() { + @Override public Object call() throws Exception { + return rs.getMetaData(); + } + }, + SQLException.class, + "Server cursor is already closed" + ); + + GridTestUtils.assertThrows(log(), new Callable<Object>() { + @Override public Object call() throws Exception { + return rs.findColumn("id"); + } + }, + SQLException.class, + "Server cursor is already closed" + ); + + checkResultSet(rs, persons); + } + } + + try (Statement stmt = conn.createStatement()) { + // Ensure multiple page result set + stmt.setFetchSize(1); + + try (ResultSet rs = stmt.executeQuery(sqlText)) { + // Getting result set metadata is OK here + assertEquals(3, rs.getMetaData().getColumnCount()); + + assertEquals(1, rs.findColumn("id")); + + checkResultSet(rs, persons); + } + } + + try (Statement stmt = conn.createStatement()) { + // Ensure multiple page result set + stmt.setFetchSize(1); + + try (ResultSet rs = stmt.executeQuery(sqlText)) { + checkResultSet(rs, persons); + + // Server cursor is closed now + GridTestUtils.assertThrows(log(), new Callable<Object>() { + @Override public Object call() throws Exception { + return rs.getMetaData(); + } + }, + SQLException.class, + "Server cursor is already closed" + ); + + 
GridTestUtils.assertThrows(log(), new Callable<Object>() { + @Override public Object call() throws Exception { + return rs.findColumn("id"); + } + }, + SQLException.class, + "Server cursor is already closed" + ); + } + } + } + } + + /** + * Ensure that insert works when auto close of server cursor is enabled. + * + * @throws Exception If failed. + */ + public void testInsert() throws Exception { + try (Connection conn = DriverManager.getConnection(URL)) { + conn.setSchema(CACHE_NAME); + + String sqlText = "insert into Person (_key, id, name, age) values (?, ?, ?, ?)"; + + Person p = new Person(1, "John", 25); + + try (PreparedStatement prepared = conn.prepareStatement(sqlText)) { + prepared.setInt(1, p.id); + prepared.setInt(2, p.id); + prepared.setString(3, p.name); + prepared.setInt(4, p.age); + + assertFalse(prepared.execute()); + assertEquals(1, prepared.getUpdateCount()); + } + + IgniteCache<Integer, Person> cache = grid(0).cache(CACHE_NAME); + + assertEquals(p, cache.get(1)); + } + } + + /** + * Ensure that update works when auto close of server cursor is enabled. + * + * @throws Exception If failed. + */ + public void testUpdate() throws Exception { + IgniteCache<Integer, Person> cache = grid(0).cache(CACHE_NAME); + + Person p = new Person(1, "John", 25); + + cache.put(1, p); + + try (Connection conn = DriverManager.getConnection(URL)) { + conn.setSchema(CACHE_NAME); + + String sqlText = "update Person set age = age + 1"; + + try (Statement stmt = conn.createStatement()) { + assertEquals(1, stmt.executeUpdate(sqlText)); + } + + assertEquals(p.age + 1, cache.get(1).age); + } + } + + /** + * Ensure that delete works when auto close of server cursor is enabled. + * + * @throws Exception If failed. 
+ */ + public void testDelete() throws Exception { + IgniteCache<Integer, Person> cache = grid(0).cache(CACHE_NAME); + + Person p = new Person(1, "John", 25); + + cache.put(1, p); + + try (Connection conn = DriverManager.getConnection(URL)) { + conn.setSchema(CACHE_NAME); + + String sqlText = "delete Person where age = ?"; + + try (PreparedStatement prepared = conn.prepareStatement(sqlText)) { + prepared.setInt(1, p.age); + + assertEquals(1, prepared.executeUpdate()); + } + + assertNull(cache.get(1)); + } + } + + /** + * Checks result set against array of Person. + * + * @param rs Result set. + * @param persons Array of Person. + * @throws Exception If failed. + */ + private void checkResultSet(ResultSet rs, Person[] persons) throws Exception { + while (rs.next()) { + Person p = new Person( + rs.getInt(1), + rs.getString(2), + rs.getInt(3)); + + assert p.id > 0 && p.id <= persons.length; + + assertEquals(persons[p.id - 1], p); + } + } + + /** + * Person. + */ + @SuppressWarnings("UnusedDeclaration") + static class Person implements Serializable { + /** ID. */ + @QuerySqlField + private final int id; + + /** Name. */ + @QuerySqlField + private final String name; + + /** Age. */ + @QuerySqlField + private final int age; + + /** + * @param id ID. + * @param name Name. + * @param age Age.
+ */ + Person(int id, String name, int age) { + assert !F.isEmpty(name); + assert age > 0; + + this.id = id; + this.name = name; + this.age = age; + } + + /** {@inheritDoc} */ + @SuppressWarnings("SimplifiableIfStatement") + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + Person person = (Person) o; + + if (id != person.id) + return false; + + if (name == null ^ person.name == null) + return false; + + if (name != null && !name.equals(person.name)) + return false; + + return age == person.age; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int result = id; + + result = 31 * result + (name != null ? name.hashCode() : 0); + result = 31 * result + age; + + return result; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ecba1acc/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java index 7ea22d5..8f1285b 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java @@ -237,6 +237,42 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest { } /** + * Test autoCloseServerCursor property handling. + * + * @throws Exception If failed. + */ + public void testAutoCloseServerCursorProperty() throws Exception { + String url = "jdbc:ignite:thin://127.0.0.1?" 
+ JdbcThinUtils.PARAM_AUTO_CLOSE_SERVER_CURSOR; + + String err = "Failed to parse boolean property [name=" + JdbcThinUtils.PARAM_AUTO_CLOSE_SERVER_CURSOR; + + assertInvalid(url + "=0", err); + assertInvalid(url + "=1", err); + assertInvalid(url + "=false1", err); + assertInvalid(url + "=true1", err); + + try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1")) { + assertFalse(io(conn).autoCloseServerCursor()); + } + + try (Connection conn = DriverManager.getConnection(url + "=true")) { + assertTrue(io(conn).autoCloseServerCursor()); + } + + try (Connection conn = DriverManager.getConnection(url + "=True")) { + assertTrue(io(conn).autoCloseServerCursor()); + } + + try (Connection conn = DriverManager.getConnection(url + "=false")) { + assertFalse(io(conn).autoCloseServerCursor()); + } + + try (Connection conn = DriverManager.getConnection(url + "=False")) { + assertFalse(io(conn).autoCloseServerCursor()); + } + } + + /** * Get client socket for connection. * * @param conn Connection. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/ecba1acc/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java index b372085..14c34ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java @@ -44,6 +44,7 @@ import static java.sql.ResultSet.CONCUR_READ_ONLY; import static java.sql.ResultSet.HOLD_CURSORS_OVER_COMMIT; import static java.sql.ResultSet.TYPE_FORWARD_ONLY; +import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_AUTO_CLOSE_SERVER_CURSORS; import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_HOST; import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_PORT; import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_DISTRIBUTED_JOINS; @@ -106,6 +107,7 @@ public class JdbcThinConnection implements Connection { boolean enforceJoinOrder = extractBoolean(props, PROP_ENFORCE_JOIN_ORDER, false); boolean collocated = extractBoolean(props, PROP_COLLOCATED, false); boolean replicatedOnly = extractBoolean(props, PROP_REPLICATED_ONLY, false); + boolean autoCloseServerCursor = extractBoolean(props, PROP_AUTO_CLOSE_SERVER_CURSORS, false); int sockSndBuf = extractIntNonNegative(props, PROP_SOCK_SND_BUF, 0); int sockRcvBuf = extractIntNonNegative(props, PROP_SOCK_RCV_BUF, 0); @@ -114,7 +116,7 @@ public class JdbcThinConnection implements Connection { try { cliIo = new JdbcThinTcpIo(host, port, distributedJoins, enforceJoinOrder, collocated, replicatedOnly, - sockSndBuf, sockRcvBuf, tcpNoDelay); + autoCloseServerCursor, sockSndBuf, sockRcvBuf, tcpNoDelay); cliIo.start(); } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/ecba1acc/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java index 87bc526..5c61e23 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java @@ -94,6 +94,9 @@ public class JdbcThinResultSet implements ResultSet { /** Is query flag. */ private boolean isQuery; + /** Auto close server cursors flag. */ + private boolean autoClose; + /** Update count. */ private long updCnt; @@ -105,11 +108,13 @@ public class JdbcThinResultSet implements ResultSet { * @param fetchSize Fetch size. * @param finished Finished flag. * @param rows Rows. - * @param isQuery Is Result ser for Select query + * @param isQuery Is Result ser for Select query. + * @param autoClose Is automatic close of server cursors enabled. + * @param updCnt Update count. 
*/ @SuppressWarnings("OverlyStrongTypeCast") JdbcThinResultSet(JdbcThinStatement stmt, long qryId, int fetchSize, boolean finished, - List<List<Object>> rows, boolean isQuery, long updCnt) { + List<List<Object>> rows, boolean isQuery, boolean autoClose, long updCnt) { assert stmt != null; assert fetchSize > 0; @@ -118,6 +123,7 @@ public class JdbcThinResultSet implements ResultSet { this.fetchSize = fetchSize; this.finished = finished; this.isQuery = isQuery; + this.autoClose = autoClose; if (isQuery) { this.fetchSize = fetchSize; @@ -134,7 +140,7 @@ public class JdbcThinResultSet implements ResultSet { @Override public boolean next() throws SQLException { ensureNotClosed(); - if (rowsIter == null && !finished) { + if ((rowsIter == null || !rowsIter.hasNext()) && !finished) { try { JdbcQueryFetchResult res = stmt.connection().io().queryFetch(qryId, fetchSize); @@ -178,7 +184,8 @@ public class JdbcThinResultSet implements ResultSet { return; try { - stmt.connection().io().queryClose(qryId); + if (!finished || (isQuery && !autoClose)) + stmt.connection().io().queryClose(qryId); closed = true; } @@ -1616,6 +1623,9 @@ public class JdbcThinResultSet implements ResultSet { * @throws SQLException On error. 
*/ private List<JdbcColumnMeta> meta() throws SQLException { + if (finished && (!isQuery || autoClose)) + throw new SQLException("Server cursor is already closed."); + if (!metaInit) { try { JdbcQueryMetadataResult res = stmt.connection().io().queryMeta(qryId); http://git-wip-us.apache.org/repos/asf/ignite/blob/ecba1acc/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java index a0b7ee6..2cad223 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java @@ -112,7 +112,7 @@ public class JdbcThinStatement implements Statement { assert res != null; rs = new JdbcThinResultSet(this, res.getQueryId(), pageSize, res.last(), res.items(), - res.isQuery(), res.updateCount()); + res.isQuery(), conn.io().autoCloseServerCursor(), res.updateCount()); } catch (IOException e) { conn.close(); http://git-wip-us.apache.org/repos/asf/ignite/blob/ecba1acc/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java index 1905ea4..be62a8d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java @@ -53,7 +53,7 @@ public class JdbcThinTcpIo { private static final SqlListenerProtocolVersion CURRENT_VER = SqlListenerProtocolVersion.create(2, 1, 0); /** Initial output stream 
capacity for handshake. */ - private static final int HANDSHAKE_MSG_SIZE = 12; + private static final int HANDSHAKE_MSG_SIZE = 13; /** Initial output for query message. */ private static final int QUERY_EXEC_MSG_INIT_CAP = 256; @@ -85,6 +85,9 @@ public class JdbcThinTcpIo { /** Replicated only flag. */ private final boolean replicatedOnly; + /** Flag to automatically close server cursor. */ + private final boolean autoCloseServerCursor; + /** Socket send buffer. */ private final int sockSndBuf; @@ -115,18 +118,20 @@ public class JdbcThinTcpIo { * @param enforceJoinOrder Enforce join order flag. * @param collocated Collocated flag. * @param replicatedOnly Replicated only flag. + * @param autoCloseServerCursor Flag to automatically close server cursors. * @param sockSndBuf Socket send buffer. * @param sockRcvBuf Socket receive buffer. * @param tcpNoDelay TCP no delay flag. */ JdbcThinTcpIo(String host, int port, boolean distributedJoins, boolean enforceJoinOrder, boolean collocated, - boolean replicatedOnly, int sockSndBuf, int sockRcvBuf, boolean tcpNoDelay) { + boolean replicatedOnly, boolean autoCloseServerCursor, int sockSndBuf, int sockRcvBuf, boolean tcpNoDelay) { this.host = host; this.port = port; this.distributedJoins = distributedJoins; this.enforceJoinOrder = enforceJoinOrder; this.collocated = collocated; this.replicatedOnly = replicatedOnly; + this.autoCloseServerCursor = autoCloseServerCursor; this.sockSndBuf = sockSndBuf; this.sockRcvBuf = sockRcvBuf; this.tcpNoDelay = tcpNoDelay; @@ -182,6 +187,7 @@ public class JdbcThinTcpIo { writer.writeBoolean(enforceJoinOrder); writer.writeBoolean(collocated); writer.writeBoolean(replicatedOnly); + writer.writeBoolean(autoCloseServerCursor); send(writer.array()); @@ -382,6 +388,13 @@ public class JdbcThinTcpIo { } /** + * @return Auto close server cursors flag. + */ + public boolean autoCloseServerCursor() { + return autoCloseServerCursor; + } + + /** * @return Socket send buffer size. 
*/ public int socketSendBuffer() { http://git-wip-us.apache.org/repos/asf/ignite/blob/ecba1acc/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java index aa9b011..78e8c8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.jdbc.thin; -import org.apache.ignite.configuration.OdbcConfiguration; import org.apache.ignite.configuration.SqlConnectorConfiguration; import java.sql.Time; @@ -76,6 +75,9 @@ public class JdbcThinUtils { /** Parameter: TCP no-delay flag. */ public static final String PARAM_TCP_NO_DELAY = "tcpNoDelay"; + /** Parameter: Automatically close server cursor. */ + public static final String PARAM_AUTO_CLOSE_SERVER_CURSOR = "autoCloseServerCursor"; + /** Distributed joins property name. */ public static final String PROP_DISTRIBUTED_JOINS = PROP_PREFIX + PARAM_DISTRIBUTED_JOINS; @@ -97,6 +99,9 @@ public class JdbcThinUtils { /** TCP no delay property name. */ public static final String PROP_TCP_NO_DELAY = PROP_PREFIX + PARAM_TCP_NO_DELAY; + /** Automatically close server cursor. */ + public static final String PROP_AUTO_CLOSE_SERVER_CURSORS = PROP_PREFIX + PARAM_AUTO_CLOSE_SERVER_CURSOR; + /** Default port. 
*/ public static final int DFLT_PORT = SqlConnectorConfiguration.DFLT_PORT; http://git-wip-us.apache.org/repos/asf/ignite/blob/ecba1acc/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java index bff5519..6bb4e29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java @@ -252,9 +252,10 @@ public class SqlListenerNioListener extends GridNioServerListenerAdapter<byte[]> boolean enforceJoinOrder = reader.readBoolean(); boolean collocated = reader.readBoolean(); boolean replicatedOnly = reader.readBoolean(); + boolean autoCloseCursors = reader.readBoolean(); SqlListenerRequestHandler handler = new JdbcRequestHandler(ctx, busyLock, maxCursors, distributedJoins, - enforceJoinOrder, collocated, replicatedOnly); + enforceJoinOrder, collocated, replicatedOnly, autoCloseCursors); SqlListenerMessageParser parser = new JdbcMessageParser(ctx); http://git-wip-us.apache.org/repos/asf/ignite/blob/ecba1acc/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCursor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCursor.java index b8edb8d..830daea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCursor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCursor.java @@ 
-65,13 +65,16 @@ class JdbcQueryCursor { * @return List of the rows. */ List<List<Object>> fetchRows() { - List<List<Object>> items = new ArrayList<>(); + int fetchSize = (maxRows > 0) ? (int)Math.min(pageSize, maxRows - fetched) : pageSize; - int fetchSize0 = (maxRows > 0) ? (int)Math.min(pageSize, maxRows - fetched) : pageSize; + List<List<Object>> items = new ArrayList<>(fetchSize); - for (; fetched < fetchSize0 && iter.hasNext(); ++fetched) + for (int i = 0; i < fetchSize && iter.hasNext(); i++) { items.add(iter.next()); + fetched++; + } + return items; } http://git-wip-us.apache.org/repos/asf/ignite/blob/ecba1acc/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java index 0796cfc..94ac433 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java @@ -74,6 +74,9 @@ public class JdbcRequestHandler implements SqlListenerRequestHandler { /** Replicated only flag. */ private final boolean replicatedOnly; + /** Automatic close of cursors. */ + private final boolean autoCloseCursors; + /** * Constructor. * @@ -84,9 +87,11 @@ public class JdbcRequestHandler implements SqlListenerRequestHandler { * @param enforceJoinOrder Enforce join order flag. * @param collocated Collocated flag. * @param replicatedOnly Replicated only flag. + * @param autoCloseCursors Flag to automatically close server cursors. 
*/ public JdbcRequestHandler(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors, - boolean distributedJoins, boolean enforceJoinOrder, boolean collocated, boolean replicatedOnly) { + boolean distributedJoins, boolean enforceJoinOrder, boolean collocated, boolean replicatedOnly, + boolean autoCloseCursors) { this.ctx = ctx; this.busyLock = busyLock; this.maxCursors = maxCursors; @@ -94,6 +99,7 @@ public class JdbcRequestHandler implements SqlListenerRequestHandler { this.enforceJoinOrder = enforceJoinOrder; this.collocated = collocated; this.replicatedOnly = replicatedOnly; + this.autoCloseCursors = autoCloseCursors; log = ctx.log(getClass()); } @@ -183,8 +189,6 @@ public class JdbcRequestHandler implements SqlListenerRequestHandler { JdbcQueryCursor cur = new JdbcQueryCursor(qryId, req.pageSize(), req.maxRows(), (QueryCursorImpl)qryCur); - qryCursors.put(qryId, cur); - JdbcQueryExecuteResult res; if (cur.isQuery()) @@ -200,6 +204,11 @@ public class JdbcRequestHandler implements SqlListenerRequestHandler { res = new JdbcQueryExecuteResult(qryId, (Long)items.get(0).get(0)); } + if (res.last() && (!res.isQuery() || autoCloseCursors)) + cur.close(); + else + qryCursors.put(qryId, cur); + return new JdbcResponse(res); } catch (Exception e) { @@ -260,6 +269,12 @@ public class JdbcRequestHandler implements SqlListenerRequestHandler { JdbcQueryFetchResult res = new JdbcQueryFetchResult(cur.fetchRows(), !cur.hasNext()); + if (res.last() && (!cur.isQuery() || autoCloseCursors)) { + qryCursors.remove(req.queryId()); + + cur.close(); + } + return new JdbcResponse(res); } catch (Exception e) {
