IGNITE-5126: Batch support for this JDBC driver. This closes #2162.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1a7354fa Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1a7354fa Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1a7354fa Branch: refs/heads/ignite-5872 Commit: 1a7354fa6b45c3c33d3cf3f8a9f4b56bfbf5f507 Parents: e8b355f Author: tledkov-gridgain <tled...@gridgain.com> Authored: Fri Aug 4 11:46:14 2017 +0300 Committer: devozerov <voze...@gridgain.com> Committed: Fri Aug 4 11:46:14 2017 +0300 ---------------------------------------------------------------------- .../jdbc/suite/IgniteJdbcDriverTestSuite.java | 2 + .../ignite/jdbc/thin/JdbcThinBatchSelfTest.java | 333 +++++++++++++++++++ .../jdbc/thin/JdbcThinPreparedStatement.java | 16 +- .../internal/jdbc/thin/JdbcThinStatement.java | 46 ++- .../internal/jdbc/thin/JdbcThinTcpIo.java | 20 ++ .../odbc/jdbc/JdbcBatchExecuteRequest.java | 109 ++++++ .../odbc/jdbc/JdbcBatchExecuteResult.java | 96 ++++++ .../processors/odbc/jdbc/JdbcQuery.java | 95 ++++++ .../processors/odbc/jdbc/JdbcRequest.java | 8 + .../odbc/jdbc/JdbcRequestHandler.java | 66 +++- .../processors/odbc/jdbc/JdbcResult.java | 11 + 11 files changed, 794 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1a7354fa/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java index 8ca3d45..cf7ee8f 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java @@ -33,6 +33,7 @@ import 
org.apache.ignite.jdbc.JdbcPreparedStatementSelfTest; import org.apache.ignite.jdbc.JdbcResultSetSelfTest; import org.apache.ignite.jdbc.JdbcStatementSelfTest; import org.apache.ignite.jdbc.thin.JdbcThinAutoCloseServerCursorTest; +import org.apache.ignite.jdbc.thin.JdbcThinBatchSelfTest; import org.apache.ignite.jdbc.thin.JdbcThinComplexQuerySelfTest; import org.apache.ignite.jdbc.thin.JdbcThinConnectionSelfTest; import org.apache.ignite.jdbc.thin.JdbcThinDeleteStatementSelfTest; @@ -121,6 +122,7 @@ public class IgniteJdbcDriverTestSuite extends TestSuite { suite.addTest(new TestSuite(JdbcThinMergeStatementSelfTest.class)); suite.addTest(new TestSuite(JdbcThinDeleteStatementSelfTest.class)); suite.addTest(new TestSuite(JdbcThinAutoCloseServerCursorTest.class)); + suite.addTest(new TestSuite(JdbcThinBatchSelfTest.class)); // New thin JDBC driver, DDL tests suite.addTest(new TestSuite(JdbcThinDynamicIndexAtomicPartitionedNearSelfTest.class)); http://git-wip-us.apache.org/repos/asf/ignite/blob/1a7354fa/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBatchSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBatchSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBatchSelfTest.java new file mode 100644 index 0000000..5781e00 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBatchSelfTest.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.jdbc.thin; + +import java.sql.BatchUpdateException; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.Callable; +import org.apache.ignite.testframework.GridTestUtils; + +/** + * Statement test. + */ +public class JdbcThinBatchSelfTest extends JdbcThinAbstractDmlStatementSelfTest { + /** SQL query. */ + private static final String SQL_PREPARED = "insert into Person(_key, id, firstName, lastName, age) values " + + "(?, ?, ?, ?, ?)"; + + /** Statement. */ + private Statement stmt; + + /** Prepared statement. */ + private PreparedStatement pstmt; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stmt = conn.createStatement(); + + pstmt = conn.prepareStatement(SQL_PREPARED); + + assertNotNull(stmt); + assertFalse(stmt.isClosed()); + + assertNotNull(pstmt); + assertFalse(pstmt.isClosed()); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + if (stmt != null && !stmt.isClosed()) + stmt.close(); + + if (pstmt != null && !pstmt.isClosed()) + pstmt.close(); + + assertTrue(pstmt.isClosed()); + assertTrue(stmt.isClosed()); + + super.afterTest(); + } + + /** + * @throws SQLException If failed. 
+ */ + public void testBatch() throws SQLException { + final int BATCH_SIZE = 10; + + for (int idx = 0, i = 0; i < BATCH_SIZE; ++i, idx += i) { + stmt.addBatch("insert into Person (_key, id, firstName, lastName, age) values " + + generateValues(idx, i + 1)); + } + + int [] updCnts = stmt.executeBatch(); + + assertEquals("Invalid update counts size", BATCH_SIZE, updCnts.length); + + for (int i = 0; i < BATCH_SIZE; ++i) + assertEquals("Invalid update count",i + 1, updCnts[i]); + } + + /** + * @throws SQLException If failed. + */ + public void testBatchOnClosedStatement() throws SQLException { + final Statement stmt2 = conn.createStatement(); + final PreparedStatement pstmt2 = conn.prepareStatement(""); + + stmt2.close(); + pstmt2.close(); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + stmt2.addBatch(""); + + return null; + } + }, SQLException.class, "Statement is closed."); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + stmt2.clearBatch(); + + return null; + } + }, SQLException.class, "Statement is closed."); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + stmt2.executeBatch(); + + return null; + } + }, SQLException.class, "Statement is closed."); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + pstmt2.addBatch(); + + return null; + } + }, SQLException.class, "Statement is closed."); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + pstmt2.clearBatch(); + + return null; + } + }, SQLException.class, "Statement is closed."); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + pstmt2.executeBatch(); + + return null; + } + }, SQLException.class, "Statement is closed."); + } + + /** 
+ * @throws SQLException If failed. + */ + public void testBatchException() throws SQLException { + final int BATCH_SIZE = 7; + + for (int idx = 0, i = 0; i < BATCH_SIZE; ++i, idx += i) { + stmt.addBatch("insert into Person (_key, id, firstName, lastName, age) values " + + generateValues(idx, i + 1)); + } + + stmt.addBatch("select * from Person"); + + stmt.addBatch("insert into Person (_key, id, firstName, lastName, age) values " + + generateValues(100, 1)); + + try { + stmt.executeBatch(); + + fail("BatchUpdateException must be thrown"); + } catch(BatchUpdateException e) { + int [] updCnts = e.getUpdateCounts(); + + assertEquals("Invalid update counts size", BATCH_SIZE, updCnts.length); + + for (int i = 0; i < BATCH_SIZE; ++i) + assertEquals("Invalid update count",i + 1, updCnts[i]); + + if (!e.getMessage().contains("Query produced result set [qry=select * from Person, args=[]]")) { + log.error("Invalid exception: ", e); + + fail(); + } + } + } + + /** + * @throws SQLException If failed. + */ + public void testBatchClear() throws SQLException { + final int BATCH_SIZE = 7; + + for (int idx = 0, i = 0; i < BATCH_SIZE; ++i, idx += i) { + stmt.addBatch("insert into Person (_key, id, firstName, lastName, age) values " + + generateValues(idx, i + 1)); + } + + stmt.clearBatch(); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + stmt.executeBatch(); + + return null; + } + }, SQLException.class, "Batch is empty."); + } + + /** + * @throws SQLException If failed. 
+ */ + public void testBatchPrepared() throws SQLException { + final int BATCH_SIZE = 10; + + for (int i = 0; i < BATCH_SIZE; ++i) { + int paramCnt = 1; + + pstmt.setString(paramCnt++, "p" + i); + pstmt.setInt(paramCnt++, i); + pstmt.setString(paramCnt++, "Name" + i); + pstmt.setString(paramCnt++, "Lastname" + i); + pstmt.setInt(paramCnt++, 20 + i); + + pstmt.addBatch(); + } + + int [] updCnts = pstmt.executeBatch(); + + assertEquals("Invalid update counts size", BATCH_SIZE, updCnts.length); + + for (int i = 0; i < BATCH_SIZE; ++i) + assertEquals("Invalid update count",1, updCnts[i]); + } + + /** + * @throws SQLException If failed. + */ + public void testBatchExceptionPrepared() throws SQLException { + final int BATCH_SIZE = 7; + + for (int i = 0; i < BATCH_SIZE; ++i) { + int paramCnt = 1; + + pstmt.setString(paramCnt++, "p" + i); + pstmt.setInt(paramCnt++, i); + pstmt.setString(paramCnt++, "Name" + i); + pstmt.setString(paramCnt++, "Lastname" + i); + pstmt.setInt(paramCnt++, 20 + i); + + pstmt.addBatch(); + } + + int paramCnt = 1; + pstmt.setString(paramCnt++, "p" + 100); + pstmt.setString(paramCnt++, "x"); + pstmt.setString(paramCnt++, "Name" + 100); + pstmt.setString(paramCnt++, "Lastname" + 100); + pstmt.setInt(paramCnt++, 20 + 100); + + pstmt.addBatch(); + + try { + pstmt.executeBatch(); + + fail("BatchUpdateException must be thrown"); + } catch(BatchUpdateException e) { + int [] updCnts = e.getUpdateCounts(); + + assertEquals("Invalid update counts size", BATCH_SIZE, updCnts.length); + + for (int i = 0; i < BATCH_SIZE; ++i) + assertEquals("Invalid update count",1, updCnts[i]); + + if (!e.getMessage().contains("Failed to execute SQL query.")) { + log.error("Invalid exception: ", e); + + fail(); + } + } + } + + /** + * @throws SQLException If failed. 
+ */ + public void testBatchClearPrepared() throws SQLException { + final int BATCH_SIZE = 10; + + for (int i = 0; i < BATCH_SIZE; ++i) { + int paramCnt = 1; + + pstmt.setString(paramCnt++, "p" + i); + pstmt.setInt(paramCnt++, i); + pstmt.setString(paramCnt++, "Name" + i); + pstmt.setString(paramCnt++, "Lastname" + i); + pstmt.setInt(paramCnt++, 20 + i); + + pstmt.addBatch(); + } + + pstmt.clearBatch(); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + pstmt.executeBatch(); + + return null; + } + }, SQLException.class, "Batch is empty."); + } + + /** + * @param beginIndex Begin row index. + * @param cnt Count of rows. + * @return String contains values for 'cnt' rows. + */ + private String generateValues(int beginIndex, int cnt) { + StringBuilder sb = new StringBuilder(); + + int lastIdx = beginIndex + cnt - 1; + + for (int i = beginIndex; i < lastIdx; ++i) + sb.append(valuesRow(i)).append(','); + + sb.append(valuesRow(lastIdx)); + + return sb.toString(); + } + + /** + * @param idx Index of the row. + * @return String with row values. 
+ */ + private String valuesRow(int idx) { + return String.format("('p%d', %d, 'Name%d', 'Lastname%d', %d)", idx, idx, idx, idx, 20 + idx); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1a7354fa/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java index 0c78a13..455c80f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java @@ -40,6 +40,7 @@ import java.sql.Timestamp; import java.util.ArrayList; import java.util.Calendar; import org.apache.ignite.internal.processors.odbc.SqlListenerUtils; +import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery; /** * JDBC prepared statement implementation. 
@@ -230,7 +231,20 @@ public class JdbcThinPreparedStatement extends JdbcThinStatement implements Prep @Override public void addBatch() throws SQLException { ensureNotClosed(); - throw new SQLFeatureNotSupportedException("Updates are not supported."); + if (batch == null) { + batch = new ArrayList<>(); + + batch.add(new JdbcQuery(sql, args.toArray(new Object[args.size()]))); + } + else + batch.add(new JdbcQuery(null, args.toArray(new Object[args.size()]))); + + args = null; + } + + /** {@inheritDoc} */ + @Override public void addBatch(String sql) throws SQLException { + throw new SQLException("The method 'addBatch(String)' is called on PreparedStatement instance."); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/1a7354fa/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java index 2cad223..b01350a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java @@ -18,15 +18,20 @@ package org.apache.ignite.internal.jdbc.thin; import java.io.IOException; +import java.sql.BatchUpdateException; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.sql.SQLWarning; import java.sql.Statement; +import java.util.ArrayList; import java.util.List; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.internal.processors.odbc.SqlListenerResponse; +import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteResult; +import 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteResult; import static java.sql.ResultSet.CONCUR_READ_ONLY; @@ -62,6 +67,9 @@ public class JdbcThinStatement implements Statement { /** */ private boolean alreadyRead; + /** Batch. */ + protected List<JdbcQuery> batch; + /** * Creates new statement. * @@ -323,21 +331,53 @@ public class JdbcThinStatement implements Statement { @Override public void addBatch(String sql) throws SQLException { ensureNotClosed(); - throw new SQLFeatureNotSupportedException("Updates are not supported."); + if (batch == null) + batch = new ArrayList<>(); + + batch.add(new JdbcQuery(sql, null)); } /** {@inheritDoc} */ @Override public void clearBatch() throws SQLException { ensureNotClosed(); - throw new SQLFeatureNotSupportedException("Updates are not supported."); + batch = null; } /** {@inheritDoc} */ @Override public int[] executeBatch() throws SQLException { ensureNotClosed(); - throw new SQLFeatureNotSupportedException("Updates are not supported."); + if (rs != null) { + rs.close(); + + rs = null; + } + + alreadyRead = false; + + if (batch == null || batch.isEmpty()) + throw new SQLException("Batch is empty."); + + try { + JdbcBatchExecuteResult res = conn.io().batchExecute(conn.getSchema(), batch); + + if (res.errorCode() != SqlListenerResponse.STATUS_SUCCESS) + throw new BatchUpdateException(res.errorMessage(), null, res.errorCode(), res.updateCounts()); + + return res.updateCounts(); + } + catch (IOException e) { + conn.close(); + + throw new SQLException("Failed to query Ignite.", e); + } + catch (IgniteCheckedException e) { + throw new SQLException("Failed to query Ignite.", e); + } + finally { + batch = null; + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/1a7354fa/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java ---------------------------------------------------------------------- diff 
--git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java index be62a8d..f54d5fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java @@ -32,6 +32,9 @@ import org.apache.ignite.internal.processors.odbc.SqlListenerNioListener; import org.apache.ignite.internal.processors.odbc.SqlListenerProtocolVersion; import org.apache.ignite.internal.processors.odbc.SqlListenerRequest; import org.apache.ignite.internal.processors.odbc.SqlListenerResponse; +import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteRequest; +import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteResult; +import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryCloseRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteResult; @@ -58,6 +61,9 @@ public class JdbcThinTcpIo { /** Initial output for query message. */ private static final int QUERY_EXEC_MSG_INIT_CAP = 256; + /** Maximum batch query count. */ + private static final int MAX_BATCH_QRY_CNT = 32; + /** Initial output for query fetch message. */ private static final int QUERY_FETCH_MSG_SIZE = 13; @@ -289,6 +295,20 @@ public class JdbcThinTcpIo { } /** + * @param schema Schema. + * @param batch Batch queries. + * @return Result. + * @throws IOException On error. + * @throws IgniteCheckedException On error. 
+ */ + public JdbcBatchExecuteResult batchExecute(String schema, List<JdbcQuery> batch) + throws IOException, IgniteCheckedException { + int cnt = Math.min(MAX_BATCH_QRY_CNT, batch.size()); + + return sendRequest(new JdbcBatchExecuteRequest(schema, batch), QUERY_EXEC_MSG_INIT_CAP * cnt); + } + + /** * @param req ODBC request. * @throws IOException On error. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/1a7354fa/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java new file mode 100644 index 0000000..9f71bff --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.odbc.jdbc; + +import java.util.ArrayList; +import java.util.List; +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.internal.binary.BinaryReaderExImpl; +import org.apache.ignite.internal.binary.BinaryWriterExImpl; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * JDBC batch execute request. + */ +public class JdbcBatchExecuteRequest extends JdbcRequest { + /** Schema name. */ + private String schema; + + /** SQL queries. */ + @GridToStringInclude(sensitive = true) + private List<JdbcQuery> queries; + + /** + * Default constructor. + */ + public JdbcBatchExecuteRequest() { + super(BATCH_EXEC); + } + + /** + * @param schema Schema. + * @param queries Queries. + */ + public JdbcBatchExecuteRequest(String schema, List<JdbcQuery> queries) { + super(BATCH_EXEC); + + assert !F.isEmpty(queries); + + this.schema = schema; + this.queries = queries; + } + + /** + * @return Schema. + */ + @Nullable public String schema() { + return schema; + } + + /** + * @return Queries. 
+ */ + public List<JdbcQuery> queries() { + return queries; + } + + /** {@inheritDoc} */ + @Override public void writeBinary(BinaryWriterExImpl writer) throws BinaryObjectException { + super.writeBinary(writer); + + writer.writeString(schema); + writer.writeInt(queries.size()); + + for (JdbcQuery q : queries) + q.writeBinary(writer); + } + + /** {@inheritDoc} */ + @Override public void readBinary(BinaryReaderExImpl reader) throws BinaryObjectException { + super.readBinary(reader); + + schema = reader.readString(); + + int n = reader.readInt(); + + queries = new ArrayList<>(n); + + for (int i = 0; i < n; ++i) { + JdbcQuery qry = new JdbcQuery(); + + qry.readBinary(reader); + + queries.add(qry); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(JdbcBatchExecuteRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1a7354fa/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteResult.java new file mode 100644 index 0000000..7977c22 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteResult.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.jdbc; + +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.internal.binary.BinaryReaderExImpl; +import org.apache.ignite.internal.binary.BinaryWriterExImpl; + +/** + * JDBC batch execute result. + */ +public class JdbcBatchExecuteResult extends JdbcResult { + /** Update counts. */ + private int [] updateCnts; + + /** Batch update error code. */ + private int errCode; + + /** Batch update error message. */ + private String errMsg; + + /** + * Constructor. + */ + public JdbcBatchExecuteResult() { + super(BATCH_EXEC); + } + + /** + * @param updateCnts Update counts for batch. + * @param errCode Error code. + * @param errMsg Error message. + */ + public JdbcBatchExecuteResult(int [] updateCnts, int errCode, String errMsg) { + super(BATCH_EXEC); + + this.updateCnts = updateCnts; + this.errCode = errCode; + this.errMsg = errMsg; + } + + /** + * @return Update count for DML queries. + */ + public int[] updateCounts() { + return updateCnts; + } + + /** + * @return Batch error code. + */ + public int errorCode() { + return errCode; + } + + /** + * @return Batch error message. 
+ */ + public String errorMessage() { + return errMsg; + } + + /** {@inheritDoc} */ + @Override public void writeBinary(BinaryWriterExImpl writer) throws BinaryObjectException { + super.writeBinary(writer); + + writer.writeInt(errCode); + writer.writeString(errMsg); + writer.writeIntArray(updateCnts); + } + + + /** {@inheritDoc} */ + @Override public void readBinary(BinaryReaderExImpl reader) throws BinaryObjectException { + super.readBinary(reader); + + errCode = reader.readInt(); + errMsg = reader.readString(); + updateCnts = reader.readIntArray(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1a7354fa/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQuery.java new file mode 100644 index 0000000..f7ffb99 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQuery.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.odbc.jdbc; + +import org.apache.ignite.internal.binary.BinaryReaderExImpl; +import org.apache.ignite.internal.binary.BinaryWriterExImpl; +import org.apache.ignite.internal.processors.odbc.SqlListenerUtils; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * JDBC SQL query with parameters. + */ +public class JdbcQuery implements JdbcRawBinarylizable { + /** Query SQL. */ + private String sql; + + /** Arguments. */ + private Object[] args; + + /** + * Default constructor is used for serialization. + */ + public JdbcQuery() { + // No-op. + } + + /** + * @param sql Query SQL. + * @param args Arguments. + */ + public JdbcQuery(String sql, Object[] args) { + this.sql = sql; + this.args = args; + } + + /** + * @return Query SQL string. + */ + public String sql() { + return sql; + } + + /** + * @return Query arguments. + */ + public Object[] args() { + return args; + } + + /** {@inheritDoc} */ + @Override public void writeBinary(BinaryWriterExImpl writer) { + writer.writeString(sql); + + if (args == null || args.length == 0) + writer.writeInt(0); + else { + writer.writeInt(args.length); + + for (Object arg : args) + SqlListenerUtils.writeObject(writer, arg, false); + } + } + + /** {@inheritDoc} */ + @Override public void readBinary(BinaryReaderExImpl reader) { + sql = reader.readString(); + + int argsNum = reader.readInt(); + + args = new Object[argsNum]; + + for (int i = 0; i < argsNum; ++i) + args[i] = SqlListenerUtils.readObject(reader, false); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(JdbcQuery.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1a7354fa/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java index d6f8fd3..0e144cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java @@ -39,6 +39,9 @@ public class JdbcRequest extends SqlListenerRequest implements JdbcRawBinaryliza /** Get columns meta query. */ public static final byte QRY_META = 5; + /** Batch queries. */ + public static final byte BATCH_EXEC = 6; + /** Request type. */ private byte type; @@ -97,6 +100,11 @@ public class JdbcRequest extends SqlListenerRequest implements JdbcRawBinaryliza break; + case BATCH_EXEC: + req = new JdbcBatchExecuteRequest(); + + break; + default: throw new IgniteException("Unknown SQL listener request ID: [request ID=" + reqType + ']'); } http://git-wip-us.apache.org/repos/asf/ignite/blob/1a7354fa/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java index 94ac433..60c08f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java @@ -17,6 +17,11 @@ package org.apache.ignite.internal.processors.odbc.jdbc; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.query.FieldsQueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; @@ -31,10 +36,7 
@@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; - +import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BATCH_EXEC; import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_CLOSE; import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_EXEC; import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_FETCH; @@ -129,6 +131,9 @@ public class JdbcRequestHandler implements SqlListenerRequestHandler { case QRY_META: return getQueryMeta((JdbcQueryMetadataRequest)req); + + case BATCH_EXEC: + return executeBatch((JdbcBatchExecuteRequest)req); } return new JdbcResponse(SqlListenerResponse.STATUS_FAILED, "Unsupported JDBC request [req=" + req + ']'); @@ -307,4 +312,57 @@ public class JdbcRequestHandler implements SqlListenerRequestHandler { return new JdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString()); } } + + /** + * @param req Request. + * @return Response. 
+ */ + private SqlListenerResponse executeBatch(JdbcBatchExecuteRequest req) { + String schemaName = req.schema(); + + if (F.isEmpty(schemaName)) + schemaName = QueryUtils.DFLT_SCHEMA; + + int successQueries = 0; + int updCnts[] = new int[req.queries().size()]; + + try { + String sql = null; + + for (JdbcQuery q : req.queries()) { + if (q.sql() != null) + sql = q.sql(); + + SqlFieldsQuery qry = new SqlFieldsQuery(sql); + + qry.setArgs(q.args()); + + qry.setDistributedJoins(distributedJoins); + qry.setEnforceJoinOrder(enforceJoinOrder); + qry.setCollocated(collocated); + qry.setReplicatedOnly(replicatedOnly); + + qry.setSchema(schemaName); + + QueryCursorImpl<List<?>> qryCur = (QueryCursorImpl<List<?>>)ctx.query() + .querySqlFieldsNoCache(qry, true); + + if (qryCur.isQuery()) + throw new IgniteCheckedException("Query produced result set [qry=" + q.sql() + ", args=" + + Arrays.toString(q.args()) + ']'); + + List<List<?>> items = qryCur.getAll(); + + updCnts[successQueries++] = ((Long)items.get(0).get(0)).intValue(); + } + + return new JdbcResponse(new JdbcBatchExecuteResult(updCnts, SqlListenerResponse.STATUS_SUCCESS, null)); + } + catch (Exception e) { + U.error(log, "Failed to execute batch query [reqId=" + req.requestId() + ", req=" + req + ']', e); + + return new JdbcResponse(new JdbcBatchExecuteResult(Arrays.copyOf(updCnts, successQueries), + SqlListenerResponse.STATUS_FAILED, e.toString())); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1a7354fa/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java index 2d7666e..48affe9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java @@ -35,6 +35,9 @@ public class JdbcResult implements JdbcRawBinarylizable { /** Get columns meta query result. */ public static final byte QRY_META = 4; + /** Batch queries. */ + public static final byte BATCH_EXEC = 6; + /** Success status. */ private byte type; @@ -70,14 +73,22 @@ public class JdbcResult implements JdbcRawBinarylizable { switch(resId) { case QRY_EXEC: res = new JdbcQueryExecuteResult(); + break; case QRY_FETCH: res = new JdbcQueryFetchResult(); + break; case QRY_META: res = new JdbcQueryMetadataResult(); + + break; + + case BATCH_EXEC: + res = new JdbcBatchExecuteResult(); + break; default: