IGNITE-4249: ODBC: Fixed performance issue caused by ineddicient IO handling on CPP side. This closes #1254.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b038730e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b038730e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b038730e Branch: refs/heads/ignite-4242 Commit: b038730ee56a662f73e02bbec83eb1712180fa82 Parents: 9d82f2c Author: isapego <[email protected]> Authored: Wed Nov 23 12:05:54 2016 +0300 Committer: devozerov <[email protected]> Committed: Wed Nov 23 12:05:54 2016 +0300 ---------------------------------------------------------------------- .../processors/odbc/OdbcRequestHandler.java | 32 ++++++++++++++------ .../src/impl/binary/binary_reader_impl.cpp | 3 +- modules/platforms/cpp/odbc/src/connection.cpp | 21 +++++++------ 3 files changed, 34 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b038730e/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java index 4a31be3..eef9945 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java @@ -192,9 +192,7 @@ public class OdbcRequestHandler { QueryCursor qryCur = cache.query(qry); - Iterator iter = qryCur.iterator(); - - qryCursors.put(qryId, new IgniteBiTuple<>(qryCur, iter)); + qryCursors.put(qryId, new IgniteBiTuple<QueryCursor, Iterator>(qryCur, null)); List<?> fieldsMeta = ((QueryCursorImpl) qryCur).fieldsMeta(); @@ -220,11 +218,15 @@ public class OdbcRequestHandler { */ private OdbcResponse closeQuery(long reqId, OdbcQueryCloseRequest req) { try { - QueryCursor cur = qryCursors.get(req.queryId()).get1(); + IgniteBiTuple<QueryCursor, Iterator> tuple = qryCursors.get(req.queryId()); - if (cur == null) + if (tuple == null) return new OdbcResponse(OdbcResponse.STATUS_FAILED, "Failed to find query with ID: " + req.queryId()); + QueryCursor cur = tuple.get1(); + + assert(cur != null); + cur.close(); qryCursors.remove(req.queryId()); @@ -251,17 +253,27 @@ public class OdbcRequestHandler { */ private OdbcResponse fetchQuery(long reqId, OdbcQueryFetchRequest req) { try { - Iterator cur = qryCursors.get(req.queryId()).get2(); + IgniteBiTuple<QueryCursor, Iterator> tuple = qryCursors.get(req.queryId()); - if (cur == null) + if (tuple == null) return new OdbcResponse(OdbcResponse.STATUS_FAILED, "Failed to find query with ID: " + req.queryId()); + Iterator iter = tuple.get2(); + + if (iter == null) { + QueryCursor cur = tuple.get1(); + + iter = cur.iterator(); + + tuple.put(cur, iter); + } + List<Object> items = new ArrayList<>(); - for (int i = 0; i < req.pageSize() && cur.hasNext(); ++i) - items.add(cur.next()); + for (int i = 0; i < req.pageSize() && iter.hasNext(); ++i) + items.add(iter.next()); - OdbcQueryFetchResult res = new OdbcQueryFetchResult(req.queryId(), items, !cur.hasNext()); + OdbcQueryFetchResult res = new OdbcQueryFetchResult(req.queryId(), items, !iter.hasNext()); return new OdbcResponse(res); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b038730e/modules/platforms/cpp/binary/src/impl/binary/binary_reader_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/binary/src/impl/binary/binary_reader_impl.cpp b/modules/platforms/cpp/binary/src/impl/binary/binary_reader_impl.cpp index c3f4fcc..fb75ba5 100644 --- a/modules/platforms/cpp/binary/src/impl/binary/binary_reader_impl.cpp +++ b/modules/platforms/cpp/binary/src/impl/binary/binary_reader_impl.cpp @@ -459,8 +459,7 @@ namespace ignite int32_t realLen = stream->ReadInt32(); if (res && len >= realLen) { - for (int i = 0; i < realLen; i++) - *(res + i) = static_cast<char>(stream->ReadInt8()); + stream->ReadInt8Array(reinterpret_cast<int8_t*>(res), realLen); if (len > realLen) *(res + realLen) = 0; // Set NULL terminator if possible. http://git-wip-us.apache.org/repos/asf/ignite/blob/b038730e/modules/platforms/cpp/odbc/src/connection.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/connection.cpp b/modules/platforms/cpp/odbc/src/connection.cpp index b8ed9fe..e8db376 100644 --- a/modules/platforms/cpp/odbc/src/connection.cpp +++ b/modules/platforms/cpp/odbc/src/connection.cpp @@ -19,6 +19,8 @@ #include <sstream> +#include <ignite/common/fixed_size_array.h> + #include "ignite/odbc/utility.h" #include "ignite/odbc/statement.h" #include "ignite/odbc/connection.h" @@ -178,26 +180,25 @@ namespace ignite if (!connected) IGNITE_ERROR_1(IgniteError::IGNITE_ERR_ILLEGAL_STATE, "Connection is not established"); - OdbcProtocolHeader hdr; + common::FixedSizeArray<int8_t> msg(len + sizeof(OdbcProtocolHeader)); - hdr.len = static_cast<int32_t>(len); + OdbcProtocolHeader *hdr = reinterpret_cast<OdbcProtocolHeader*>(msg.GetData()); - size_t sent = SendAll(reinterpret_cast<int8_t*>(&hdr), sizeof(hdr)); + hdr->len = static_cast<int32_t>(len); - if (sent != sizeof(hdr)) - IGNITE_ERROR_1(IgniteError::IGNITE_ERR_GENERIC, "Can not send message header"); + memcpy(msg.GetData() + sizeof(OdbcProtocolHeader), data, len); - sent = SendAll(data, len); + size_t sent = SendAll(msg.GetData(), msg.GetSize()); - if (sent != len) - IGNITE_ERROR_1(IgniteError::IGNITE_ERR_GENERIC, "Can not send message body"); + if (sent != len + sizeof(OdbcProtocolHeader)) + IGNITE_ERROR_1(IgniteError::IGNITE_ERR_GENERIC, "Can not send message"); } size_t Connection::SendAll(const int8_t* data, size_t len) { int sent = 0; - while (sent != len) + while (sent != static_cast<int64_t>(len)) { int res = socket.Send(data + sent, len - sent); @@ -221,7 +222,7 @@ namespace ignite OdbcProtocolHeader hdr; - size_t received = ReceiveAll(reinterpret_cast<int8_t*>(&hdr), sizeof(hdr)); + int64_t received = ReceiveAll(reinterpret_cast<int8_t*>(&hdr), sizeof(hdr)); if (received != sizeof(hdr)) IGNITE_ERROR_1(IgniteError::IGNITE_ERR_GENERIC, "Can not receive message header");
