Repository: ignite Updated Branches: refs/heads/master 5fb04be39 -> db343b649
IGNITE-6876: Added in ODBC support for SQL_ATTR_CONNECTION_TIMEOUT Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/db343b64 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/db343b64 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/db343b64 Branch: refs/heads/master Commit: db343b649e4289ac28b769a741eee7ea77db8018 Parents: 5fb04be Author: Igor Sapego <isap...@gridgain.com> Authored: Mon Nov 20 20:15:37 2017 +0300 Committer: Igor Sapego <isap...@gridgain.com> Committed: Mon Nov 20 20:15:37 2017 +0300 ---------------------------------------------------------------------- .../cpp/odbc-test/src/attributes_test.cpp | 22 +++ .../cpp/odbc-test/src/queries_test.cpp | 113 +++++++++++++++ .../cpp/odbc/include/ignite/odbc/common_types.h | 3 + .../cpp/odbc/include/ignite/odbc/connection.h | 81 +++++++++-- .../include/ignite/odbc/system/socket_client.h | 55 +++++++- .../odbc/os/linux/src/system/socket_client.cpp | 141 +++++++++++++++++-- .../odbc/os/win/src/system/socket_client.cpp | 134 ++++++++++++++++-- modules/platforms/cpp/odbc/src/connection.cpp | 141 ++++++++++++++----- .../odbc/src/diagnostic/diagnostic_record.cpp | 6 + .../cpp/odbc/src/query/batch_query.cpp | 12 +- .../platforms/cpp/odbc/src/query/data_query.cpp | 12 +- 11 files changed, 644 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc-test/src/attributes_test.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc-test/src/attributes_test.cpp b/modules/platforms/cpp/odbc-test/src/attributes_test.cpp index b87f4b9..c4c2433 100644 --- a/modules/platforms/cpp/odbc-test/src/attributes_test.cpp +++ b/modules/platforms/cpp/odbc-test/src/attributes_test.cpp @@ -227,4 +227,26 @@ BOOST_AUTO_TEST_CASE(StatementAttributeQueryTimeout) 
BOOST_REQUIRE_EQUAL(timeout, 7); } +BOOST_AUTO_TEST_CASE(ConnectionAttributeConnectionTimeout) +{ + Connect("DRIVER={Apache Ignite};address=127.0.0.1:11110;schema=cache"); + + SQLUINTEGER timeout = -1; + SQLRETURN ret = SQLGetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, &timeout, 0, 0); + + ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc); + BOOST_REQUIRE_EQUAL(timeout, 0); + + ret = SQLSetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, reinterpret_cast<SQLPOINTER>(42), 0); + + ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc); + + timeout = -1; + + ret = SQLGetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, &timeout, 0, 0); + + ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc); + BOOST_REQUIRE_EQUAL(timeout, 42); +} + BOOST_AUTO_TEST_SUITE_END() http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc-test/src/queries_test.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc-test/src/queries_test.cpp b/modules/platforms/cpp/odbc-test/src/queries_test.cpp index 6fcf7c9..dafab1a 100644 --- a/modules/platforms/cpp/odbc-test/src/queries_test.cpp +++ b/modules/platforms/cpp/odbc-test/src/queries_test.cpp @@ -2404,5 +2404,118 @@ BOOST_AUTO_TEST_CASE(TestCloseAfterEmptyUpdate) BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); } +BOOST_AUTO_TEST_CASE(TestConnectionTimeoutQuery) +{ + Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache"); + + SQLRETURN ret = SQLSetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, reinterpret_cast<SQLPOINTER>(5), 0); + + ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc); + + InsertTestStrings(10, false); +} + +BOOST_AUTO_TEST_CASE(TestConnectionTimeoutBatch) +{ + Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache"); + + SQLRETURN ret = SQLSetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, reinterpret_cast<SQLPOINTER>(5), 0); + + ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc); + + InsertTestBatch(11, 20, 9); +} + 
+BOOST_AUTO_TEST_CASE(TestConnectionTimeoutBoth) +{ + Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache"); + + SQLRETURN ret = SQLSetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, reinterpret_cast<SQLPOINTER>(5), 0); + + ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc); + + InsertTestStrings(10, false); + InsertTestBatch(11, 20, 9); +} + +BOOST_AUTO_TEST_CASE(TestQueryTimeoutQuery) +{ + Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache"); + + SQLRETURN ret = SQLSetStmtAttr(stmt, SQL_ATTR_QUERY_TIMEOUT, reinterpret_cast<SQLPOINTER>(5), 0); + + ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_STMT, stmt); + + InsertTestStrings(10, false); +} + +BOOST_AUTO_TEST_CASE(TestQueryTimeoutBatch) +{ + Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache"); + + SQLRETURN ret = SQLSetStmtAttr(stmt, SQL_ATTR_QUERY_TIMEOUT, reinterpret_cast<SQLPOINTER>(5), 0); + + ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_STMT, stmt); + + InsertTestBatch(11, 20, 9); +} + +BOOST_AUTO_TEST_CASE(TestQueryTimeoutBoth) +{ + Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache"); + + SQLRETURN ret = SQLSetStmtAttr(stmt, SQL_ATTR_QUERY_TIMEOUT, reinterpret_cast<SQLPOINTER>(5), 0); + + ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_STMT, stmt); + + InsertTestStrings(10, false); + InsertTestBatch(11, 20, 9); +} + +BOOST_AUTO_TEST_CASE(TestQueryAndConnectionTimeoutQuery) +{ + Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache"); + + SQLRETURN ret = SQLSetStmtAttr(stmt, SQL_ATTR_QUERY_TIMEOUT, reinterpret_cast<SQLPOINTER>(5), 0); + + ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_STMT, stmt); + + ret = SQLSetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, reinterpret_cast<SQLPOINTER>(3), 0); + + ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc); + + InsertTestStrings(10, false); +} + +BOOST_AUTO_TEST_CASE(TestQueryAndConnectionTimeoutBatch) +{ + Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache"); + + SQLRETURN ret = SQLSetStmtAttr(stmt, 
SQL_ATTR_QUERY_TIMEOUT, reinterpret_cast<SQLPOINTER>(5), 0); + + ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_STMT, stmt); + + ret = SQLSetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, reinterpret_cast<SQLPOINTER>(3), 0); + + ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc); + + InsertTestBatch(11, 20, 9); +} + +BOOST_AUTO_TEST_CASE(TestQueryAndConnectionTimeoutBoth) +{ + Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache"); + + SQLRETURN ret = SQLSetStmtAttr(stmt, SQL_ATTR_QUERY_TIMEOUT, reinterpret_cast<SQLPOINTER>(5), 0); + + ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_STMT, stmt); + + ret = SQLSetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, reinterpret_cast<SQLPOINTER>(3), 0); + + ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc); + + InsertTestStrings(10, false); + InsertTestBatch(11, 20, 9); +} BOOST_AUTO_TEST_SUITE_END() http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/include/ignite/odbc/common_types.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/common_types.h b/modules/platforms/cpp/odbc/include/ignite/odbc/common_types.h index 349147f..9c8c433 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/common_types.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/common_types.h @@ -190,6 +190,9 @@ namespace ignite */ SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED, + /** The timeout period expired before the data source responded to the request. */ + SHYT00_TIMEOUT_EXPIRED, + /** * The connection timeout period expired before the data source * responded to the request. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h b/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h index 34fed5f..1577ee7 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h @@ -27,6 +27,7 @@ #include "ignite/odbc/config/connection_info.h" #include "ignite/odbc/config/configuration.h" #include "ignite/odbc/diagnostic/diagnosable_adapter.h" +#include "ignite/odbc/odbc_error.h" namespace ignite { @@ -42,6 +43,19 @@ namespace ignite friend class Environment; public: /** + * Operation with timeout result. + */ + struct OperationResult + { + enum T + { + SUCCESS, + FAIL, + TIMEOUT + }; + }; + + /** * Destructor. */ ~Connection(); @@ -96,15 +110,21 @@ namespace ignite * * @param data Data buffer. * @param len Data length. + * @param timeout Timeout. + * @return @c true on success, @c false on timeout. + * @throw OdbcError on error. */ - void Send(const int8_t* data, size_t len); + bool Send(const int8_t* data, size_t len, int32_t timeout); /** * Receive next message. * * @param msg Buffer for message. + * @param timeout Timeout. + * @return @c true on success, @c false on timeout. + * @throw OdbcError on error. */ - void Receive(std::vector<int8_t>& msg); + bool Receive(std::vector<int8_t>& msg, int32_t timeout); /** * Get name of the assotiated schema. @@ -134,9 +154,43 @@ namespace ignite /** * Synchronously send request message and receive response. + * Uses provided timeout. * * @param req Request message. * @param rsp Response message. + * @param timeout Timeout. + * @return @c true on success, @c false on timeout. + * @throw OdbcError on error. 
+ */ + template<typename ReqT, typename RspT> + bool SyncMessage(const ReqT& req, RspT& rsp, int32_t timeout) + { + std::vector<int8_t> tempBuffer; + + parser.Encode(req, tempBuffer); + + bool success = Send(tempBuffer.data(), tempBuffer.size(), timeout); + + if (!success) + return false; + + success = Receive(tempBuffer, timeout); + + if (!success) + return false; + + parser.Decode(rsp, tempBuffer); + + return true; + } + + /** + * Synchronously send request message and receive response. + * Uses connection timeout. + * + * @param req Request message. + * @param rsp Response message. + * @throw OdbcError on error. */ template<typename ReqT, typename RspT> void SyncMessage(const ReqT& req, RspT& rsp) @@ -145,9 +199,15 @@ namespace ignite parser.Encode(req, tempBuffer); - Send(tempBuffer.data(), tempBuffer.size()); + bool success = Send(tempBuffer.data(), tempBuffer.size(), timeout); + + if (!success) + throw OdbcError(SqlState::SHYT01_CONNECTION_TIMEOUT, "Send operation timed out"); + + success = Receive(tempBuffer, timeout); - Receive(tempBuffer); + if (!success) + throw OdbcError(SqlState::SHYT01_CONNECTION_TIMEOUT, "Receive operation timed out"); parser.Decode(rsp, tempBuffer); } @@ -280,18 +340,20 @@ namespace ignite * * @param dst Buffer for data. * @param len Number of bytes to receive. - * @return Number of successfully received bytes. + * @param timeout Timeout. + * @return Operation result. */ - size_t ReceiveAll(void* dst, size_t len); + OperationResult::T ReceiveAll(void* dst, size_t len, int32_t timeout); /** * Send specified number of bytes. * * @param data Data buffer. * @param len Data length. - * @return Number of successfully sent bytes. + * @param timeout Timeout. + * @return Operation result. */ - size_t SendAll(const int8_t* data, size_t len); + OperationResult::T SendAll(const int8_t* data, size_t len, int32_t timeout); /** * Perform handshake request. @@ -311,6 +373,9 @@ namespace ignite /** State flag. 
*/ bool connected; + /** Connection timeout in seconds. */ + int32_t timeout; + /** Message parser. */ Parser parser; http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/include/ignite/odbc/system/socket_client.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/system/socket_client.h b/modules/platforms/cpp/odbc/include/ignite/odbc/system/socket_client.h index 946605e..2a3cfa3 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/system/socket_client.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/system/socket_client.h @@ -44,6 +44,22 @@ namespace ignite /** The time in seconds between individual keepalive probes. */ enum { KEEP_ALIVE_PROBES_PERIOD = 1 }; + /** Connection establishment timeout in seconds. */ + enum { CONNECT_TIMEOUT = 5 }; + + /** + * Non-negative timeout operation result. + */ + struct WaitResult + { + enum T + { + TIMEOUT = 0, + + SUCCESS = 1 + }; + }; + /** * Constructor. */ @@ -76,20 +92,31 @@ namespace ignite * * @param data Pointer to data to be sent. * @param size Size of the data in bytes. - * @return Number of bytes that have been sent on success and negative - * value on failure. + * @param timeout Timeout. + * @return Number of bytes that have been sent on success, + * WaitResult::TIMEOUT on timeout and -errno on failure. */ - int Send(const int8_t* data, size_t size); + int Send(const int8_t* data, size_t size, int32_t timeout); /** * Receive data from established connection. * * @param buffer Pointer to data buffer. * @param size Size of the buffer in bytes. - * @return Number of bytes that have been received on success and negative - * value on failure. + * @param timeout Timeout. + * @return Number of bytes that have been sent on success, + * WaitResult::TIMEOUT on timeout and -errno on failure. 
*/ - int Receive(int8_t* buffer, size_t size); + int Receive(int8_t* buffer, size_t size, int32_t timeout); + + /** + * Check if the socket is blocking or not. + * @return @c true if the socket is blocking and false otherwise. + */ + bool IsBlocking() const + { + return blocking; + } private: /** @@ -97,8 +124,24 @@ namespace ignite */ void TrySetOptions(diagnostic::Diagnosable& diag); + /** + * Wait on the socket for any event for specified time. + * This function uses select() to achieve timeout functionality + * for every separate socket operation. + * + * @param timeout Timeout. + * @param rd Wait for read if @c true, or for write if @c false. + * @return -errno on error, WaitResult::TIMEOUT on timeout and + * WaitResult::SUCCESS on success. + */ + int WaitOnSocket(int32_t timeout, bool rd); + + /** Handle. */ intptr_t socketHandle; + /** Blocking flag. */ + bool blocking; + IGNITE_NO_COPY_ASSIGNMENT(SocketClient) }; } http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/os/linux/src/system/socket_client.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/os/linux/src/system/socket_client.cpp b/modules/platforms/cpp/odbc/os/linux/src/system/socket_client.cpp index 5a9b03a..a6d6151 100644 --- a/modules/platforms/cpp/odbc/os/linux/src/system/socket_client.cpp +++ b/modules/platforms/cpp/odbc/os/linux/src/system/socket_client.cpp @@ -20,6 +20,7 @@ #include <netinet/tcp.h> #include <netdb.h> #include <unistd.h> +#include <fcntl.h> #include <cstring> @@ -35,26 +36,36 @@ namespace { /** * Get last socket error message. + * @param error Error code. * @return Last socket error message string. 
*/ - std::string GetLastSocketErrorMessage() + std::string GetSocketErrorMessage(int error) { - int lastError = errno; std::stringstream res; - res << "error_code=" << lastError; + res << "error_code=" << error; - if (lastError == 0) + if (error == 0) return res.str(); char buffer[1024] = ""; - strerror_r(lastError, buffer, sizeof(buffer)); - - res << ", msg=" << buffer; + if (!strerror_r(error, buffer, sizeof(buffer))) + res << ", msg=" << buffer; return res.str(); } + + /** + * Get last socket error message. + * @return Last socket error message string. + */ + std::string GetLastSocketErrorMessage() + { + int lastError = errno; + + return GetSocketErrorMessage(lastError); + } } namespace ignite @@ -64,7 +75,9 @@ namespace ignite namespace tcp { - SocketClient::SocketClient() : socketHandle(SOCKET_ERROR) + SocketClient::SocketClient() : + socketHandle(SOCKET_ERROR), + blocking(true) { // No-op. } @@ -129,11 +142,27 @@ namespace ignite res = connect(socketHandle, it->ai_addr, static_cast<int>(it->ai_addrlen)); if (SOCKET_ERROR == res) { - LOG_MSG("Connection failed: " << GetLastSocketErrorMessage()); + int lastError = errno; + + if (lastError != EWOULDBLOCK && lastError != EINPROGRESS) + { + LOG_MSG("Connection failed: " << GetSocketErrorMessage(lastError)); + + Close(); - Close(); + continue; + } - continue; + res = WaitOnSocket(CONNECT_TIMEOUT, false); + + if (res < 0 || res == WaitResult::TIMEOUT) + { + LOG_MSG("Connection timeout expired: " << GetSocketErrorMessage(-res)); + + Close(); + + continue; + } } break; } @@ -153,13 +182,29 @@ namespace ignite } } - int SocketClient::Send(const int8_t* data, size_t size) + int SocketClient::Send(const int8_t* data, size_t size, int32_t timeout) { + if (!blocking) + { + int res = WaitOnSocket(timeout, false); + + if (res < 0 || res == WaitResult::TIMEOUT) + return res; + } + return send(socketHandle, reinterpret_cast<const char*>(data), static_cast<int>(size), 0); } - int SocketClient::Receive(int8_t* buffer, size_t 
size) + int SocketClient::Receive(int8_t* buffer, size_t size, int32_t timeout) { + if (!blocking) + { + int res = WaitOnSocket(timeout, true); + + if (res < 0 || res == WaitResult::TIMEOUT) + return res; + } + return recv(socketHandle, reinterpret_cast<char*>(buffer), static_cast<int>(size), 0); } @@ -203,6 +248,30 @@ namespace ignite "Can not set up TCP no-delay mode"); } + res = setsockopt(socketHandle, SOL_SOCKET, SO_OOBINLINE, + reinterpret_cast<char*>(&trueOpt), sizeof(trueOpt)); + + if (SOCKET_ERROR == res) + { + LOG_MSG("TCP out-of-bound data inlining setup failed: " << GetLastSocketErrorMessage()); + + diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED, + "Can not set up TCP out-of-bound data inlining"); + } + + blocking = false; + + int flags; + if (((flags = fcntl(socketHandle, F_GETFL, 0)) < 0) || + (fcntl(socketHandle, F_SETFL, flags | O_NONBLOCK) < 0)) + { + blocking = true; + LOG_MSG("Non-blocking mode setup failed: " << GetLastSocketErrorMessage()); + + diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED, + "Can not set up non-blocking mode. Timeouts are not available."); + } + res = setsockopt(socketHandle, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<char*>(&trueOpt), sizeof(trueOpt)); @@ -238,6 +307,52 @@ namespace ignite diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED, "Can not set up TCP keep-alive probes period"); } + + } + + int SocketClient::WaitOnSocket(int32_t timeout, bool rd) + { + int ready = 0; + int lastError = 0; + + fd_set fds; + + do { + struct timeval tv = { 0 }; + tv.tv_sec = timeout; + + FD_ZERO(&fds); + FD_SET(socketHandle, &fds); + + fd_set* readFds = 0; + fd_set* writeFds = 0; + + if (rd) + readFds = &fds; + else + writeFds = &fds; + + ready = select(static_cast<int>((socketHandle) + 1), + readFds, writeFds, NULL, (timeout == 0 ? 
NULL : &tv)); + + if (ready == SOCKET_ERROR) + lastError = errno; + + } while (ready == SOCKET_ERROR && lastError == EINTR); + + if (ready == SOCKET_ERROR) + return -lastError; + + socklen_t size = sizeof(lastError); + int res = getsockopt(socketHandle, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&lastError), &size); + + if (res != SOCKET_ERROR && lastError != 0) + return -lastError; + + if (ready == 0) + return WaitResult::TIMEOUT; + + return WaitResult::SUCCESS; } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp b/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp index 30fb7d7..6f87b93 100644 --- a/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp +++ b/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp @@ -34,17 +34,17 @@ namespace { /** - * Get last socket error message. - * @return Last socket error message string. + * Get socket error message for the error code. + * @param error Error code. + * @return Socket error message string. */ - std::string GetLastSocketErrorMessage() + std::string GetSocketErrorMessage(HRESULT error) { - HRESULT lastError = WSAGetLastError(); std::stringstream res; - res << "error_code=" << lastError; + res << "error_code=" << error; - if (lastError == 0) + if (error == 0) return res.str(); LPTSTR errorText = NULL; @@ -58,7 +58,7 @@ namespace | FORMAT_MESSAGE_IGNORE_INSERTS, // unused with FORMAT_MESSAGE_FROM_SYSTEM NULL, - lastError, + error, MAKELANGID(LANG_ENGLISH, SUBLANG_ENGLISH_US), // output reinterpret_cast<LPTSTR>(&errorText), @@ -77,6 +77,17 @@ namespace return res.str(); } + + /** + * Get last socket error message. + * @return Last socket error message string. 
+ */ + std::string GetLastSocketErrorMessage() + { + HRESULT lastError = WSAGetLastError(); + + return GetSocketErrorMessage(lastError); + } } namespace ignite @@ -86,7 +97,9 @@ namespace ignite namespace tcp { - SocketClient::SocketClient() : socketHandle(INVALID_SOCKET) + SocketClient::SocketClient() : + socketHandle(INVALID_SOCKET), + blocking(true) { // No-op. } @@ -170,11 +183,27 @@ namespace ignite res = connect(socketHandle, it->ai_addr, static_cast<int>(it->ai_addrlen)); if (SOCKET_ERROR == res) { - LOG_MSG("Connection failed: " << GetLastSocketErrorMessage()); + int lastError = WSAGetLastError(); - Close(); + if (lastError != WSAEWOULDBLOCK) + { + LOG_MSG("Connection failed: " << GetSocketErrorMessage(lastError)); - continue; + Close(); + + continue; + } + + res = WaitOnSocket(CONNECT_TIMEOUT, false); + + if (res < 0 || res == WaitResult::TIMEOUT) + { + LOG_MSG("Connection timeout expired: " << GetSocketErrorMessage(-res)); + + Close(); + + continue; + } } break; } @@ -194,21 +223,39 @@ namespace ignite } } - int SocketClient::Send(const int8_t* data, size_t size) + int SocketClient::Send(const int8_t* data, size_t size, int32_t timeout) { + if (!blocking) + { + int res = WaitOnSocket(timeout, false); + + if (res < 0 || res == WaitResult::TIMEOUT) + return res; + } + return send(socketHandle, reinterpret_cast<const char*>(data), static_cast<int>(size), 0); } - int SocketClient::Receive(int8_t* buffer, size_t size) + int SocketClient::Receive(int8_t* buffer, size_t size, int32_t timeout) { + if (!blocking) + { + int res = WaitOnSocket(timeout, true); + + if (res < 0 || res == WaitResult::TIMEOUT) + return res; + } + return recv(socketHandle, reinterpret_cast<char*>(buffer), static_cast<int>(size), 0); } void SocketClient::TrySetOptions(diagnostic::Diagnosable& diag) { BOOL trueOpt = TRUE; + ULONG uTrueOpt = TRUE; int bufSizeOpt = BUFFER_SIZE; + int res = setsockopt(socketHandle, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char*>(&bufSizeOpt), 
sizeof(bufSizeOpt)); @@ -242,6 +289,29 @@ namespace ignite "Can not set up TCP no-delay mode"); } + res = setsockopt(socketHandle, SOL_SOCKET, SO_OOBINLINE, + reinterpret_cast<char*>(&trueOpt), sizeof(trueOpt)); + + if (SOCKET_ERROR == res) + { + LOG_MSG("TCP out-of-bound data inlining setup failed: " << GetLastSocketErrorMessage()); + + diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED, + "Can not set up TCP out-of-bound data inlining"); + } + + blocking = false; + res = ioctlsocket(socketHandle, FIONBIO, &uTrueOpt); + + if (res == SOCKET_ERROR) + { + blocking = true; + LOG_MSG("Non-blocking mode setup failed: " << GetLastSocketErrorMessage()); + + diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED, + "Can not set up non-blocking mode. Timeouts are not available."); + } + res = setsockopt(socketHandle, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<char*>(&trueOpt), sizeof(trueOpt)); @@ -318,6 +388,44 @@ namespace ignite #endif } + int SocketClient::WaitOnSocket(int32_t timeout, bool rd) + { + int ready = 0; + int lastError = 0; + + fd_set fds; + + do { + struct timeval tv = { 0 }; + tv.tv_sec = timeout; + + FD_ZERO(&fds); + FD_SET(socketHandle, &fds); + + fd_set* readFds = 0; + fd_set* writeFds = 0; + + if (rd) + readFds = &fds; + else + writeFds = &fds; + + ready = select(static_cast<int>((socketHandle) + 1), + readFds, writeFds, NULL, (timeout == 0 ? 
NULL : &tv)); + + if (ready == SOCKET_ERROR) + lastError = WSAGetLastError(); + + } while (ready == SOCKET_ERROR && lastError == WSAEINTR); + + if (ready == SOCKET_ERROR) + return -lastError; + + if (ready == 0) + return WaitResult::TIMEOUT; + + return WaitResult::SUCCESS; + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/src/connection.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/connection.cpp b/modules/platforms/cpp/odbc/src/connection.cpp index b99d768..8b03876 100644 --- a/modules/platforms/cpp/odbc/src/connection.cpp +++ b/modules/platforms/cpp/odbc/src/connection.cpp @@ -16,6 +16,7 @@ */ #include <cstring> +#include <cstddef> #include <sstream> @@ -27,7 +28,6 @@ #include "ignite/odbc/connection.h" #include "ignite/odbc/message.h" #include "ignite/odbc/config/configuration.h" -#include "ignite/odbc/odbc_error.h" namespace { @@ -46,6 +46,7 @@ namespace ignite Connection::Connection() : socket(), connected(false), + timeout(0), parser(), config(), info(config) @@ -194,7 +195,7 @@ namespace ignite return SqlResult::AI_SUCCESS; } - void Connection::Send(const int8_t* data, size_t len) + bool Connection::Send(const int8_t* data, size_t len, int32_t timeout) { if (!connected) throw OdbcError(SqlState::S08003_NOT_CONNECTED, "Connection is not established"); @@ -209,38 +210,45 @@ namespace ignite memcpy(msg.GetData() + sizeof(OdbcProtocolHeader), data, len); - size_t sent = SendAll(msg.GetData(), msg.GetSize()); + OperationResult::T res = SendAll(msg.GetData(), msg.GetSize(), timeout); - if (sent != len + sizeof(OdbcProtocolHeader)) + if (res == OperationResult::TIMEOUT) + return false; + + if (res == OperationResult::FAIL) throw OdbcError(SqlState::S08S01_LINK_FAILURE, "Can not send message due to connection failure"); LOG_MSG("message sent: (" << msg.GetSize() << " bytes)" << utility::HexDump(msg.GetData(), msg.GetSize())); + + return true; } - 
size_t Connection::SendAll(const int8_t* data, size_t len) + Connection::OperationResult::T Connection::SendAll(const int8_t* data, size_t len, int32_t timeout) { int sent = 0; while (sent != static_cast<int64_t>(len)) { - int res = socket.Send(data + sent, len - sent); + int res = socket.Send(data + sent, len - sent, timeout); LOG_MSG("Sent: " << res); - if (res <= 0) + if (res < 0 || res == tcp::SocketClient::WaitResult::TIMEOUT) { Close(); - return sent; + return res < 0 ? OperationResult::FAIL : OperationResult::TIMEOUT; } sent += res; } - return sent; + assert(static_cast<size_t>(sent) == len); + + return OperationResult::SUCCESS; } - void Connection::Receive(std::vector<int8_t>& msg) + bool Connection::Receive(std::vector<int8_t>& msg, int32_t timeout) { if (!connected) throw OdbcError(SqlState::S08003_NOT_CONNECTED, "Connection is not established"); @@ -249,36 +257,40 @@ namespace ignite OdbcProtocolHeader hdr; - size_t received = ReceiveAll(reinterpret_cast<int8_t*>(&hdr), sizeof(hdr)); + OperationResult::T res = ReceiveAll(reinterpret_cast<int8_t*>(&hdr), sizeof(hdr), timeout); + + if (res == OperationResult::TIMEOUT) + return false; - if (received != sizeof(hdr)) + if (res == OperationResult::FAIL) throw OdbcError(SqlState::S08S01_LINK_FAILURE, "Can not receive message header"); if (hdr.len < 0) { Close(); - throw OdbcError(SqlState::S08S01_LINK_FAILURE, "Protocol error: Message length is negative"); + throw OdbcError(SqlState::SHY000_GENERAL_ERROR, "Protocol error: Message length is negative"); } if (hdr.len == 0) - return; + return false; msg.resize(hdr.len); - received = ReceiveAll(&msg[0], hdr.len); + res = ReceiveAll(&msg[0], hdr.len, timeout); - if (received != hdr.len) - { - msg.resize(received); + if (res == OperationResult::TIMEOUT) + return false; + if (res == OperationResult::FAIL) throw OdbcError(SqlState::S08S01_LINK_FAILURE, "Can not receive message body"); - } LOG_MSG("Message received: " << utility::HexDump(&msg[0], msg.size())); + + 
return true; } - size_t Connection::ReceiveAll(void* dst, size_t len) + Connection::OperationResult::T Connection::ReceiveAll(void* dst, size_t len, int32_t timeout) { size_t remain = len; int8_t* buffer = reinterpret_cast<int8_t*>(dst); @@ -287,20 +299,20 @@ namespace ignite { size_t received = len - remain; - int res = socket.Receive(buffer + received, remain); + int res = socket.Receive(buffer + received, remain, timeout); LOG_MSG("Receive res: " << res << " remain: " << remain); - if (res <= 0) + if (res < 0 || res == tcp::SocketClient::WaitResult::TIMEOUT) { Close(); - return received; + return res < 0 ? OperationResult::FAIL : OperationResult::TIMEOUT; } remain -= static_cast<size_t>(res); } - return len; + return OperationResult::SUCCESS; } const std::string& Connection::GetSchema() const @@ -334,6 +346,14 @@ namespace ignite IGNITE_ODBC_API_CALL(InternalTransactionRollback()); } + SqlResult::Type Connection::InternalTransactionRollback() + { + AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED, + "Rollback operation is not supported."); + + return SqlResult::AI_ERROR; + } + void Connection::GetAttribute(int attr, void* buf, SQLINTEGER bufLen, SQLINTEGER* valueLen) { IGNITE_ODBC_API_CALL(InternalGetAttribute(attr, buf, bufLen, valueLen)); @@ -343,7 +363,7 @@ namespace ignite { if (!buf) { - AddStatusRecord(SqlState::SHY000_GENERAL_ERROR, "Data buffer is NULL."); + AddStatusRecord(SqlState::SHY009_INVALID_USE_OF_NULL_POINTER, "Data buffer is null."); return SqlResult::AI_ERROR; } @@ -362,6 +382,18 @@ namespace ignite break; } + case SQL_ATTR_CONNECTION_TIMEOUT: + { + SQLUINTEGER *val = reinterpret_cast<SQLUINTEGER*>(buf); + + *val = static_cast<SQLUINTEGER>(timeout); + + if (valueLen) + *valueLen = SQL_IS_INTEGER; + + break; + } + default: { AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED, @@ -381,6 +413,13 @@ namespace ignite SqlResult::Type Connection::InternalSetAttribute(int attr, void* value, SQLINTEGER valueLen) { + if 
(!value) + { + AddStatusRecord(SqlState::SHY009_INVALID_USE_OF_NULL_POINTER, "Value pointer is null."); + + return SqlResult::AI_ERROR; + } + switch (attr) { case SQL_ATTR_CONNECTION_DEAD: @@ -390,6 +429,39 @@ namespace ignite return SqlResult::AI_ERROR; } + case SQL_ATTR_CONNECTION_TIMEOUT: + { + SQLUINTEGER uTimeout = static_cast<SQLUINTEGER>(reinterpret_cast<ptrdiff_t>(value)); + + if (uTimeout != 0 && connected && socket.IsBlocking()) + { + timeout = 0; + + AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED, "Can not set timeout, because can not " + "enable non-blocking mode on TCP connection. Setting to 0."); + + return SqlResult::AI_SUCCESS_WITH_INFO; + } + + if (uTimeout > INT32_MAX) + { + timeout = INT32_MAX; + + std::stringstream ss; + + ss << "Value is too big: " << uTimeout << ", changing to " << timeout << "."; + std::string msg = ss.str(); + + AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED, msg); + + return SqlResult::AI_SUCCESS_WITH_INFO; + } + + timeout = static_cast<int32_t>(uTimeout); + + break; + } + default: { AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED, @@ -402,14 +474,6 @@ namespace ignite return SqlResult::AI_SUCCESS; } - SqlResult::Type Connection::InternalTransactionRollback() - { - AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED, - "Rollback operation is not supported."); - - return SqlResult::AI_ERROR; - } - SqlResult::Type Connection::MakeRequestHandshake() { bool distributedJoins = false; @@ -451,7 +515,16 @@ namespace ignite try { - SyncMessage(req, rsp); + // Workaround for some Linux systems that report connection on non-blocking + // sockets as successful but fail to establish real connection. 
+ bool sent = SyncMessage(req, rsp, tcp::SocketClient::CONNECT_TIMEOUT); + + if (!sent) + { + AddStatusRecord(SqlState::S08001_CANNOT_CONNECT, "Failed to establish connection with the host."); + + return SqlResult::AI_ERROR; + } } catch (const OdbcError& err) { http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record.cpp b/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record.cpp index 0a02310..7fa7669 100644 --- a/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record.cpp +++ b/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record.cpp @@ -133,6 +133,9 @@ namespace /** SQL state HYC00 constant. */ const std::string STATE_HYC00 = "HYC00"; + /** SQL state HYT00 constant. */ + const std::string STATE_HYT00 = "HYT00"; + /** SQL state HYT01 constant. */ const std::string STATE_HYT01 = "HYT01"; @@ -365,6 +368,9 @@ namespace ignite case SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED: return STATE_HYC00; + case SqlState::SHYT00_TIMEOUT_EXPIRED: + return STATE_HYT00; + case SqlState::SHYT01_CONNECTION_TIMEOUT: return STATE_HYT01; http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/src/query/batch_query.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/query/batch_query.cpp b/modules/platforms/cpp/odbc/src/query/batch_query.cpp index 07d42d4..a9db8d8 100644 --- a/modules/platforms/cpp/odbc/src/query/batch_query.cpp +++ b/modules/platforms/cpp/odbc/src/query/batch_query.cpp @@ -153,7 +153,17 @@ namespace ignite try { - connection.SyncMessage(req, rsp); + // Setting connection timeout to 1 second more than query timeout itself. + int32_t connectionTimeout = timeout ? 
timeout + 1 : 0; + + bool success = connection.SyncMessage(req, rsp, connectionTimeout); + + if (!success) + { + diag.AddStatusRecord(SqlState::SHYT00_TIMEOUT_EXPIRED, "Query timeout expired"); + + return SqlResult::AI_ERROR; + } } catch (const OdbcError& err) { http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/src/query/data_query.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/query/data_query.cpp b/modules/platforms/cpp/odbc/src/query/data_query.cpp index 0539af5..e7bf5a0 100644 --- a/modules/platforms/cpp/odbc/src/query/data_query.cpp +++ b/modules/platforms/cpp/odbc/src/query/data_query.cpp @@ -222,7 +222,17 @@ namespace ignite try { - connection.SyncMessage(req, rsp); + // Setting connection timeout to 1 second more than query timeout itself. + int32_t connectionTimeout = timeout ? timeout + 1 : 0; + + bool success = connection.SyncMessage(req, rsp, connectionTimeout); + + if (!success) + { + diag.AddStatusRecord(SqlState::SHYT00_TIMEOUT_EXPIRED, "Query timeout expired"); + + return SqlResult::AI_ERROR; + } } catch (const OdbcError& err) {