IGNITE-7999: JDBC Thin Driver: added unordered streaming mode. This closes #3789.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d05c28ca Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d05c28ca Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d05c28ca Branch: refs/heads/ignite-2.5 Commit: d05c28ca9850d116daedd8303e035345f15f5b82 Parents: 46dac58 Author: tledkov-gridgain <[email protected]> Authored: Fri May 11 17:55:22 2018 +0300 Committer: devozerov <[email protected]> Committed: Fri May 11 17:59:58 2018 +0300 ---------------------------------------------------------------------- .../internal/jdbc2/JdbcStreamingSelfTest.java | 3 - .../jdbc/suite/IgniteJdbcDriverTestSuite.java | 6 +- ...JdbcThinConnectionMultipleAddressesTest.java | 7 +- .../thin/JdbcThinStreamingAbstractSelfTest.java | 505 +++++++++++++++++++ .../JdbcThinStreamingNotOrderedSelfTest.java | 38 ++ .../thin/JdbcThinStreamingOrderedSelfTest.java | 39 ++ .../jdbc/thin/JdbcThinStreamingSelfTest.java | 486 ------------------ .../internal/jdbc/thin/JdbcThinConnection.java | 312 +++++++++--- .../internal/jdbc/thin/JdbcThinTcpIo.java | 63 ++- .../odbc/ClientListenerNioListener.java | 23 +- .../odbc/jdbc/JdbcBatchExecuteRequest.java | 27 + .../odbc/jdbc/JdbcBatchExecuteResult.java | 26 +- .../odbc/jdbc/JdbcConnectionContext.java | 34 +- .../jdbc/JdbcOrderedBatchExecuteRequest.java | 85 ++++ .../jdbc/JdbcOrderedBatchExecuteResult.java | 75 +++ .../processors/odbc/jdbc/JdbcRequest.java | 8 + .../odbc/jdbc/JdbcRequestHandler.java | 119 ++++- .../odbc/jdbc/JdbcResponseSender.java | 31 ++ .../processors/odbc/jdbc/JdbcResult.java | 13 + .../processors/query/SqlClientContext.java | 147 ++++-- .../apache/ignite/internal/sql/SqlKeyword.java | 3 + .../sql/command/SqlSetStreamingCommand.java | 19 + .../sql/SqlParserSetStreamingSelfTest.java | 39 +- .../processors/query/h2/IgniteH2Indexing.java | 2 +- .../config/ignite-localhost-config.xml | 2 - .../benchmark-jdbc-thin-streaming.properties | 132 +++++ .../ignite/yardstick/upload/StreamerParams.java | 7 +- .../upload/UploadBenchmarkArguments.java | 19 +- .../yardstick/upload/model/QueryFactory.java | 2 + 29 files changed, 1635 insertions(+), 637 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java index 10adedc..e302529 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java @@ -56,9 +56,6 @@ public class JdbcStreamingSelfTest extends GridCommonAbstractTest { private static final String STREAMING_URL = CFG_URL_PREFIX + "cache=person@modules/clients/src/test/config/jdbc-config.xml"; - /** */ - protected transient IgniteLogger log; - /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { return getConfiguration0(gridName); http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java index a88ebe8..a18cb45 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java @@ -70,7 +70,8 @@ import org.apache.ignite.jdbc.thin.JdbcThinResultSetSelfTest; import org.apache.ignite.jdbc.thin.JdbcThinSchemaCaseTest; import org.apache.ignite.jdbc.thin.JdbcThinSelectAfterAlterTable; import org.apache.ignite.jdbc.thin.JdbcThinStatementSelfTest; -import org.apache.ignite.jdbc.thin.JdbcThinStreamingSelfTest; +import org.apache.ignite.jdbc.thin.JdbcThinStreamingNotOrderedSelfTest; +import org.apache.ignite.jdbc.thin.JdbcThinStreamingOrderedSelfTest; import org.apache.ignite.jdbc.thin.JdbcThinTcpIoTest; import org.apache.ignite.jdbc.thin.JdbcThinUpdateStatementSelfTest; import org.apache.ignite.jdbc.thin.JdbcThinUpdateStatementSkipReducerOnUpdateSelfTest; @@ -128,7 +129,8 @@ public class IgniteJdbcDriverTestSuite extends TestSuite { suite.addTest(new TestSuite(JdbcBlobTest.class)); suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest.class)); - suite.addTest(new TestSuite(JdbcThinStreamingSelfTest.class)); + suite.addTest(new TestSuite(JdbcThinStreamingNotOrderedSelfTest.class)); + suite.addTest(new TestSuite(JdbcThinStreamingOrderedSelfTest.class)); // DDL tests. suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDynamicIndexAtomicPartitionedNearSelfTest.class)); http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionMultipleAddressesTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionMultipleAddressesTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionMultipleAddressesTest.java index 2c2aba9..e1fb295 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionMultipleAddressesTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionMultipleAddressesTest.java @@ -372,9 +372,7 @@ public class JdbcThinConnectionMultipleAddressesTest extends JdbcThinAbstractSel return null; } - }, SQLException.class, "Failed to communicate with Ignite cluster"); - - assertTrue(id[0] > 0); + }, SQLException.class, "Failed to communicate with Ignite cluster on JDBC streaming"); int minId = id[0]; @@ -382,6 +380,9 @@ public class JdbcThinConnectionMultipleAddressesTest extends JdbcThinAbstractSel final Statement stmt1 = conn.createStatement(); + stmt1.execute("SET STREAMING 1 BATCH_SIZE 10 ALLOW_OVERWRITE 0 " + + " PER_NODE_BUFFER_SIZE 1000 FLUSH_FREQUENCY 1000"); + for (int i = 0; i < 10; ++i, id[0]++) stmt1.execute("INSERT INTO TEST(id, val) values (" + id[0] + ", " + id[0] + ")"); http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java new file mode 100644 index 0000000..7004635 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java @@ -0,0 +1,505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.jdbc.thin; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.Callable; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest; +import org.apache.ignite.internal.processors.query.GridQueryCancel; +import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.processors.query.SqlClientContext; +import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; +import org.jetbrains.annotations.Nullable; + +/** + * Tests for streaming via thin driver. + */ +public abstract class JdbcThinStreamingAbstractSelfTest extends JdbcStreamingSelfTest { + /** */ + protected int batchSize = 17; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + GridQueryProcessor.idxCls = IndexingWithContext.class; + + super.beforeTestsStarted(); + + batchSize = 17; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + // Init IndexingWithContext.cliCtx + try (Connection c = createOrdinaryConnection()) { + execute(c, "SELECT 1"); + } + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + try (Connection c = createOrdinaryConnection()) { + execute(c, "DROP TABLE PUBLIC.T IF EXISTS"); + } + + IndexingWithContext.cliCtx = null; + + super.afterTest(); + } + + /** {@inheritDoc} */ + @Override protected Connection createOrdinaryConnection() throws SQLException { + return JdbcThinAbstractSelfTest.connect(grid(0), null); + } + + /** + * @throws Exception if failed. + */ + public void testStreamedBatchedInsert() throws Exception { + for (int i = 10; i <= 100; i += 10) + put(i, nameForId(i * 100)); + + try (Connection conn = createStreamedConnection(false)) { + assertStreamingState(true); + + try (PreparedStatement stmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?), " + + "(?, ?)")) { + for (int i = 1; i <= 100; i+= 2) { + stmt.setInt(1, i); + stmt.setString(2, nameForId(i)); + stmt.setInt(3, i + 1); + stmt.setString(4, nameForId(i + 1)); + + stmt.addBatch(); + } + + stmt.executeBatch(); + } + } + + U.sleep(500); + + // Now let's check it's all there. + for (int i = 1; i <= 100; i++) { + if (i % 10 != 0) + assertEquals(nameForId(i), nameForIdInCache(i)); + else // All that divides by 10 evenly should point to numbers 100 times greater - see above + assertEquals(nameForId(i * 100), nameForIdInCache(i)); + } + } + + /** + * @throws SQLException if failed. + */ + public void testSimultaneousStreaming() throws Exception { + try (Connection anotherConn = createOrdinaryConnection()) { + execute(anotherConn, "CREATE TABLE PUBLIC.T(x int primary key, y int) WITH " + + "\"cache_name=T,wrap_value=false\""); + } + + // Timeout to let connection close be handled on server side. + U.sleep(500); + + try (Connection conn = createStreamedConnection(false, 10000)) { + assertStreamingState(true); + + PreparedStatement firstStmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?)"); + + PreparedStatement secondStmt = conn.prepareStatement("insert into PUBLIC.T(x, y) values (?, ?)"); + + try { + for (int i = 1; i <= 10; i++) { + firstStmt.setInt(1, i); + firstStmt.setString(2, nameForId(i)); + + firstStmt.executeUpdate(); + } + + for (int i = 51; i <= 67; i++) { + secondStmt.setInt(1, i); + secondStmt.setInt(2, i); + + secondStmt.executeUpdate(); + } + + for (int i = 11; i <= 50; i++) { + firstStmt.setInt(1, i); + firstStmt.setString(2, nameForId(i)); + + firstStmt.executeUpdate(); + } + + for (int i = 68; i <= 100; i++) { + secondStmt.setInt(1, i); + secondStmt.setInt(2, i); + + secondStmt.executeUpdate(); + } + + assertCacheEmpty(); + + SqlClientContext cliCtx = sqlClientContext(); + + final HashMap<String, IgniteDataStreamer<?, ?>> streamers = U.field(cliCtx, "streamers"); + + // Wait when node process requests (because client send batch requests async). + GridTestUtils.waitForCondition(() -> streamers.size() == 2, 1000); + + assertEquals(2, streamers.size()); + + assertEqualsCollections(new HashSet<>(Arrays.asList("person", "T")), streamers.keySet()); + } + finally { + U.closeQuiet(firstStmt); + + U.closeQuiet(secondStmt); + } + } + + // Let's wait a little so that all data arrives to destination - we can't intercept streamers' flush + // on connection close in any way. + U.sleep(1000); + + // Now let's check it's all there. + for (int i = 1; i <= 50; i++) + assertEquals(nameForId(i), nameForIdInCache(i)); + + for (int i = 51; i <= 100; i++) + assertEquals(i, grid(0).cache("T").get(i)); + } + + /** + * + */ + public void testStreamingWithMixedStatementTypes() throws Exception { + String prepStmtStr = "insert into Person(\"id\", \"name\") values (?, ?)"; + + String stmtStr = "insert into Person(\"id\", \"name\") values (%d, '%s')"; + + try (Connection conn = createStreamedConnection(false, 10000)) { + assertStreamingState(true); + + PreparedStatement firstStmt = conn.prepareStatement(prepStmtStr); + + Statement secondStmt = conn.createStatement(); + + try { + for (int i = 1; i <= 100; i++) { + boolean usePrep = Math.random() > 0.5; + + boolean useBatch = Math.random() > 0.5; + + if (usePrep) { + firstStmt.setInt(1, i); + firstStmt.setString(2, nameForId(i)); + + if (useBatch) + firstStmt.addBatch(); + else + firstStmt.execute(); + } + else { + String sql = String.format(stmtStr, i, nameForId(i)); + + if (useBatch) + secondStmt.addBatch(sql); + else + secondStmt.execute(sql); + } + } + } + finally { + U.closeQuiet(firstStmt); + + U.closeQuiet(secondStmt); + } + } + + // Let's wait a little so that all data arrives to destination - we can't intercept streamers' flush + // on connection close in any way. + U.sleep(1000); + + // Now let's check it's all there. + for (int i = 1; i <= 100; i++) + assertEquals(nameForId(i), nameForIdInCache(i)); + } + + /** + * @throws SQLException if failed. + */ + public void testStreamingOffToOn() throws Exception { + try (Connection conn = createOrdinaryConnection()) { + assertStreamingState(false); + + execute(conn, "SET STREAMING 1"); + + assertStreamingState(true); + } + } + + /** + * @throws SQLException if failed. + */ + public void testStreamingOffToOff() throws Exception { + try (Connection conn = createOrdinaryConnection()) { + assertStreamingState(false); + + execute(conn, "SET STREAMING 0"); + + assertStreamingState(false); + } + } + + /** + * @throws SQLException if failed. + */ + public void testStreamingOnToOff() throws Exception { + try (Connection conn = createStreamedConnection(false)) { + assertStreamingState(true); + + execute(conn, "SET STREAMING off"); + + assertStreamingState(false); + } + } + + /** + * @throws SQLException if failed. + */ + public void testFlush() throws Exception { + try (Connection conn = createStreamedConnection(false, 10000)) { + assertStreamingState(true); + + try (PreparedStatement stmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?)")) { + for (int i = 1; i <= 100; i++) { + stmt.setInt(1, i); + stmt.setString(2, nameForId(i)); + + stmt.executeUpdate(); + } + } + + assertCacheEmpty(); + + execute(conn, "set streaming 0"); + + assertStreamingState(false); + + U.sleep(500); + + // Now let's check it's all there. + for (int i = 1; i <= 100; i++) + assertEquals(nameForId(i), nameForIdInCache(i)); + } + } + + /** + * @throws SQLException if failed. + */ + public void testStreamingReEnabled() throws Exception { + try (Connection conn = createStreamedConnection(false, 10000)) { + assertStreamingState(true); + + try (PreparedStatement stmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?)")) { + for (int i = 1; i <= 100; i++) { + stmt.setInt(1, i); + stmt.setString(2, nameForId(i)); + + stmt.executeUpdate(); + } + } + + assertCacheEmpty(); + + execute(conn, "set streaming 1 batch_size 111 allow_overwrite 0 per_node_buffer_size 512 " + + "per_node_parallel_operations 4 flush_frequency 5000"); + + U.sleep(500); + + assertEquals((Integer)111, U.field((Object)U.field(conn, "streamState"), "streamBatchSize")); + + SqlClientContext cliCtx = sqlClientContext(); + + assertTrue(cliCtx.isStream()); + + assertFalse(U.field(cliCtx, "streamAllowOverwrite")); + + assertEquals((Integer)512, U.field(cliCtx, "streamNodeBufSize")); + + assertEquals((Long)5000L, U.field(cliCtx, "streamFlushTimeout")); + + assertEquals((Integer)4, U.field(cliCtx, "streamNodeParOps")); + + // Now let's check it's all there - SET STREAMING 1 repeated call must also have caused flush. + for (int i = 1; i <= 100; i++) + assertEquals(nameForId(i), nameForIdInCache(i)); + } + } + + /** + * + */ + @SuppressWarnings("ThrowableNotThrown") + public void testNonStreamedBatch() { + GridTestUtils.assertThrows(null, new Callable<Object>() { + @Override public Object call() throws Exception { + try (Connection conn = createOrdinaryConnection()) { + try (Statement s = conn.createStatement()) { + for (int i = 1; i <= 10; i++) + s.addBatch(String.format("insert into Person(\"id\", \"name\")values (%d, '%s')", i, + nameForId(i))); + + execute(conn, "SET STREAMING 1"); + + s.addBatch(String.format("insert into Person(\"id\", \"name\")values (%d, '%s')", 11, + nameForId(11))); + } + } + + return null; + } + }, SQLException.class, "Statement has non-empty batch (call executeBatch() or clearBatch() before " + + "enabling streaming)."); + } + + /** + * + */ + @SuppressWarnings("ThrowableNotThrown") + public void testStreamingStatementInTheMiddleOfNonPreparedBatch() { + GridTestUtils.assertThrows(null, new Callable<Object>() { + @Override public Object call() throws Exception { + try (Connection conn = createOrdinaryConnection()) { + try (Statement s = conn.createStatement()) { + s.addBatch(String.format("insert into Person(\"id\", \"name\")values (%d, '%s')", 1, + nameForId(1))); + + s.addBatch("SET STREAMING 1 FLUSH_FREQUENCY 10000"); + } + } + + return null; + } + }, SQLException.class, "Streaming control commands must be executed explicitly"); + } + + /** + * + */ + @SuppressWarnings("ThrowableNotThrown") + public void testBatchingSetStreamingStatement() { + GridTestUtils.assertThrows(null, new Callable<Object>() { + @Override public Object call() throws Exception { + try (Connection conn = createOrdinaryConnection()) { + try (PreparedStatement s = conn.prepareStatement("SET STREAMING 1 FLUSH_FREQUENCY 10000")) { + s.addBatch(); + } + } + + return null; + } + }, SQLException.class, "Streaming control commands must be executed explicitly"); + } + + /** + * Check that there's nothing in cache. + */ + protected void assertCacheEmpty() { + assertEquals(0, cache().size(CachePeekMode.ALL)); + } + + /** + * @param conn Connection. + * @param sql Statement. + * @throws SQLException if failed. + */ + protected static void execute(Connection conn, String sql) throws SQLException { + try (Statement s = conn.createStatement()) { + s.execute(sql); + } + } + + /** + * @return Active SQL client context. + */ + private SqlClientContext sqlClientContext() { + assertNotNull(IndexingWithContext.cliCtx); + + return IndexingWithContext.cliCtx; + } + + /** + * Check that streaming state on target node is as expected. + * + * @param on Expected streaming state. + */ + protected void assertStreamingState(boolean on) throws Exception { + SqlClientContext cliCtx = sqlClientContext(); + + GridTestUtils.waitForCondition(() -> cliCtx.isStream() == on, 1000); + + assertEquals(on, cliCtx.isStream()); + } + + /** {@inheritDoc} */ + @Override protected void assertStatementForbidden(String sql) { + batchSize = 1; + + super.assertStatementForbidden(sql); + } + + /** + * + */ + static final class IndexingWithContext extends IgniteH2Indexing { + /** Client context. */ + static SqlClientContext cliCtx; + + /** {@inheritDoc} */ + @Override public List<Long> streamBatchedUpdateQuery(String schemaName, String qry, List<Object[]> params, + SqlClientContext cliCtx) throws IgniteCheckedException { + IndexingWithContext.cliCtx = cliCtx; + + return super.streamBatchedUpdateQuery(schemaName, qry, params, cliCtx); + } + + /** {@inheritDoc} */ + @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry, + @Nullable SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, + GridQueryCancel cancel) { + IndexingWithContext.cliCtx = cliCtx; + + return super.querySqlFields(schemaName, qry, cliCtx, keepBinary, failOnMultipleStmts, cancel); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingNotOrderedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingNotOrderedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingNotOrderedSelfTest.java new file mode 100644 index 0000000..b91258f --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingNotOrderedSelfTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.jdbc.thin; + +import java.sql.Connection; + +/** + * Tests for not ordered streaming via thin driver. + */ +public class JdbcThinStreamingNotOrderedSelfTest extends JdbcThinStreamingAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected Connection createStreamedConnection(boolean allowOverwrite, long flushFreq) throws Exception { + Connection c = JdbcThinAbstractSelfTest.connect(grid(0), null); + + execute(c, "SET STREAMING 1 BATCH_SIZE " + batchSize + + " ALLOW_OVERWRITE " + (allowOverwrite ? 1 : 0) + + " PER_NODE_BUFFER_SIZE 1000 " + + " FLUSH_FREQUENCY " + flushFreq + ";" + ); + + return c; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingOrderedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingOrderedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingOrderedSelfTest.java new file mode 100644 index 0000000..b615f8c --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingOrderedSelfTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.jdbc.thin; + +import java.sql.Connection; + +/** + * Tests for ordered streaming via thin driver. + */ +public class JdbcThinStreamingOrderedSelfTest extends JdbcThinStreamingAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected Connection createStreamedConnection(boolean allowOverwrite, long flushFreq) throws Exception { + Connection c = JdbcThinAbstractSelfTest.connect(grid(0), null); + + execute(c, "SET STREAMING 1 BATCH_SIZE " + batchSize + + " ALLOW_OVERWRITE " + (allowOverwrite ? 1 : 0) + + " PER_NODE_BUFFER_SIZE 1000 " + + " FLUSH_FREQUENCY " + flushFreq + + " ORDERED;" + ); + + return c; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingSelfTest.java deleted file mode 100644 index 3c36f54..0000000 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingSelfTest.java +++ /dev/null @@ -1,486 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.jdbc.thin; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.concurrent.Callable; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteDataStreamer; -import org.apache.ignite.cache.CachePeekMode; -import org.apache.ignite.cache.query.FieldsQueryCursor; -import org.apache.ignite.cache.query.SqlFieldsQuery; -import org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest; -import org.apache.ignite.internal.processors.query.GridQueryCancel; -import org.apache.ignite.internal.processors.query.GridQueryProcessor; -import org.apache.ignite.internal.processors.query.SqlClientContext; -import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.testframework.GridTestUtils; -import org.jetbrains.annotations.Nullable; - -/** - * Tests for streaming via thin driver. - */ -public class JdbcThinStreamingSelfTest extends JdbcStreamingSelfTest { - /** */ - private int batchSize = 17; - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - GridQueryProcessor.idxCls = IndexingWithContext.class; - - super.beforeTestsStarted(); - - batchSize = 17; - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - try (Connection c = createOrdinaryConnection()) { - execute(c, "DROP TABLE PUBLIC.T IF EXISTS"); - } - - IndexingWithContext.cliCtx = null; - - super.afterTest(); - } - - /** {@inheritDoc} */ - @Override protected Connection createStreamedConnection(boolean allowOverwrite, long flushFreq) throws Exception { - Connection c = JdbcThinAbstractSelfTest.connect(grid(0), null ); - - execute(c, "SET STREAMING 1 BATCH_SIZE " + batchSize + " ALLOW_OVERWRITE " + (allowOverwrite ? 1 : 0) + - " PER_NODE_BUFFER_SIZE 1000 FLUSH_FREQUENCY " + flushFreq); - - return c; - } - - /** {@inheritDoc} */ - @Override protected Connection createOrdinaryConnection() throws SQLException { - return JdbcThinAbstractSelfTest.connect(grid(0), null); - } - - /** - * @throws Exception if failed. - */ - public void testStreamedBatchedInsert() throws Exception { - for (int i = 10; i <= 100; i += 10) - put(i, nameForId(i * 100)); - - try (Connection conn = createStreamedConnection(false)) { - assertStreamingState(true); - - try (PreparedStatement stmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?), " + - "(?, ?)")) { - for (int i = 1; i <= 100; i+= 2) { - stmt.setInt(1, i); - stmt.setString(2, nameForId(i)); - stmt.setInt(3, i + 1); - stmt.setString(4, nameForId(i + 1)); - - stmt.addBatch(); - } - - stmt.executeBatch(); - } - } - - U.sleep(500); - - // Now let's check it's all there. - for (int i = 1; i <= 100; i++) { - if (i % 10 != 0) - assertEquals(nameForId(i), nameForIdInCache(i)); - else // All that divides by 10 evenly should point to numbers 100 times greater - see above - assertEquals(nameForId(i * 100), nameForIdInCache(i)); - } - } - - /** - * @throws SQLException if failed. - */ - public void testSimultaneousStreaming() throws Exception { - try (Connection anotherConn = createOrdinaryConnection()) { - execute(anotherConn, "CREATE TABLE PUBLIC.T(x int primary key, y int) WITH " + - "\"cache_name=T,wrap_value=false\""); - } - - // Timeout to let connection close be handled on server side. - U.sleep(500); - - try (Connection conn = createStreamedConnection(false, 10000)) { - assertStreamingState(true); - - PreparedStatement firstStmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?)"); - - PreparedStatement secondStmt = conn.prepareStatement("insert into PUBLIC.T(x, y) values (?, ?)"); - - try { - for (int i = 1; i <= 10; i++) { - firstStmt.setInt(1, i); - firstStmt.setString(2, nameForId(i)); - - firstStmt.executeUpdate(); - } - - for (int i = 51; i <= 67; i++) { - secondStmt.setInt(1, i); - secondStmt.setInt(2, i); - - secondStmt.executeUpdate(); - } - - for (int i = 11; i <= 50; i++) { - firstStmt.setInt(1, i); - firstStmt.setString(2, nameForId(i)); - - firstStmt.executeUpdate(); - } - - for (int i = 68; i <= 100; i++) { - secondStmt.setInt(1, i); - secondStmt.setInt(2, i); - - secondStmt.executeUpdate(); - } - - assertCacheEmpty(); - - SqlClientContext cliCtx = sqlClientContext(); - - HashMap<String, IgniteDataStreamer<?, ?>> streamers = U.field(cliCtx, "streamers"); - - assertEquals(2, streamers.size()); - - assertEqualsCollections(new HashSet<>(Arrays.asList("person", "T")), streamers.keySet()); - } - finally { - U.closeQuiet(firstStmt); - - U.closeQuiet(secondStmt); - } - } - - // Let's wait a little so that all data arrives to destination - we can't intercept streamers' flush - // on connection close in any way. - U.sleep(1000); - - // Now let's check it's all there. - for (int i = 1; i <= 50; i++) - assertEquals(nameForId(i), nameForIdInCache(i)); - - for (int i = 51; i <= 100; i++) - assertEquals(i, grid(0).cache("T").get(i)); - } - - /** - * - */ - public void testStreamingWithMixedStatementTypes() throws Exception { - String prepStmtStr = "insert into Person(\"id\", \"name\") values (?, ?)"; - - String stmtStr = "insert into Person(\"id\", \"name\") values (%d, '%s')"; - - try (Connection conn = createStreamedConnection(false, 10000)) { - assertStreamingState(true); - - PreparedStatement firstStmt = conn.prepareStatement(prepStmtStr); - - Statement secondStmt = conn.createStatement(); - - try { - for (int i = 1; i <= 100; i++) { - boolean usePrep = Math.random() > 0.5; - - boolean useBatch = Math.random() > 0.5; - - if (usePrep) { - firstStmt.setInt(1, i); - firstStmt.setString(2, nameForId(i)); - - if (useBatch) - firstStmt.addBatch(); - else - firstStmt.execute(); - } - else { - String sql = String.format(stmtStr, i, nameForId(i)); - - if (useBatch) - secondStmt.addBatch(sql); - else - secondStmt.execute(sql); - } - } - } - finally { - U.closeQuiet(firstStmt); - - U.closeQuiet(secondStmt); - } - } - - // Let's wait a little so that all data arrives to destination - we can't intercept streamers' flush - // on connection close in any way. - U.sleep(1000); - - // Now let's check it's all there. - for (int i = 1; i <= 100; i++) - assertEquals(nameForId(i), nameForIdInCache(i)); - } - - /** - * @throws SQLException if failed. - */ - public void testStreamingOffToOn() throws SQLException { - try (Connection conn = createOrdinaryConnection()) { - assertStreamingState(false); - - execute(conn, "SET STREAMING 1"); - - assertStreamingState(true); - } - } - - /** - * @throws SQLException if failed. - */ - public void testStreamingOnToOff() throws Exception { - try (Connection conn = createStreamedConnection(false)) { - assertStreamingState(true); - - execute(conn, "SET STREAMING off"); - - assertStreamingState(false); - } - } - - /** - * @throws SQLException if failed. - */ - public void testFlush() throws Exception { - try (Connection conn = createStreamedConnection(false, 10000)) { - assertStreamingState(true); - - try (PreparedStatement stmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?)")) { - for (int i = 1; i <= 100; i++) { - stmt.setInt(1, i); - stmt.setString(2, nameForId(i)); - - stmt.executeUpdate(); - } - } - - assertCacheEmpty(); - - execute(conn, "set streaming 0"); - - assertStreamingState(false); - - U.sleep(500); - - // Now let's check it's all there. - for (int i = 1; i <= 100; i++) - assertEquals(nameForId(i), nameForIdInCache(i)); - } - } - - /** - * @throws SQLException if failed. - */ - public void testStreamingReEnabled() throws Exception { - try (Connection conn = createStreamedConnection(false, 10000)) { - assertStreamingState(true); - - try (PreparedStatement stmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?)")) { - for (int i = 1; i <= 100; i++) { - stmt.setInt(1, i); - stmt.setString(2, nameForId(i)); - - stmt.executeUpdate(); - } - } - - assertCacheEmpty(); - - execute(conn, "set streaming 1 batch_size 111 allow_overwrite 0 per_node_buffer_size 512 " + - "per_node_parallel_operations 4 flush_frequency 5000"); - - U.sleep(500); - - assertEquals((Integer)111, U.field(conn, "streamBatchSize")); - - SqlClientContext cliCtx = sqlClientContext(); - - assertTrue(cliCtx.isStream()); - - assertFalse(U.field(cliCtx, "streamAllowOverwrite")); - - assertEquals((Integer)512, U.field(cliCtx, "streamNodeBufSize")); - - assertEquals((Long)5000L, U.field(cliCtx, "streamFlushTimeout")); - - assertEquals((Integer)4, U.field(cliCtx, "streamNodeParOps")); - - // Now let's check it's all there - SET STREAMING 1 repeated call must also have caused flush. - for (int i = 1; i <= 100; i++) - assertEquals(nameForId(i), nameForIdInCache(i)); - } - } - - /** - * - */ - @SuppressWarnings("ThrowableNotThrown") - public void testNonStreamedBatch() { - GridTestUtils.assertThrows(null, new Callable<Object>() { - @Override public Object call() throws Exception { - try (Connection conn = createOrdinaryConnection()) { - try (Statement s = conn.createStatement()) { - for (int i = 1; i <= 10; i++) - s.addBatch(String.format("insert into Person(\"id\", \"name\")values (%d, '%s')", i, - nameForId(i))); - - execute(conn, "SET STREAMING 1"); - - s.addBatch(String.format("insert into Person(\"id\", \"name\")values (%d, '%s')", 11, - nameForId(11))); - } - } - - return null; - } - }, SQLException.class, "Statement has non-empty batch (call executeBatch() or clearBatch() before " + - "enabling streaming)."); - } - - /** - * - */ - @SuppressWarnings("ThrowableNotThrown") - public void testStreamingStatementInTheMiddleOfNonPreparedBatch() { - GridTestUtils.assertThrows(null, new Callable<Object>() { - @Override public Object call() throws Exception { - try (Connection conn = createOrdinaryConnection()) { - try (Statement s = conn.createStatement()) { - s.addBatch(String.format("insert into Person(\"id\", \"name\")values (%d, '%s')", 1, - nameForId(1))); - - s.addBatch("SET STREAMING 1 FLUSH_FREQUENCY 10000"); - } - } - - return null; - } - }, SQLException.class, "Streaming control commands must be executed explicitly"); - } - - /** - * - */ - @SuppressWarnings("ThrowableNotThrown") - public void testBatchingSetStreamingStatement() { - GridTestUtils.assertThrows(null, new Callable<Object>() { - @Override public Object call() throws Exception { - try (Connection conn = createOrdinaryConnection()) { - try (PreparedStatement s = conn.prepareStatement("SET STREAMING 1 FLUSH_FREQUENCY 10000")) { - s.addBatch(); - } - } - - return null; - } - }, SQLException.class, "Streaming control commands must be executed explicitly"); - } - - /** - * Check that there's nothing in cache. - */ - private void assertCacheEmpty() { - assertEquals(0, cache().size(CachePeekMode.ALL)); - } - - /** - * @param conn Connection. - * @param sql Statement. - * @throws SQLException if failed. - */ - private static void execute(Connection conn, String sql) throws SQLException { - try (Statement s = conn.createStatement()) { - s.execute(sql); - } - } - - /** - * @return Active SQL client context. - */ - private SqlClientContext sqlClientContext() { - assertNotNull(IndexingWithContext.cliCtx); - - return IndexingWithContext.cliCtx; - } - - /** - * Check that streaming state on target node is as expected. - * @param on Expected streaming state. - */ - private void assertStreamingState(boolean on) { - SqlClientContext cliCtx = sqlClientContext(); - - assertEquals(on, cliCtx.isStream()); - } - - /** {@inheritDoc} */ - @Override protected void assertStatementForbidden(String sql) { - batchSize = 1; - - super.assertStatementForbidden(sql); - } - - /** - * - */ - private static final class IndexingWithContext extends IgniteH2Indexing { - /** Client context. */ - static SqlClientContext cliCtx; - - /** {@inheritDoc} */ - @Override public List<Long> streamBatchedUpdateQuery(String schemaName, String qry, List<Object[]> params, - SqlClientContext cliCtx) throws IgniteCheckedException { - IndexingWithContext.cliCtx = cliCtx; - - return super.streamBatchedUpdateQuery(schemaName, qry, params, cliCtx); - } - - /** {@inheritDoc} */ - @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry, - @Nullable SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, - GridQueryCancel cancel) { - IndexingWithContext.cliCtx = cliCtx; - - return super.querySqlFields(schemaName, qry, cliCtx, keepBinary, failOnMultipleStmts, cancel); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java index 3478124..634579b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java @@ -39,13 +39,15 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.Executor; +import java.util.concurrent.Semaphore; import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.odbc.ClientListenerResponse; import org.apache.ignite.internal.processors.odbc.SqlStateCode; -import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteRequest; -import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteResult; +import org.apache.ignite.internal.processors.odbc.jdbc.JdbcOrderedBatchExecuteRequest; +import org.apache.ignite.internal.processors.odbc.jdbc.JdbcOrderedBatchExecuteResult; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest; @@ -55,6 +57,7 @@ import org.apache.ignite.internal.processors.odbc.jdbc.JdbcStatementType; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.sql.command.SqlCommand; import org.apache.ignite.internal.sql.command.SqlSetStreamingCommand; +import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteProductVersion; @@ -91,7 +94,7 @@ public class JdbcThinConnection implements Connection { private boolean readOnly; /** Streaming flag. */ - private volatile boolean stream; + private volatile StreamState streamState; /** Current transaction holdability. */ private int holdability; @@ -108,15 +111,6 @@ public class JdbcThinConnection implements Connection { /** Connection properties. */ private ConnectionProperties connProps; - /** Batch size for streaming. */ - private int streamBatchSize; - - /** Batch for streaming. */ - private List<JdbcQuery> streamBatch; - - /** Last added query to recognize batches. */ - private String lastStreamQry; - /** Connected. */ private boolean connected; @@ -172,7 +166,7 @@ public class JdbcThinConnection implements Connection { * @return Whether this connection is streamed or not. */ boolean isStream() { - return stream; + return streamState != null; } /** @@ -182,24 +176,28 @@ public class JdbcThinConnection implements Connection { */ void executeNative(String sql, SqlCommand cmd) throws SQLException { if (cmd instanceof SqlSetStreamingCommand) { - // If streaming is already on, we have to disable it first. - if (stream) { - // We have to send request regardless of actual batch size. - executeBatch(true); + SqlSetStreamingCommand cmd0 = (SqlSetStreamingCommand)cmd; + + // If streaming is already on, we have to close it first. + if (streamState != null) { + streamState.close(); - stream = false; + streamState = null; } boolean newVal = ((SqlSetStreamingCommand)cmd).isTurnOn(); // Actual ON, if needed. if (newVal) { + if (!cmd0.isOrdered() && !cliIo.igniteVersion().greaterThanEqual(2, 5, 0)) { + throw new SQLException("Streaming without order doesn't supported by server [remoteNodeVer=" + + cliIo.igniteVersion() + ']', SqlStateCode.INTERNAL_ERROR); + } + sendRequest(new JdbcQueryExecuteRequest(JdbcStatementType.ANY_STATEMENT_TYPE, schema, 1, 1, sql, null)); - streamBatchSize = ((SqlSetStreamingCommand)cmd).batchSize(); - - stream = true; + streamState = new StreamState((SqlSetStreamingCommand)cmd); } } else @@ -214,39 +212,9 @@ public class JdbcThinConnection implements Connection { * @throws SQLException On error. */ void addBatch(String sql, List<Object> args) throws SQLException { - boolean newQry = (args == null || !F.eq(lastStreamQry, sql)); - - // Providing null as SQL here allows for recognizing subbatches on server and handling them more efficiently. - JdbcQuery q = new JdbcQuery(newQry ? sql : null, args != null ? args.toArray() : null); - - if (streamBatch == null) - streamBatch = new ArrayList<>(streamBatchSize); - - streamBatch.add(q); - - // Null args means "addBatch(String)" was called on non-prepared Statement, - // we don't want to remember its query string. - lastStreamQry = (args != null ? sql : null); - - if (streamBatch.size() == streamBatchSize) - executeBatch(false); - } - - /** - * @param lastBatch Whether open data streamers must be flushed and closed after this batch. - * @throws SQLException if failed. - */ - private void executeBatch(boolean lastBatch) throws SQLException { - JdbcBatchExecuteResult res = sendRequest(new JdbcBatchExecuteRequest(schema, streamBatch, lastBatch)); - - streamBatch = null; + assert isStream(); - lastStreamQry = null; - - if (res.errorCode() != ClientListenerResponse.STATUS_SUCCESS) { - throw new BatchUpdateException(res.errorMessage(), IgniteQueryErrorCode.codeToSqlState(res.errorCode()), - res.errorCode(), res.updateCounts()); - } + streamState.addBatch(sql, args); } /** {@inheritDoc} */ @@ -399,13 +367,10 @@ public class JdbcThinConnection implements Connection { if (isClosed()) return; - if (!F.isEmpty(streamBatch)) { - try { - executeBatch(true); - } - catch (SQLException e) { - LOG.log(Level.WARNING, "Exception during batch send on streamed connection close", e); - } + if (streamState != null) { + streamState.close(); + + streamState = null; } closed = true; @@ -798,6 +763,28 @@ public class JdbcThinConnection implements Connection { } /** + * Send request for execution via {@link #cliIo}. Response is waited at the separate thread + * (see {@link StreamState#asyncRespReaderThread}). + * @param req Request. + * @throws SQLException On any error. + */ + private void sendRequestNotWaitResponse(JdbcOrderedBatchExecuteRequest req) throws SQLException { + ensureConnected(); + + try { + cliIo.sendBatchRequestNoWaitResponse(req); + } + catch (SQLException e) { + throw e; + } + catch (Exception e) { + onDisconnect(); + + throw new SQLException("Failed to communicate with Ignite cluster.", SqlStateCode.CONNECTION_FAILURE, e); + } + } + + /** * @return Connection URL. */ public String url() { @@ -815,9 +802,11 @@ public class JdbcThinConnection implements Connection { connected = false; - streamBatch = null; + if (streamState != null) { + streamState.close0(); - lastStreamQry = null; + streamState = null; + } synchronized (stmtsMux) { for (JdbcThinStatement s : stmts) @@ -846,4 +835,203 @@ public class JdbcThinConnection implements Connection { return res; } + + /** + * Streamer state and + */ + private class StreamState { + /** Maximum requests count that may be sent before any responses. */ + private static final int MAX_REQUESTS_BEFORE_RESPONSE = 10; + + /** Wait timeout. */ + private static final long WAIT_TIMEOUT = 1; + + /** Batch size for streaming. */ + private int streamBatchSize; + + /** Batch for streaming. */ + private List<JdbcQuery> streamBatch; + + /** Last added query to recognize batches. */ + private String lastStreamQry; + + /** Keep request order on execution. */ + private long order; + + /** Async response reader thread. */ + private Thread asyncRespReaderThread; + + /** Async response error. */ + private volatile Exception err; + + /** The order of the last batch request at the stream. */ + private long lastRespOrder = -1; + + /** Last response future. */ + private final GridFutureAdapter<Void> lastRespFut = new GridFutureAdapter<>(); + + /** Response semaphore sem. */ + private Semaphore respSem = new Semaphore(MAX_REQUESTS_BEFORE_RESPONSE); + + /** + * @param cmd Stream cmd. + */ + StreamState(SqlSetStreamingCommand cmd) { + streamBatchSize = cmd.batchSize(); + + asyncRespReaderThread = new Thread(this::readResponses); + + asyncRespReaderThread.start(); + } + + /** + * Add another query for batched execution. + * @param sql Query. + * @param args Arguments. + * @throws SQLException On error. + */ + void addBatch(String sql, List<Object> args) throws SQLException { + checkError(); + + boolean newQry = (args == null || !F.eq(lastStreamQry, sql)); + + // Providing null as SQL here allows for recognizing subbatches on server and handling them more efficiently. + JdbcQuery q = new JdbcQuery(newQry ? sql : null, args != null ? args.toArray() : null); + + if (streamBatch == null) + streamBatch = new ArrayList<>(streamBatchSize); + + streamBatch.add(q); + + // Null args means "addBatch(String)" was called on non-prepared Statement, + // we don't want to remember its query string. + lastStreamQry = (args != null ? sql : null); + + if (streamBatch.size() == streamBatchSize) + executeBatch(false); + } + + /** + * @param lastBatch Whether open data streamers must be flushed and closed after this batch. + * @throws SQLException if failed. + */ + private void executeBatch(boolean lastBatch) throws SQLException { + checkError(); + + if (lastBatch) + lastRespOrder = order; + + try { + respSem.acquire(); + + sendRequestNotWaitResponse( + new JdbcOrderedBatchExecuteRequest(schema, streamBatch, lastBatch, order)); + + streamBatch = null; + + lastStreamQry = null; + + if (lastBatch) { + try { + lastRespFut.get(); + } + catch (IgniteCheckedException e) { + // No-op. + // No exceptions are expected here. + } + + checkError(); + } + else + order++; + } + catch (InterruptedException e) { + throw new SQLException("Streaming operation was interrupted", SqlStateCode.INTERNAL_ERROR, e); + } + } + + /** + * Throws at the user thread exception that was thrown at the {@link #asyncRespReaderThread} thread. + * @throws SQLException Saved exception. + */ + void checkError() throws SQLException { + if (err != null) { + Exception err0 = err; + + err = null; + + if (err0 instanceof SQLException) + throw (SQLException)err0; + else { + onDisconnect(); + + throw new SQLException("Failed to communicate with Ignite cluster on JDBC streaming.", + SqlStateCode.CONNECTION_FAILURE, err0); + } + } + } + + /** + * @throws SQLException On error. + */ + void close() throws SQLException { + close0(); + + checkError(); + } + + /** + */ + void close0() { + if (connected) { + try { + executeBatch(true); + } + catch (SQLException e) { + err = e; + + LOG.log(Level.WARNING, "Exception during batch send on streamed connection close", e); + } + } + + if (asyncRespReaderThread != null) + asyncRespReaderThread.interrupt(); + } + + /** + * + */ + void readResponses () { + try { + while (true) { + JdbcResponse resp = cliIo.readResponse(); + + if (resp.response() instanceof JdbcOrderedBatchExecuteResult) { + JdbcOrderedBatchExecuteResult res = (JdbcOrderedBatchExecuteResult)resp.response(); + + respSem.release(); + + if (res.errorCode() != ClientListenerResponse.STATUS_SUCCESS) { + err = new BatchUpdateException(res.errorMessage(), + IgniteQueryErrorCode.codeToSqlState(res.errorCode()), + res.errorCode(), res.updateCounts()); + } + + // Receive the response for the last request. + if (res.order() == lastRespOrder) { + lastRespFut.onDone(); + + break; + } + } + + if (resp.status() != ClientListenerResponse.STATUS_SUCCESS) + err = new SQLException(resp.error(), IgniteQueryErrorCode.codeToSqlState(resp.status())); + } + } + catch (Exception e) { + err = e; + } + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java index 4631e5d..44c1984 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion; import org.apache.ignite.internal.processors.odbc.ClientListenerRequest; import org.apache.ignite.internal.processors.odbc.SqlStateCode; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteRequest; +import org.apache.ignite.internal.processors.odbc.jdbc.JdbcOrderedBatchExecuteRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryCloseRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryFetchRequest; @@ -419,6 +420,44 @@ public class JdbcThinTcpIo { /** * @param req Request. + * @throws IOException In case of IO error. + * @throws SQLException On error. + */ + void sendBatchRequestNoWaitResponse(JdbcOrderedBatchExecuteRequest req) throws IOException, SQLException { + synchronized (mux) { + if (ownThread != null) { + throw new SQLException("Concurrent access to JDBC connection is not allowed" + + " [ownThread=" + ownThread.getName() + + ", curThread=" + Thread.currentThread().getName(), SqlStateCode.CONNECTION_FAILURE); + } + + ownThread = Thread.currentThread(); + } + + try { + if (!igniteVer.greaterThanEqual(2, 5, 0)) { + throw new SQLException("Streaming without response doesn't supported by server [driverProtocolVer=" + + CURRENT_VER + ", remoteNodeVer=" + igniteVer + ']', SqlStateCode.INTERNAL_ERROR); + } + + int cap = guessCapacity(req); + + BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(cap), + null, null); + + req.writeBinary(writer); + + send(writer.array()); + } + finally { + synchronized (mux) { + ownThread = null; + } + } + } + + /** + * @param req Request. * @return Server response. * @throws IOException In case of IO error. * @throws SQLException On concurrent access to JDBC connection. @@ -444,13 +483,7 @@ public class JdbcThinTcpIo { send(writer.array()); - BinaryReaderExImpl reader = new BinaryReaderExImpl(null, new BinaryHeapInputStream(read()), null, null, false); - - JdbcResponse res = new JdbcResponse(); - - res.readBinary(reader); - - return res; + return readResponse(); } finally { synchronized (mux) { @@ -460,6 +493,22 @@ public class JdbcThinTcpIo { } /** + * @return Server response. + * @throws IOException In case of IO error. + */ + @SuppressWarnings("unchecked") + JdbcResponse readResponse() throws IOException { + BinaryReaderExImpl reader = new BinaryReaderExImpl(null, new BinaryHeapInputStream(read()), null, null, false); + + JdbcResponse res = new JdbcResponse(); + + res.readBinary(reader); + + return res; + } + + + /** * Try to guess request capacity. * * @param req Request. http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java index 407c1a0..be55ab9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java @@ -159,16 +159,18 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte ClientListenerResponse resp = handler.handle(req); - if (log.isDebugEnabled()) { - long dur = (System.nanoTime() - startTime) / 1000; + if (resp != null) { + if (log.isDebugEnabled()) { + long dur = (System.nanoTime() - startTime) / 1000; - log.debug("Client request processed [reqId=" + req.requestId() + ", dur(mcs)=" + dur + - ", resp=" + resp.status() + ']'); - } + log.debug("Client request processed [reqId=" + req.requestId() + ", dur(mcs)=" + dur + + ", resp=" + resp.status() + ']'); + } - byte[] outMsg = parser.encode(resp); + byte[] outMsg = parser.encode(resp); - ses.send(outMsg); + ses.send(outMsg); + } } catch (Exception e) { U.error(log, "Failed to process client request [req=" + req + ']', e); @@ -216,7 +218,7 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte ClientListenerConnectionContext connCtx = null; try { - connCtx = prepareContext(clientType); + connCtx = prepareContext(ses, clientType); ensureClientPermissions(clientType); @@ -270,17 +272,18 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte /** * Prepare context. * + * @param ses Session. * @param clientType Client type. * @return Context. * @throws IgniteCheckedException If failed. */ - private ClientListenerConnectionContext prepareContext(byte clientType) throws IgniteCheckedException { + private ClientListenerConnectionContext prepareContext(GridNioSession ses, byte clientType) throws IgniteCheckedException { switch (clientType) { case ODBC_CLIENT: return new OdbcConnectionContext(ctx, busyLock, maxCursors); case JDBC_CLIENT: - return new JdbcConnectionContext(ctx, busyLock, maxCursors); + return new JdbcConnectionContext(ctx, ses, busyLock, maxCursors); case THIN_CLIENT: return new ClientConnectionContext(ctx, maxCursors); http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java index 73fd04f..bdc558c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java @@ -53,8 +53,17 @@ public class JdbcBatchExecuteRequest extends JdbcRequest { } /** + * Constructor for child requests. + * @param type Request type/ + */ + protected JdbcBatchExecuteRequest(byte type) { + super(type); + } + + /** * @param schemaName Schema name. * @param queries Queries. + * @param lastStreamBatch {@code true} in case the request is the last batch at the stream. */ public JdbcBatchExecuteRequest(String schemaName, List<JdbcQuery> queries, boolean lastStreamBatch) { super(BATCH_EXEC); @@ -67,6 +76,24 @@ public class JdbcBatchExecuteRequest extends JdbcRequest { } /** + * Constructor for child requests. + * + * @param type Request type. + * @param schemaName Schema name. + * @param queries Queries. + * @param lastStreamBatch {@code true} in case the request is the last batch at the stream. + */ + protected JdbcBatchExecuteRequest(byte type, String schemaName, List<JdbcQuery> queries, boolean lastStreamBatch) { + super(type); + + assert lastStreamBatch || !F.isEmpty(queries); + + this.schemaName = schemaName; + this.queries = queries; + this.lastStreamBatch = lastStreamBatch; + } + + /** * @return Schema name. */ @Nullable public String schemaName() { http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteResult.java index 917e60a..3fc9dd7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteResult.java @@ -36,18 +36,26 @@ public class JdbcBatchExecuteResult extends JdbcResult { private String errMsg; /** - * Condtructor. + * Constructor. */ - public JdbcBatchExecuteResult() { + JdbcBatchExecuteResult() { super(BATCH_EXEC); } /** + * Constructor for child results. + * @param type Result type. + */ + JdbcBatchExecuteResult(byte type) { + super(type); + } + + /** * @param updateCnts Update counts for batch. * @param errCode Error code. * @param errMsg Error message. */ - public JdbcBatchExecuteResult(int [] updateCnts, int errCode, String errMsg) { + JdbcBatchExecuteResult(int [] updateCnts, int errCode, String errMsg) { super(BATCH_EXEC); this.updateCnts = updateCnts; @@ -56,6 +64,18 @@ public class JdbcBatchExecuteResult extends JdbcResult { } /** + * @param type Result type. + * @param res Result. + */ + JdbcBatchExecuteResult(byte type, JdbcBatchExecuteResult res) { + super(type); + + this.updateCnts = res.updateCnts; + this.errCode = res.errCode; + this.errMsg = res.errMsg; + } + + /** * @return Update count for DML queries. */ public int[] updateCounts() { http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java index 2fe3b9c..272c2f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java @@ -17,10 +17,10 @@ package org.apache.ignite.internal.processors.odbc.jdbc; -import java.io.IOException; import java.util.HashSet; import java.util.Set; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.binary.BinaryReaderExImpl; import org.apache.ignite.internal.processors.authentication.AuthorizationContext; @@ -28,7 +28,9 @@ import org.apache.ignite.internal.processors.odbc.ClientListenerConnectionContex import org.apache.ignite.internal.processors.odbc.ClientListenerMessageParser; import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion; import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler; +import org.apache.ignite.internal.processors.odbc.ClientListenerResponse; import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.nio.GridNioSession; import org.apache.ignite.internal.util.typedef.F; /** @@ -59,9 +61,15 @@ public class JdbcConnectionContext implements ClientListenerConnectionContext { /** Context. */ private final GridKernalContext ctx; + /** Session. */ + private final GridNioSession ses; + /** Shutdown busy lock. */ private final GridSpinBusyLock busyLock; + /** Logger. */ + private final IgniteLogger log; + /** Maximum allowed cursors. */ private final int maxCursors; @@ -83,13 +91,17 @@ public class JdbcConnectionContext implements ClientListenerConnectionContext { /** * Constructor. * @param ctx Kernal Context. + * @param ses Session. * @param busyLock Shutdown busy lock. * @param maxCursors Maximum allowed cursors. */ - public JdbcConnectionContext(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors) { + public JdbcConnectionContext(GridKernalContext ctx, GridNioSession ses, GridSpinBusyLock busyLock, int maxCursors) { this.ctx = ctx; + this.ses = ses; this.busyLock = busyLock; this.maxCursors = maxCursors; + + log = ctx.log(getClass()); } /** {@inheritDoc} */ @@ -146,11 +158,23 @@ public class JdbcConnectionContext implements ClientListenerConnectionContext { catch (Exception e) { throw new IgniteCheckedException("Handshake error: " + e.getMessage(), e); } + parser = new JdbcMessageParser(ctx); - handler = new JdbcRequestHandler(ctx, busyLock, maxCursors, distributedJoins, enforceJoinOrder, - collocated, replicatedOnly, autoCloseCursors, lazyExec, skipReducerOnUpdate, actx, ver); + JdbcResponseSender sender = new JdbcResponseSender() { + @Override public void send(ClientListenerResponse resp) { + if (resp != null) { + if (log.isDebugEnabled()) + log.debug("Async response: [resp=" + resp.status() + ']'); - parser = new JdbcMessageParser(ctx); + byte[] outMsg = parser.encode(resp); + + ses.send(outMsg); + } + } + }; + + handler = new JdbcRequestHandler(ctx, busyLock, sender, maxCursors, distributedJoins, enforceJoinOrder, + collocated, replicatedOnly, autoCloseCursors, lazyExec, skipReducerOnUpdate, actx, ver); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcOrderedBatchExecuteRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcOrderedBatchExecuteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcOrderedBatchExecuteRequest.java new file mode 100644 index 0000000..3e84731 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcOrderedBatchExecuteRequest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.jdbc; + +import java.util.List; +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.internal.binary.BinaryReaderExImpl; +import org.apache.ignite.internal.binary.BinaryWriterExImpl; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.NotNull; + +/** + * JDBC batch execute ordered request. + */ +public class JdbcOrderedBatchExecuteRequest extends JdbcBatchExecuteRequest + implements Comparable<JdbcOrderedBatchExecuteRequest> { + /** Order. */ + private long order; + + /** + * Default constructor. + */ + public JdbcOrderedBatchExecuteRequest() { + super(BATCH_EXEC_ORDERED); + } + + /** + * @param schemaName Schema name. + * @param queries Queries. + * @param lastStreamBatch {@code true} in case the request is the last batch at the stream. + * @param order Request order. + */ + public JdbcOrderedBatchExecuteRequest(String schemaName, List<JdbcQuery> queries, + boolean lastStreamBatch, long order) { + super(BATCH_EXEC_ORDERED, schemaName, queries, lastStreamBatch); + + this.order = order; + } + + /** + * @return Request order. + */ + public long order() { + return order; + } + + /** {@inheritDoc} */ + @Override public void writeBinary(BinaryWriterExImpl writer) throws BinaryObjectException { + super.writeBinary(writer); + + writer.writeLong(order); + } + + /** {@inheritDoc} */ + @Override public void readBinary(BinaryReaderExImpl reader) throws BinaryObjectException { + super.readBinary(reader); + + order = reader.readLong(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(JdbcOrderedBatchExecuteRequest.class, this); + } + + /** {@inheritDoc} */ + @Override public int compareTo(@NotNull JdbcOrderedBatchExecuteRequest o) { + return Long.compare(order, o.order); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcOrderedBatchExecuteResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcOrderedBatchExecuteResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcOrderedBatchExecuteResult.java new file mode 100644 index 0000000..84853d4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcOrderedBatchExecuteResult.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.jdbc; + +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.internal.binary.BinaryReaderExImpl; +import org.apache.ignite.internal.binary.BinaryWriterExImpl; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * JDBC batch execute ordered result. + */ +public class JdbcOrderedBatchExecuteResult extends JdbcBatchExecuteResult { + /** Order. */ + private long order; + + /** + * Constructor. + */ + public JdbcOrderedBatchExecuteResult() { + super(BATCH_EXEC_ORDERED); + } + + /** + * @param res Result. + * @param order Order. + */ + public JdbcOrderedBatchExecuteResult(JdbcBatchExecuteResult res, long order) { + super(BATCH_EXEC_ORDERED, res); + + this.order = order; + } + + /** + * @return Order. + */ + public long order() { + return order; + } + + /** {@inheritDoc} */ + @Override public void writeBinary(BinaryWriterExImpl writer) throws BinaryObjectException { + super.writeBinary(writer); + + writer.writeLong(order); + } + + + /** {@inheritDoc} */ + @Override public void readBinary(BinaryReaderExImpl reader) throws BinaryObjectException { + super.readBinary(reader); + + order = reader.readLong(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(JdbcOrderedBatchExecuteResult.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java index 22522ad..3d5b869 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java @@ -63,6 +63,9 @@ public class JdbcRequest extends ClientListenerRequestNoId implements JdbcRawBin /** Send a batch of a data from client to server. */ static final byte BULK_LOAD_BATCH = 13; + /** Ordered batch request. */ + static final byte BATCH_EXEC_ORDERED = 14; + /** Request type. */ private byte type; @@ -161,6 +164,11 @@ public class JdbcRequest extends ClientListenerRequestNoId implements JdbcRawBin break; + case BATCH_EXEC_ORDERED: + req = new JdbcOrderedBatchExecuteRequest(); + + break; + default: throw new IgniteException("Unknown SQL listener request ID: [request ID=" + reqType + ']'); }
