IGNITE-7443: Native batching for ODBC. This closes #3422.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4c4d70ef Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4c4d70ef Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4c4d70ef Branch: refs/heads/ignite-6643 Commit: 4c4d70ef4de3cd34d19ae2fcdf890612efd5c23b Parents: 992b025 Author: Igor Sapego <[email protected]> Authored: Wed Jan 24 15:58:34 2018 +0300 Committer: devozerov <[email protected]> Committed: Wed Jan 24 15:58:34 2018 +0300 ---------------------------------------------------------------------- .../odbc/odbc/OdbcRequestHandler.java | 77 +++++++++++--------- .../cpp/odbc-test/src/odbc_test_suite.cpp | 27 +++++-- .../cpp/odbc-test/src/sql_get_info_test.cpp | 2 +- .../include/ignite/odbc/app/parameter_set.h | 26 ++++++- .../cpp/odbc/include/ignite/odbc/message.h | 12 --- .../cpp/odbc/src/app/parameter_set.cpp | 21 +++++- .../cpp/odbc/src/config/connection_info.cpp | 2 +- modules/platforms/cpp/odbc/src/connection.cpp | 8 ++ modules/platforms/cpp/odbc/src/message.cpp | 4 +- .../cpp/odbc/src/query/batch_query.cpp | 13 +++- modules/platforms/cpp/odbc/src/statement.cpp | 18 +++++ 11 files changed, 149 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4c4d70ef/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java index 4ce99e3..3d5d7f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java @@ -17,11 +17,11 @@ package org.apache.ignite.internal.processors.odbc.odbc; +import java.sql.BatchUpdateException; import java.sql.ParameterMetaData; import java.sql.PreparedStatement; import java.sql.Types; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -30,22 +30,21 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.query.FieldsQueryCursor; -import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.binary.BinaryWriterExImpl; import org.apache.ignite.internal.binary.GridBinaryMarshaller; import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx; -import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.odbc.ClientListenerRequest; import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler; import org.apache.ignite.internal.processors.odbc.ClientListenerResponse; import org.apache.ignite.internal.processors.odbc.odbc.escape.OdbcEscapeUtils; -import org.apache.ignite.internal.processors.query.GridQueryIndexing; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; +import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.META_COLS; @@ -278,9 +277,6 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { * @return Response. */ private ClientListenerResponse executeBatchQuery(OdbcQueryExecuteBatchRequest req) { - List<Long> rowsAffected = new ArrayList<>(req.arguments().length); - int currentSet = 0; - try { String sql = OdbcEscapeUtils.parse(req.sqlQuery()); @@ -288,7 +284,7 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { log.debug("ODBC query parsed [reqId=" + req.requestId() + ", original=" + req.sqlQuery() + ", parsed=" + sql + ']'); - SqlFieldsQuery qry = makeQuery(req.schema(), sql, req.arguments(), req.timeout()); + SqlFieldsQueryEx qry = makeQuery(req.schema(), sql, null, req.timeout()); Object[][] paramSet = req.arguments(); @@ -297,19 +293,16 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { + paramSet.length + ']'); // Getting meta and do the checks for the first execution. - qry.setArgs(paramSet[0]); - - QueryCursorImpl<List<?>> qryCur = (QueryCursorImpl<List<?>>)ctx.query() - .querySqlFields(qry, true, true).get(0); + for (Object[] set : paramSet) + qry.addBatchedArgs(set); - if (qryCur.isQuery()) - throw new IgniteException("Batching of parameters only supported for DML statements. [query=" + - req.sqlQuery() + ']'); + List<FieldsQueryCursor<List<?>>> qryCurs = + ctx.query().querySqlFields(qry, true, true); - rowsAffected.add(OdbcUtils.rowsAffected(qryCur)); + List<Long> rowsAffected = new ArrayList<>(req.arguments().length); - for (currentSet = 1; currentSet < paramSet.length; ++currentSet) - rowsAffected.add(executeQuery(qry, paramSet[currentSet])); + for (FieldsQueryCursor<List<?>> qryCur : qryCurs) + rowsAffected.add(OdbcUtils.rowsAffected(qryCur)); OdbcQueryExecuteBatchResult res = new OdbcQueryExecuteBatchResult(rowsAffected); @@ -318,25 +311,11 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { catch (Exception e) { U.error(log, "Failed to execute SQL query [reqId=" + req.requestId() + ", req=" + req + ']', e); - return exceptionToBatchResult(e, rowsAffected, currentSet); + return exceptionToBatchResult(e); } } /** - * Execute query. - * @param qry Query - * @param row Row - * @return Affected rows. - */ - private long executeQuery(SqlFieldsQuery qry, Object[] row) { - qry.setArgs(row); - - QueryCursor<List<?>> cur = ctx.query().querySqlFields(qry, true, true).get(0); - - return OdbcUtils.rowsAffected(cur); - } - - /** * {@link OdbcQueryCloseRequest} command handler. * * @param req Execute query request. @@ -658,9 +637,35 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { * @param e Exception to convert. * @return resulting {@link OdbcResponse}. */ - private OdbcResponse exceptionToBatchResult(Exception e, Collection<Long> rowsAffected, long currentSet) { - OdbcQueryExecuteBatchResult res = new OdbcQueryExecuteBatchResult(rowsAffected, currentSet, - OdbcUtils.tryRetrieveSqlErrorCode(e), OdbcUtils.tryRetrieveH2ErrorMessage(e)); + private OdbcResponse exceptionToBatchResult(Exception e) { + int code; + String msg; + List<Long> rowsAffected = new ArrayList<>(); + + if (e instanceof IgniteSQLException) { + BatchUpdateException batchCause = X.cause(e, BatchUpdateException.class); + + if (batchCause != null) { + for (long cnt : batchCause.getLargeUpdateCounts()) + rowsAffected.add(cnt); + + msg = batchCause.getMessage(); + + code = batchCause.getErrorCode(); + } + else { + msg = OdbcUtils.tryRetrieveH2ErrorMessage(e); + + code = ((IgniteSQLException)e).statusCode(); + } + } + else { + msg = e.getMessage(); + + code = IgniteQueryErrorCode.UNKNOWN; + } + + OdbcQueryExecuteBatchResult res = new OdbcQueryExecuteBatchResult(rowsAffected, -1, code, msg); return new OdbcResponse(res); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4c4d70ef/modules/platforms/cpp/odbc-test/src/odbc_test_suite.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc-test/src/odbc_test_suite.cpp b/modules/platforms/cpp/odbc-test/src/odbc_test_suite.cpp index a967b4f..970c77d 100644 --- a/modules/platforms/cpp/odbc-test/src/odbc_test_suite.cpp +++ b/modules/platforms/cpp/odbc-test/src/odbc_test_suite.cpp @@ -463,21 +463,38 @@ namespace ignite { Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache"); + std::vector<SQLUSMALLINT> statuses(recordsNum, 42); + + // Binding statuses array. + SQLRETURN ret = SQLSetStmtAttr(stmt, SQL_ATTR_PARAM_STATUS_PTR, &statuses[0], 0); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + // Inserting values. - int inserted = InsertTestBatch(splitAt, recordsNum, recordsNum - splitAt); + int setsProcessed = InsertTestBatch(splitAt, recordsNum, recordsNum - splitAt); - BOOST_REQUIRE_EQUAL(inserted, recordsNum - splitAt); + BOOST_REQUIRE_EQUAL(setsProcessed, recordsNum - splitAt); - inserted = InsertTestBatch(0, recordsNum, splitAt); + for (int i = 0; i < recordsNum - splitAt; ++i) + BOOST_REQUIRE_EQUAL(statuses[i], SQL_PARAM_SUCCESS); - BOOST_REQUIRE_EQUAL(inserted, splitAt); + setsProcessed = InsertTestBatch(0, recordsNum, splitAt); + + BOOST_REQUIRE_EQUAL(setsProcessed, recordsNum); + + for (int i = 0; i < splitAt; ++i) + BOOST_REQUIRE_EQUAL(statuses[i], SQL_PARAM_SUCCESS); + + for (int i = splitAt; i < recordsNum; ++i) + BOOST_REQUIRE_EQUAL(statuses[i], SQL_PARAM_ERROR); int64_t key = 0; char strField[1024] = {0}; SQLLEN strFieldLen = 0; // Binding columns. - SQLRETURN ret = SQLBindCol(stmt, 1, SQL_C_SLONG, &key, 0, 0); + ret = SQLBindCol(stmt, 1, SQL_C_SLONG, &key, 0, 0); if (!SQL_SUCCEEDED(ret)) BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); http://git-wip-us.apache.org/repos/asf/ignite/blob/4c4d70ef/modules/platforms/cpp/odbc-test/src/sql_get_info_test.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc-test/src/sql_get_info_test.cpp b/modules/platforms/cpp/odbc-test/src/sql_get_info_test.cpp index ad1236c..f303e2c 100644 --- a/modules/platforms/cpp/odbc-test/src/sql_get_info_test.cpp +++ b/modules/platforms/cpp/odbc-test/src/sql_get_info_test.cpp @@ -241,7 +241,7 @@ BOOST_AUTO_TEST_CASE(TestValues) CheckIntInfo(SQL_SQL92_VALUE_EXPRESSIONS, SQL_SVE_CASE | SQL_SVE_CAST | SQL_SVE_COALESCE | SQL_SVE_NULLIF); CheckIntInfo(SQL_STATIC_CURSOR_ATTRIBUTES1, SQL_CA1_NEXT); CheckIntInfo(SQL_STATIC_CURSOR_ATTRIBUTES2, 0); - CheckIntInfo(SQL_PARAM_ARRAY_ROW_COUNTS, SQL_PARC_NO_BATCH); + CheckIntInfo(SQL_PARAM_ARRAY_ROW_COUNTS, SQL_PARC_BATCH); CheckIntInfo(SQL_PARAM_ARRAY_SELECTS, SQL_PAS_NO_SELECT); CheckIntInfo(SQL_SCROLL_OPTIONS, SQL_SO_FORWARD_ONLY | SQL_SO_STATIC); CheckIntInfo(SQL_ALTER_DOMAIN, 0); http://git-wip-us.apache.org/repos/asf/ignite/blob/4c4d70ef/modules/platforms/cpp/odbc/include/ignite/odbc/app/parameter_set.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/app/parameter_set.h b/modules/platforms/cpp/odbc/include/ignite/odbc/app/parameter_set.h index 2ab5580..e30622d 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/app/parameter_set.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/app/parameter_set.h @@ -225,7 +225,28 @@ namespace ignite * * @return Pointer to write number of parameters processed in batch. */ - SqlUlen* GetParamsProcessedPtr(); + SqlUlen* GetParamsProcessedPtr() const; + + /** + * Set pointer to array in which to return the status of each + * set of parameters. + * @param value Value. + */ + void SetParamsStatusPtr(SQLUSMALLINT* value); + + /** + * Get pointer to array in which to return the status of each + * set of parameters. + * @return Value. + */ + SQLUSMALLINT* GetParamsStatusPtr() const; + + /** + * Set parameter status. + * @param idx Parameter index. + * @param status Status to set. + */ + void SetParamStatus(int64_t idx, SQLUSMALLINT status) const; private: /** @@ -249,6 +270,9 @@ namespace ignite /** Processed parameters. */ SqlUlen* processedParamRows; + /** Parameters status. */ + SQLUSMALLINT* paramsStatus; + /** Parameter set size. */ SqlUlen paramSetSize; http://git-wip-us.apache.org/repos/asf/ignite/blob/4c4d70ef/modules/platforms/cpp/odbc/include/ignite/odbc/message.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/message.h b/modules/platforms/cpp/odbc/include/ignite/odbc/message.h index 51c0b41..af03262 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/message.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/message.h @@ -677,15 +677,6 @@ namespace ignite } /** - * Get index of the set which caused an error. - * @return Index of the set which caused an error. - */ - int64_t GetErrorSetIdx() const - { - return static_cast<int64_t>(affectedRows.size()); - } - - /** * Get error message. * @return Error message. */ @@ -714,9 +705,6 @@ namespace ignite /** Affected rows. */ std::vector<int64_t> affectedRows; - /** Index of the set which caused an error. */ - int64_t errorSetIdx; - /** Error message. */ std::string errorMessage; http://git-wip-us.apache.org/repos/asf/ignite/blob/4c4d70ef/modules/platforms/cpp/odbc/src/app/parameter_set.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/app/parameter_set.cpp b/modules/platforms/cpp/odbc/src/app/parameter_set.cpp index c110d05..55dd332 100644 --- a/modules/platforms/cpp/odbc/src/app/parameter_set.cpp +++ b/modules/platforms/cpp/odbc/src/app/parameter_set.cpp @@ -27,6 +27,7 @@ namespace ignite parameters(), paramTypes(), paramBindOffset(0), + paramsStatus(0), processedParamRows(0), paramSetSize(1), paramSetPos(0), @@ -99,11 +100,29 @@ namespace ignite processedParamRows = ptr; } - SqlUlen* ParameterSet::GetParamsProcessedPtr() + SqlUlen* ParameterSet::GetParamsProcessedPtr() const { return processedParamRows; } + void ParameterSet::SetParamsStatusPtr(SQLUSMALLINT* value) + { + paramsStatus = value; + } + + SQLUSMALLINT* ParameterSet::GetParamsStatusPtr() const + { + return paramsStatus; + } + + void ParameterSet::SetParamStatus(int64_t idx, SQLUSMALLINT status) const + { + if (idx < 0 || !paramsStatus || idx >= static_cast<int64_t>(paramSetSize)) + return; + + paramsStatus[idx] = status; + } + void ParameterSet::SetParamsProcessed(SqlUlen processed) const { if (processedParamRows) http://git-wip-us.apache.org/repos/asf/ignite/blob/4c4d70ef/modules/platforms/cpp/odbc/src/config/connection_info.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/config/connection_info.cpp b/modules/platforms/cpp/odbc/src/config/connection_info.cpp index 0f8e50b..a70121d 100644 --- a/modules/platforms/cpp/odbc/src/config/connection_info.cpp +++ b/modules/platforms/cpp/odbc/src/config/connection_info.cpp @@ -1273,7 +1273,7 @@ namespace ignite // resulting from the execution of the statement for the entire array of parameters. This is // conceptually equivalent to treating the statement together with the complete parameter array as // one atomic unit. Errors are handled the same as if one statement were executed. - intParams[SQL_PARAM_ARRAY_ROW_COUNTS] = SQL_PARC_NO_BATCH; + intParams[SQL_PARAM_ARRAY_ROW_COUNTS] = SQL_PARC_BATCH; #endif // SQL_PARAM_ARRAY_ROW_COUNTS #ifdef SQL_PARAM_ARRAY_SELECTS http://git-wip-us.apache.org/repos/asf/ignite/blob/4c4d70ef/modules/platforms/cpp/odbc/src/connection.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/connection.cpp b/modules/platforms/cpp/odbc/src/connection.cpp index 8b03876..4e8f4ab 100644 --- a/modules/platforms/cpp/odbc/src/connection.cpp +++ b/modules/platforms/cpp/odbc/src/connection.cpp @@ -29,6 +29,9 @@ #include "ignite/odbc/message.h" #include "ignite/odbc/config/configuration.h" +// Uncomment for per-byte debug. +//#define PER_BYTE_DEBUG + namespace { #pragma pack(push, 1) @@ -39,6 +42,7 @@ namespace #pragma pack(pop) } + namespace ignite { namespace odbc @@ -218,7 +222,9 @@ namespace ignite if (res == OperationResult::FAIL) throw OdbcError(SqlState::S08S01_LINK_FAILURE, "Can not send message due to connection failure"); +#ifdef PER_BYTE_DEBUG LOG_MSG("message sent: (" << msg.GetSize() << " bytes)" << utility::HexDump(msg.GetData(), msg.GetSize())); +#endif //PER_BYTE_DEBUG return true; } @@ -285,7 +291,9 @@ namespace ignite if (res == OperationResult::FAIL) throw OdbcError(SqlState::S08S01_LINK_FAILURE, "Can not receive message body"); +#ifdef PER_BYTE_DEBUG LOG_MSG("Message received: " << utility::HexDump(&msg[0], msg.size())); +#endif //PER_BYTE_DEBUG return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/4c4d70ef/modules/platforms/cpp/odbc/src/message.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/message.cpp b/modules/platforms/cpp/odbc/src/message.cpp index 57b7210..93fc6bb 100644 --- a/modules/platforms/cpp/odbc/src/message.cpp +++ b/modules/platforms/cpp/odbc/src/message.cpp @@ -350,7 +350,6 @@ namespace ignite QueryExecuteBatchResponse::QueryExecuteBatchResponse(): affectedRows(0), - errorSetIdx(-1), errorMessage(), errorCode(1) { @@ -370,7 +369,8 @@ namespace ignite if (!success) { - errorSetIdx = reader.ReadInt64(); + // Ignoring error set idx. To be deleted in next major version. + reader.ReadInt64(); errorMessage = reader.ReadObject<std::string>(); if (ver >= ProtocolVersion::VERSION_2_1_5) http://git-wip-us.apache.org/repos/asf/ignite/blob/4c4d70ef/modules/platforms/cpp/odbc/src/query/batch_query.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/query/batch_query.cpp b/modules/platforms/cpp/odbc/src/query/batch_query.cpp index a9db8d8..235daf8 100644 --- a/modules/platforms/cpp/odbc/src/query/batch_query.cpp +++ b/modules/platforms/cpp/odbc/src/query/batch_query.cpp @@ -72,7 +72,7 @@ namespace ignite res = MakeRequestExecuteBatch(processed, processed + currentPageSize, lastPage); processed += currentPageSize; - } while (res == SqlResult::AI_SUCCESS && processed < rowNum); + } while ((res == SqlResult::AI_SUCCESS || res == SqlResult::AI_SUCCESS_WITH_INFO) && processed < rowNum); params.SetParamsProcessed(static_cast<SqlUlen>(rowsAffected.size())); @@ -187,7 +187,16 @@ namespace ignite return SqlResult::AI_ERROR; } - rowsAffected.insert(rowsAffected.end(), rsp.GetAffectedRows().begin(), rsp.GetAffectedRows().end()); + const std::vector<int64_t>& rowsLastTime = rsp.GetAffectedRows(); + + for (size_t i = 0; i < rowsLastTime.size(); ++i) + { + int64_t idx = static_cast<int64_t>(i + rowsAffected.size()); + + params.SetParamStatus(idx, rowsLastTime[i] < 0 ? SQL_PARAM_ERROR : SQL_PARAM_SUCCESS); + } + + rowsAffected.insert(rowsAffected.end(), rowsLastTime.begin(), rowsLastTime.end()); LOG_MSG("Affected rows list size: " << rowsAffected.size()); if (!rsp.GetErrorMessage().empty()) http://git-wip-us.apache.org/repos/asf/ignite/blob/4c4d70ef/modules/platforms/cpp/odbc/src/statement.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/statement.cpp b/modules/platforms/cpp/odbc/src/statement.cpp index 898d44d..332487e 100644 --- a/modules/platforms/cpp/odbc/src/statement.cpp +++ b/modules/platforms/cpp/odbc/src/statement.cpp @@ -285,6 +285,12 @@ namespace ignite break; } + case SQL_ATTR_PARAM_STATUS_PTR: + { + parameters.SetParamsStatusPtr(reinterpret_cast<SQLUSMALLINT*>(value)); + break; + } + case SQL_ATTR_QUERY_TIMEOUT: { SqlUlen uTimeout = reinterpret_cast<SqlUlen>(value); @@ -435,6 +441,18 @@ namespace ignite break; } + case SQL_ATTR_PARAM_STATUS_PTR: + { + SQLUSMALLINT** val = reinterpret_cast<SQLUSMALLINT**>(buf); + + *val = parameters.GetParamsStatusPtr(); + + if (valueLen) + *valueLen = SQL_IS_POINTER; + + break; + } + case SQL_ATTR_QUERY_TIMEOUT: { SqlUlen *uTimeout = reinterpret_cast<SqlUlen*>(buf);
