Repository: ignite
Updated Branches:
  refs/heads/master 5fb04be39 -> db343b649


IGNITE-6876: Added in ODBC support for SQL_ATTR_CONNECTION_TIMEOUT


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/db343b64
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/db343b64
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/db343b64

Branch: refs/heads/master
Commit: db343b649e4289ac28b769a741eee7ea77db8018
Parents: 5fb04be
Author: Igor Sapego <isap...@gridgain.com>
Authored: Mon Nov 20 20:15:37 2017 +0300
Committer: Igor Sapego <isap...@gridgain.com>
Committed: Mon Nov 20 20:15:37 2017 +0300

----------------------------------------------------------------------
 .../cpp/odbc-test/src/attributes_test.cpp       |  22 +++
 .../cpp/odbc-test/src/queries_test.cpp          | 113 +++++++++++++++
 .../cpp/odbc/include/ignite/odbc/common_types.h |   3 +
 .../cpp/odbc/include/ignite/odbc/connection.h   |  81 +++++++++--
 .../include/ignite/odbc/system/socket_client.h  |  55 +++++++-
 .../odbc/os/linux/src/system/socket_client.cpp  | 141 +++++++++++++++++--
 .../odbc/os/win/src/system/socket_client.cpp    | 134 ++++++++++++++++--
 modules/platforms/cpp/odbc/src/connection.cpp   | 141 ++++++++++++++-----
 .../odbc/src/diagnostic/diagnostic_record.cpp   |   6 +
 .../cpp/odbc/src/query/batch_query.cpp          |  12 +-
 .../platforms/cpp/odbc/src/query/data_query.cpp |  12 +-
 11 files changed, 644 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc-test/src/attributes_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc-test/src/attributes_test.cpp b/modules/platforms/cpp/odbc-test/src/attributes_test.cpp
index b87f4b9..c4c2433 100644
--- a/modules/platforms/cpp/odbc-test/src/attributes_test.cpp
+++ b/modules/platforms/cpp/odbc-test/src/attributes_test.cpp
@@ -227,4 +227,26 @@ BOOST_AUTO_TEST_CASE(StatementAttributeQueryTimeout)
     BOOST_REQUIRE_EQUAL(timeout, 7);
 }
 
+BOOST_AUTO_TEST_CASE(ConnectionAttributeConnectionTimeout)
+{
+    Connect("DRIVER={Apache Ignite};address=127.0.0.1:11110;schema=cache");
+
+    SQLUINTEGER timeout = -1;
+    SQLRETURN ret = SQLGetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, 
&timeout, 0, 0);
+
+    ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc);
+    BOOST_REQUIRE_EQUAL(timeout, 0);
+
+    ret = SQLSetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, 
reinterpret_cast<SQLPOINTER>(42), 0);
+
+    ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc);
+
+    timeout = -1;
+
+    ret = SQLGetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, &timeout, 0, 0);
+
+    ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc);
+    BOOST_REQUIRE_EQUAL(timeout, 42);
+}
+
 BOOST_AUTO_TEST_SUITE_END()

http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc-test/src/queries_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc-test/src/queries_test.cpp b/modules/platforms/cpp/odbc-test/src/queries_test.cpp
index 6fcf7c9..dafab1a 100644
--- a/modules/platforms/cpp/odbc-test/src/queries_test.cpp
+++ b/modules/platforms/cpp/odbc-test/src/queries_test.cpp
@@ -2404,5 +2404,118 @@ BOOST_AUTO_TEST_CASE(TestCloseAfterEmptyUpdate)
         BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
 }
 
+BOOST_AUTO_TEST_CASE(TestConnectionTimeoutQuery)
+{
+    Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache");
+
+    SQLRETURN ret = SQLSetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, 
reinterpret_cast<SQLPOINTER>(5), 0);
+
+    ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc);
+
+    InsertTestStrings(10, false);
+}
+
+BOOST_AUTO_TEST_CASE(TestConnectionTimeoutBatch)
+{
+    Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache");
+
+    SQLRETURN ret = SQLSetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, 
reinterpret_cast<SQLPOINTER>(5), 0);
+
+    ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc);
+
+    InsertTestBatch(11, 20, 9);
+}
+
+BOOST_AUTO_TEST_CASE(TestConnectionTimeoutBoth)
+{
+    Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache");
+
+    SQLRETURN ret = SQLSetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, 
reinterpret_cast<SQLPOINTER>(5), 0);
+
+    ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc);
+
+    InsertTestStrings(10, false);
+    InsertTestBatch(11, 20, 9);
+}
+
+BOOST_AUTO_TEST_CASE(TestQueryTimeoutQuery)
+{
+    Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache");
+
+    SQLRETURN ret = SQLSetStmtAttr(stmt, SQL_ATTR_QUERY_TIMEOUT, 
reinterpret_cast<SQLPOINTER>(5), 0);
+
+    ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_STMT, stmt);
+
+    InsertTestStrings(10, false);
+}
+
+BOOST_AUTO_TEST_CASE(TestQueryTimeoutBatch)
+{
+    Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache");
+
+    SQLRETURN ret = SQLSetStmtAttr(stmt, SQL_ATTR_QUERY_TIMEOUT, 
reinterpret_cast<SQLPOINTER>(5), 0);
+
+    ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_STMT, stmt);
+
+    InsertTestBatch(11, 20, 9);
+}
+
+BOOST_AUTO_TEST_CASE(TestQueryTimeoutBoth)
+{
+    Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache");
+
+    SQLRETURN ret = SQLSetStmtAttr(stmt, SQL_ATTR_QUERY_TIMEOUT, 
reinterpret_cast<SQLPOINTER>(5), 0);
+
+    ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_STMT, stmt);
+
+    InsertTestStrings(10, false);
+    InsertTestBatch(11, 20, 9);
+}
+
+BOOST_AUTO_TEST_CASE(TestQueryAndConnectionTimeoutQuery)
+{
+    Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache");
+
+    SQLRETURN ret = SQLSetStmtAttr(stmt, SQL_ATTR_QUERY_TIMEOUT, 
reinterpret_cast<SQLPOINTER>(5), 0);
+
+    ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_STMT, stmt);
+
+    ret = SQLSetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, 
reinterpret_cast<SQLPOINTER>(3), 0);
+
+    ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc);
+
+    InsertTestStrings(10, false);
+}
+
+BOOST_AUTO_TEST_CASE(TestQueryAndConnectionTimeoutBatch)
+{
+    Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache");
+
+    SQLRETURN ret = SQLSetStmtAttr(stmt, SQL_ATTR_QUERY_TIMEOUT, 
reinterpret_cast<SQLPOINTER>(5), 0);
+
+    ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_STMT, stmt);
+
+    ret = SQLSetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, 
reinterpret_cast<SQLPOINTER>(3), 0);
+
+    ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc);
+
+    InsertTestBatch(11, 20, 9);
+}
+
+BOOST_AUTO_TEST_CASE(TestQueryAndConnectionTimeoutBoth)
+{
+    Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache");
+
+    SQLRETURN ret = SQLSetStmtAttr(stmt, SQL_ATTR_QUERY_TIMEOUT, 
reinterpret_cast<SQLPOINTER>(5), 0);
+
+    ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_STMT, stmt);
+
+    ret = SQLSetConnectAttr(dbc, SQL_ATTR_CONNECTION_TIMEOUT, 
reinterpret_cast<SQLPOINTER>(3), 0);
+
+    ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, dbc);
+
+    InsertTestStrings(10, false);
+    InsertTestBatch(11, 20, 9);
+}
 
 BOOST_AUTO_TEST_SUITE_END()

http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/include/ignite/odbc/common_types.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/common_types.h b/modules/platforms/cpp/odbc/include/ignite/odbc/common_types.h
index 349147f..9c8c433 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/common_types.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/common_types.h
@@ -190,6 +190,9 @@ namespace ignite
                  */
                 SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED,
 
+                /** The timeout period expired before the data source responded to the request. */
+                SHYT00_TIMEOUT_EXPIRED,
+
                 /**
                  * The connection timeout period expired before the data source
                  * responded to the request.

http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h b/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h
index 34fed5f..1577ee7 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h
@@ -27,6 +27,7 @@
 #include "ignite/odbc/config/connection_info.h"
 #include "ignite/odbc/config/configuration.h"
 #include "ignite/odbc/diagnostic/diagnosable_adapter.h"
+#include "ignite/odbc/odbc_error.h"
 
 namespace ignite
 {
@@ -42,6 +43,19 @@ namespace ignite
             friend class Environment;
         public:
             /**
+             * Operation with timeout result.
+             */
+            struct OperationResult
+            {
+                enum T
+                {
+                    SUCCESS,
+                    FAIL,
+                    TIMEOUT
+                };
+            };
+
+            /**
              * Destructor.
              */
             ~Connection();
@@ -96,15 +110,21 @@ namespace ignite
              *
              * @param data Data buffer.
              * @param len Data length.
+             * @param timeout Timeout.
+             * @return @c true on success, @c false on timeout.
+             * @throw OdbcError on error.
              */
-            void Send(const int8_t* data, size_t len);
+            bool Send(const int8_t* data, size_t len, int32_t timeout);
 
             /**
              * Receive next message.
              *
              * @param msg Buffer for message.
+             * @param timeout Timeout.
+             * @return @c true on success, @c false on timeout.
+             * @throw OdbcError on error.
              */
-            void Receive(std::vector<int8_t>& msg);
+            bool Receive(std::vector<int8_t>& msg, int32_t timeout);
 
             /**
              * Get name of the assotiated schema.
@@ -134,9 +154,43 @@ namespace ignite
 
             /**
              * Synchronously send request message and receive response.
+             * Uses provided timeout.
              *
              * @param req Request message.
              * @param rsp Response message.
+             * @param timeout Timeout.
+             * @return @c true on success, @c false on timeout.
+             * @throw OdbcError on error.
+             */
+            template<typename ReqT, typename RspT>
+            bool SyncMessage(const ReqT& req, RspT& rsp, int32_t timeout)
+            {
+                std::vector<int8_t> tempBuffer;
+
+                parser.Encode(req, tempBuffer);
+
+                bool success = Send(tempBuffer.data(), tempBuffer.size(), 
timeout);
+
+                if (!success)
+                    return false;
+
+                success = Receive(tempBuffer, timeout);
+
+                if (!success)
+                    return false;
+
+                parser.Decode(rsp, tempBuffer);
+
+                return true;
+            }
+
+            /**
+             * Synchronously send request message and receive response.
+             * Uses connection timeout.
+             *
+             * @param req Request message.
+             * @param rsp Response message.
+             * @throw OdbcError on error.
              */
             template<typename ReqT, typename RspT>
             void SyncMessage(const ReqT& req, RspT& rsp)
@@ -145,9 +199,15 @@ namespace ignite
 
                 parser.Encode(req, tempBuffer);
 
-                Send(tempBuffer.data(), tempBuffer.size());
+                bool success = Send(tempBuffer.data(), tempBuffer.size(), 
timeout);
+
+                if (!success)
+                    throw OdbcError(SqlState::SHYT01_CONNECTION_TIMEOUT, "Send 
operation timed out");
+
+                success = Receive(tempBuffer, timeout);
 
-                Receive(tempBuffer);
+                if (!success)
+                    throw OdbcError(SqlState::SHYT01_CONNECTION_TIMEOUT, 
"Receive operation timed out");
 
                 parser.Decode(rsp, tempBuffer);
             }
@@ -280,18 +340,20 @@ namespace ignite
              *
              * @param dst Buffer for data.
              * @param len Number of bytes to receive.
-             * @return Number of successfully received bytes.
+             * @param timeout Timeout.
+             * @return Operation result.
              */
-            size_t ReceiveAll(void* dst, size_t len);
+            OperationResult::T ReceiveAll(void* dst, size_t len, int32_t 
timeout);
 
             /**
              * Send specified number of bytes.
              *
              * @param data Data buffer.
              * @param len Data length.
-             * @return Number of successfully sent bytes.
+             * @param timeout Timeout.
+             * @return Operation result.
              */
-            size_t SendAll(const int8_t* data, size_t len);
+            OperationResult::T SendAll(const int8_t* data, size_t len, int32_t 
timeout);
 
             /**
              * Perform handshake request.
@@ -311,6 +373,9 @@ namespace ignite
             /** State flag. */
             bool connected;
 
+            /** Connection timeout in seconds. */
+            int32_t timeout;
+
             /** Message parser. */
             Parser parser;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/include/ignite/odbc/system/socket_client.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/system/socket_client.h b/modules/platforms/cpp/odbc/include/ignite/odbc/system/socket_client.h
index 946605e..2a3cfa3 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/system/socket_client.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/system/socket_client.h
@@ -44,6 +44,22 @@ namespace ignite
                 /** The time in seconds between individual keepalive probes. */
                 enum { KEEP_ALIVE_PROBES_PERIOD = 1 };
 
+                /** Connection establishment timeout in seconds. */
+                enum { CONNECT_TIMEOUT = 5 };
+
+                /**
+                 * Non-negative timeout operation result.
+                 */
+                struct WaitResult
+                {
+                    enum T
+                    {
+                        TIMEOUT = 0,
+
+                        SUCCESS = 1
+                    };
+                };
+
                 /**
                  * Constructor.
                  */
@@ -76,20 +92,31 @@ namespace ignite
                  *
                  * @param data Pointer to data to be sent.
                  * @param size Size of the data in bytes.
-                 * @return Number of bytes that have been sent on success and negative
-                 *         value on failure.
+                 * @param timeout Timeout.
+                 * @return Number of bytes that have been sent on success, 
+                 *     WaitResult::TIMEOUT on timeout and -errno on failure.
                  */
-                int Send(const int8_t* data, size_t size);
+                int Send(const int8_t* data, size_t size, int32_t timeout);
 
                 /**
                  * Receive data from established connection.
                  *
                  * @param buffer Pointer to data buffer.
                  * @param size Size of the buffer in bytes.
-                 * @return Number of bytes that have been received on success and negative
-                 *         value on failure.
+                 * @param timeout Timeout.
+                 * @return Number of bytes that have been received on success,
+                 *     WaitResult::TIMEOUT on timeout and -errno on failure.
                  */
-                int Receive(int8_t* buffer, size_t size);
+                int Receive(int8_t* buffer, size_t size, int32_t timeout);
+
+                /**
+                 * Check if the socket is blocking or not.
+                 * @return @c true if the socket is blocking and @c false otherwise.
+                 */
+                bool IsBlocking() const
+                {
+                    return blocking;
+                }
 
             private:
                 /**
@@ -97,8 +124,24 @@ namespace ignite
                  */
                 void TrySetOptions(diagnostic::Diagnosable& diag);
 
+                /**
+                 * Wait on the socket for any event for specified time.
+                 * This function uses poll to achieve timeout functionality
+                 * for every separate socket operation.
+                 *
+                 * @param timeout Timeout.
+                 * @param rd Wait for read if @c true, or for write if @c false.
+                 * @return -errno on error, WaitResult::TIMEOUT on timeout and
+                 *     WaitResult::SUCCESS on success.
+                 */
+                int WaitOnSocket(int32_t timeout, bool rd);
+
+                /** Handle. */
                 intptr_t socketHandle;
 
+                /** Blocking flag. */
+                bool blocking;
+
                 IGNITE_NO_COPY_ASSIGNMENT(SocketClient)
             };
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/os/linux/src/system/socket_client.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/os/linux/src/system/socket_client.cpp b/modules/platforms/cpp/odbc/os/linux/src/system/socket_client.cpp
index 5a9b03a..a6d6151 100644
--- a/modules/platforms/cpp/odbc/os/linux/src/system/socket_client.cpp
+++ b/modules/platforms/cpp/odbc/os/linux/src/system/socket_client.cpp
@@ -20,6 +20,7 @@
 #include <netinet/tcp.h>
 #include <netdb.h>
 #include <unistd.h>
+#include <fcntl.h>
 
 #include <cstring>
 
@@ -35,26 +36,36 @@ namespace
 {
     /**
      * Get last socket error message.
+     * @param error Error code.
      * @return Last socket error message string.
      */
-    std::string GetLastSocketErrorMessage()
+    std::string GetSocketErrorMessage(int error)
     {
-        int lastError = errno;
         std::stringstream res;
 
-        res << "error_code=" << lastError;
+        res << "error_code=" << error;
 
-        if (lastError == 0)
+        if (error == 0)
             return res.str();
 
         char buffer[1024] = "";
 
-        strerror_r(lastError, buffer, sizeof(buffer));
-
-        res << ", msg=" << buffer;
+        if (!strerror_r(error, buffer, sizeof(buffer)))
+            res << ", msg=" << buffer;
 
         return res.str();
     }
+
+    /**
+     * Get last socket error message.
+     * @return Last socket error message string.
+     */
+    std::string GetLastSocketErrorMessage()
+    {
+        int lastError = errno;
+
+        return GetSocketErrorMessage(lastError);
+    }
 }
 
 namespace ignite
@@ -64,7 +75,9 @@ namespace ignite
         namespace tcp
         {
 
-            SocketClient::SocketClient() : socketHandle(SOCKET_ERROR)
+            SocketClient::SocketClient() :
+                socketHandle(SOCKET_ERROR),
+                blocking(true)
             {
                 // No-op.
             }
@@ -129,11 +142,27 @@ namespace ignite
                     res = connect(socketHandle, it->ai_addr, 
static_cast<int>(it->ai_addrlen));
                     if (SOCKET_ERROR == res)
                     {
-                        LOG_MSG("Connection failed: " << 
GetLastSocketErrorMessage());
+                        int lastError = errno;
+
+                        if (lastError != EWOULDBLOCK && lastError != 
EINPROGRESS)
+                        {
+                            LOG_MSG("Connection failed: " << 
GetSocketErrorMessage(lastError));
+
+                            Close();
 
-                        Close();
+                            continue;
+                        }
 
-                        continue;
+                        res = WaitOnSocket(CONNECT_TIMEOUT, false);
+
+                        if (res < 0 || res == WaitResult::TIMEOUT)
+                        {
+                            LOG_MSG("Connection timeout expired: " << 
GetSocketErrorMessage(-res));
+
+                            Close();
+
+                            continue;
+                        }
                     }
                     break;
                 }
@@ -153,13 +182,29 @@ namespace ignite
                 }
             }
 
-            int SocketClient::Send(const int8_t* data, size_t size)
+            int SocketClient::Send(const int8_t* data, size_t size, int32_t 
timeout)
             {
+                if (!blocking)
+                {
+                    int res = WaitOnSocket(timeout, false);
+
+                    if (res < 0 || res == WaitResult::TIMEOUT)
+                        return res;
+                }
+
                 return send(socketHandle, reinterpret_cast<const char*>(data), 
static_cast<int>(size), 0);
             }
 
-            int SocketClient::Receive(int8_t* buffer, size_t size)
+            int SocketClient::Receive(int8_t* buffer, size_t size, int32_t 
timeout)
             {
+                if (!blocking)
+                {
+                    int res = WaitOnSocket(timeout, true);
+
+                    if (res < 0 || res == WaitResult::TIMEOUT)
+                        return res;
+                }
+
                 return recv(socketHandle, reinterpret_cast<char*>(buffer), 
static_cast<int>(size), 0);
             }
 
@@ -203,6 +248,30 @@ namespace ignite
                         "Can not set up TCP no-delay mode");
                 }
 
+                res = setsockopt(socketHandle, SOL_SOCKET, SO_OOBINLINE,
+                    reinterpret_cast<char*>(&trueOpt), sizeof(trueOpt));
+
+                if (SOCKET_ERROR == res)
+                {
+                    LOG_MSG("TCP out-of-bound data inlining setup failed: " << 
GetLastSocketErrorMessage());
+
+                    diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED,
+                        "Can not set up TCP out-of-bound data inlining");
+                }
+
+                blocking = false;
+
+                int flags;
+                if (((flags = fcntl(socketHandle, F_GETFL, 0)) < 0) ||
+                    (fcntl(socketHandle, F_SETFL, flags | O_NONBLOCK) < 0))
+                {
+                    blocking = true;
+                    LOG_MSG("Non-blocking mode setup failed: " << 
GetLastSocketErrorMessage());
+
+                    diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED,
+                        "Can not set up non-blocking mode. Timeouts are not 
available.");
+                }
+
                 res = setsockopt(socketHandle, SOL_SOCKET, SO_KEEPALIVE,
                     reinterpret_cast<char*>(&trueOpt), sizeof(trueOpt));
 
@@ -238,6 +307,52 @@ namespace ignite
                     diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED,
                         "Can not set up TCP keep-alive probes period");
                 }
+
+            }
+
+            int SocketClient::WaitOnSocket(int32_t timeout, bool rd)
+            {
+                int ready = 0;
+                int lastError = 0;
+
+                fd_set fds;
+
+                do {
+                    struct timeval tv = { 0 };
+                    tv.tv_sec = timeout;
+
+                    FD_ZERO(&fds);
+                    FD_SET(socketHandle, &fds);
+
+                    fd_set* readFds = 0;
+                    fd_set* writeFds = 0;
+
+                    if (rd)
+                        readFds = &fds;
+                    else
+                        writeFds = &fds;
+
+                    ready = select(static_cast<int>((socketHandle) + 1),
+                        readFds, writeFds, NULL, (timeout == 0 ? NULL : &tv));
+
+                    if (ready == SOCKET_ERROR)
+                        lastError = errno;
+
+                } while (ready == SOCKET_ERROR && lastError == EINTR);
+
+                if (ready == SOCKET_ERROR)
+                    return -lastError;
+
+                socklen_t size = sizeof(lastError);
+                int res = getsockopt(socketHandle, SOL_SOCKET, SO_ERROR, 
reinterpret_cast<char*>(&lastError), &size);
+
+                if (res != SOCKET_ERROR && lastError != 0)
+                    return -lastError;
+
+                if (ready == 0)
+                    return WaitResult::TIMEOUT;
+
+                return WaitResult::SUCCESS;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp b/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp
index 30fb7d7..6f87b93 100644
--- a/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp
+++ b/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp
@@ -34,17 +34,17 @@
 namespace
 {
     /**
-     * Get last socket error message.
-     * @return Last socket error message string.
+     * Get socket error message for the error code.
+     * @param error Error code.
+     * @return Socket error message string.
      */
-    std::string GetLastSocketErrorMessage()
+    std::string GetSocketErrorMessage(HRESULT error)
     {
-        HRESULT lastError = WSAGetLastError();
         std::stringstream res;
 
-        res << "error_code=" << lastError;
+        res << "error_code=" << error;
 
-        if (lastError == 0)
+        if (error == 0)
             return res.str();
 
         LPTSTR errorText = NULL;
@@ -58,7 +58,7 @@ namespace
             | FORMAT_MESSAGE_IGNORE_INSERTS,
             // unused with FORMAT_MESSAGE_FROM_SYSTEM
             NULL,
-            lastError,
+            error,
             MAKELANGID(LANG_ENGLISH, SUBLANG_ENGLISH_US),
             // output
             reinterpret_cast<LPTSTR>(&errorText),
@@ -77,6 +77,17 @@ namespace
 
         return res.str();
     }
+
+    /**
+     * Get last socket error message.
+     * @return Last socket error message string.
+     */
+    std::string GetLastSocketErrorMessage()
+    {
+        HRESULT lastError = WSAGetLastError();
+
+        return GetSocketErrorMessage(lastError);
+    }
 }
 
 namespace ignite
@@ -86,7 +97,9 @@ namespace ignite
         namespace tcp
         {
 
-            SocketClient::SocketClient() : socketHandle(INVALID_SOCKET)
+            SocketClient::SocketClient() :
+                socketHandle(INVALID_SOCKET),
+                blocking(true)
             {
                 // No-op.
             }
@@ -170,11 +183,27 @@ namespace ignite
                     res = connect(socketHandle, it->ai_addr, 
static_cast<int>(it->ai_addrlen));
                     if (SOCKET_ERROR == res)
                     {
-                        LOG_MSG("Connection failed: " << 
GetLastSocketErrorMessage());
+                        int lastError = WSAGetLastError();
 
-                        Close();
+                        if (lastError != WSAEWOULDBLOCK)
+                        {
+                            LOG_MSG("Connection failed: " << 
GetSocketErrorMessage(lastError));
 
-                        continue;
+                            Close();
+
+                            continue;
+                        }
+
+                        res = WaitOnSocket(CONNECT_TIMEOUT, false);
+
+                        if (res < 0 || res == WaitResult::TIMEOUT)
+                        {
+                            LOG_MSG("Connection timeout expired: " << 
GetSocketErrorMessage(-res));
+
+                            Close();
+
+                            continue;
+                        }
                     }
                     break;
                 }
@@ -194,21 +223,39 @@ namespace ignite
                 }
             }
 
-            int SocketClient::Send(const int8_t* data, size_t size)
+            int SocketClient::Send(const int8_t* data, size_t size, int32_t 
timeout)
             {
+                if (!blocking)
+                {
+                    int res = WaitOnSocket(timeout, false);
+
+                    if (res < 0 || res == WaitResult::TIMEOUT)
+                        return res;
+                }
+
                 return send(socketHandle, reinterpret_cast<const char*>(data), 
static_cast<int>(size), 0);
             }
 
-            int SocketClient::Receive(int8_t* buffer, size_t size)
+            int SocketClient::Receive(int8_t* buffer, size_t size, int32_t 
timeout)
             {
+                if (!blocking)
+                {
+                    int res = WaitOnSocket(timeout, true);
+
+                    if (res < 0 || res == WaitResult::TIMEOUT)
+                        return res;
+                }
+
                 return recv(socketHandle, reinterpret_cast<char*>(buffer), 
static_cast<int>(size), 0);
             }
 
             void SocketClient::TrySetOptions(diagnostic::Diagnosable& diag)
             {
                 BOOL trueOpt = TRUE;
+                ULONG uTrueOpt = TRUE;
                 int bufSizeOpt = BUFFER_SIZE;
 
+
                 int res = setsockopt(socketHandle, SOL_SOCKET, SO_SNDBUF,
                     reinterpret_cast<char*>(&bufSizeOpt), sizeof(bufSizeOpt));
 
@@ -242,6 +289,29 @@ namespace ignite
                         "Can not set up TCP no-delay mode");
                 }
 
+                res = setsockopt(socketHandle, SOL_SOCKET, SO_OOBINLINE,
+                    reinterpret_cast<char*>(&trueOpt), sizeof(trueOpt));
+
+                if (SOCKET_ERROR == res)
+                {
+                    LOG_MSG("TCP out-of-bound data inlining setup failed: " << 
GetLastSocketErrorMessage());
+
+                    diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED,
+                        "Can not set up TCP out-of-bound data inlining");
+                }
+
+                blocking = false;
+                res = ioctlsocket(socketHandle, FIONBIO, &uTrueOpt);
+
+                if (res == SOCKET_ERROR)
+                {
+                    blocking = true;
+                    LOG_MSG("Non-blocking mode setup failed: " << 
GetLastSocketErrorMessage());
+
+                    diag.AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED,
+                        "Can not set up non-blocking mode. Timeouts are not 
available.");
+                }
+
                 res = setsockopt(socketHandle, SOL_SOCKET, SO_KEEPALIVE,
                     reinterpret_cast<char*>(&trueOpt), sizeof(trueOpt));
 
@@ -318,6 +388,44 @@ namespace ignite
 #endif
             }
 
+            int SocketClient::WaitOnSocket(int32_t timeout, bool rd)
+            {
+                int ready = 0;
+                int lastError = 0;
+
+                fd_set fds;
+
+                do {
+                    struct timeval tv = { 0 };
+                    tv.tv_sec = timeout;
+
+                    FD_ZERO(&fds);
+                    FD_SET(socketHandle, &fds);
+
+                    fd_set* readFds = 0;
+                    fd_set* writeFds = 0;
+
+                    if (rd)
+                        readFds = &fds;
+                    else
+                        writeFds = &fds;
+
+                    ready = select(static_cast<int>((socketHandle) + 1),
+                        readFds, writeFds, NULL, (timeout == 0 ? NULL : &tv));
+
+                    if (ready == SOCKET_ERROR)
+                        lastError = WSAGetLastError();
+
+                } while (ready == SOCKET_ERROR && lastError == WSAEINTR);
+
+                if (ready == SOCKET_ERROR)
+                    return -lastError;
+
+                if (ready == 0)
+                    return WaitResult::TIMEOUT;
+
+                return WaitResult::SUCCESS;
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/src/connection.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/connection.cpp b/modules/platforms/cpp/odbc/src/connection.cpp
index b99d768..8b03876 100644
--- a/modules/platforms/cpp/odbc/src/connection.cpp
+++ b/modules/platforms/cpp/odbc/src/connection.cpp
@@ -16,6 +16,7 @@
  */
 
 #include <cstring>
+#include <cstddef>
 
 #include <sstream>
 
@@ -27,7 +28,6 @@
 #include "ignite/odbc/connection.h"
 #include "ignite/odbc/message.h"
 #include "ignite/odbc/config/configuration.h"
-#include "ignite/odbc/odbc_error.h"
 
 namespace
 {
@@ -46,6 +46,7 @@ namespace ignite
         Connection::Connection() :
             socket(),
             connected(false),
+            timeout(0),
             parser(),
             config(),
             info(config)
@@ -194,7 +195,7 @@ namespace ignite
             return SqlResult::AI_SUCCESS;
         }
 
-        void Connection::Send(const int8_t* data, size_t len)
+        bool Connection::Send(const int8_t* data, size_t len, int32_t timeout)
         {
             if (!connected)
                 throw OdbcError(SqlState::S08003_NOT_CONNECTED, "Connection is 
not established");
@@ -209,38 +210,45 @@ namespace ignite
 
             memcpy(msg.GetData() + sizeof(OdbcProtocolHeader), data, len);
 
-            size_t sent = SendAll(msg.GetData(), msg.GetSize());
+            OperationResult::T res = SendAll(msg.GetData(), msg.GetSize(), 
timeout);
 
-            if (sent != len + sizeof(OdbcProtocolHeader))
+            if (res == OperationResult::TIMEOUT)
+                return false;
+
+            if (res == OperationResult::FAIL)
                 throw OdbcError(SqlState::S08S01_LINK_FAILURE, "Can not send 
message due to connection failure");
 
             LOG_MSG("message sent: (" <<  msg.GetSize() << " bytes)" << 
utility::HexDump(msg.GetData(), msg.GetSize()));
+
+            return true;
         }
 
-        size_t Connection::SendAll(const int8_t* data, size_t len)
+        Connection::OperationResult::T Connection::SendAll(const int8_t* data, 
size_t len, int32_t timeout) // Sends all len bytes of data; reports SUCCESS, FAIL or TIMEOUT instead of a byte count.
         {
             int sent = 0;
 
             while (sent != static_cast<int64_t>(len))
             {
-                int res = socket.Send(data + sent, len - sent);
+                int res = socket.Send(data + sent, len - sent, timeout); // timeout is forwarded unchanged to every socket.Send() call.
 
                 LOG_MSG("Sent: " << res);
 
-                if (res <= 0)
+                if (res < 0 || res == tcp::SocketClient::WaitResult::TIMEOUT) // A negative result or the TIMEOUT marker aborts the transfer.
                 {
                     Close();
 
-                    return sent;
+                    return res < 0 ? OperationResult::FAIL : 
OperationResult::TIMEOUT; // Negative result maps to FAIL, otherwise TIMEOUT; Close() was already called above.
                 }
 
                 sent += res;
             }
 
-            return sent;
+            assert(static_cast<size_t>(sent) == len); // The loop only exits normally once sent equals len.
+
+            return OperationResult::SUCCESS;
         }
 
-        void Connection::Receive(std::vector<int8_t>& msg)
+        bool Connection::Receive(std::vector<int8_t>& msg, int32_t timeout)
         {
             if (!connected)
                 throw OdbcError(SqlState::S08003_NOT_CONNECTED, "Connection is 
not established");
@@ -249,36 +257,40 @@ namespace ignite
 
             OdbcProtocolHeader hdr;
 
-            size_t received = ReceiveAll(reinterpret_cast<int8_t*>(&hdr), 
sizeof(hdr));
+            OperationResult::T res = 
ReceiveAll(reinterpret_cast<int8_t*>(&hdr), sizeof(hdr), timeout);
+
+            if (res == OperationResult::TIMEOUT)
+                return false;
 
-            if (received != sizeof(hdr))
+            if (res == OperationResult::FAIL)
                 throw OdbcError(SqlState::S08S01_LINK_FAILURE, "Can not 
receive message header");
 
             if (hdr.len < 0)
             {
                 Close();
 
-                throw OdbcError(SqlState::S08S01_LINK_FAILURE, "Protocol 
error: Message length is negative");
+                throw OdbcError(SqlState::SHY000_GENERAL_ERROR, "Protocol 
error: Message length is negative");
             }
 
             if (hdr.len == 0)
-                return;
+                return false;
 
             msg.resize(hdr.len);
 
-            received = ReceiveAll(&msg[0], hdr.len);
+            res = ReceiveAll(&msg[0], hdr.len, timeout);
 
-            if (received != hdr.len)
-            {
-                msg.resize(received);
+            if (res == OperationResult::TIMEOUT)
+                return false;
 
+            if (res == OperationResult::FAIL)
                 throw OdbcError(SqlState::S08S01_LINK_FAILURE, "Can not 
receive message body");
-            }
 
             LOG_MSG("Message received: " << utility::HexDump(&msg[0], 
msg.size()));
+
+            return true;
         }
 
-        size_t Connection::ReceiveAll(void* dst, size_t len)
+        Connection::OperationResult::T Connection::ReceiveAll(void* dst, 
size_t len, int32_t timeout)
         {
             size_t remain = len;
             int8_t* buffer = reinterpret_cast<int8_t*>(dst);
@@ -287,20 +299,20 @@ namespace ignite
             {
                 size_t received = len - remain;
 
-                int res = socket.Receive(buffer + received, remain);
+                int res = socket.Receive(buffer + received, remain, timeout);
                 LOG_MSG("Receive res: " << res << " remain: " << remain);
 
-                if (res <= 0)
+                if (res < 0 || res == tcp::SocketClient::WaitResult::TIMEOUT)
                 {
                     Close();
 
-                    return received;
+                    return res < 0 ? OperationResult::FAIL : 
OperationResult::TIMEOUT;
                 }
 
                 remain -= static_cast<size_t>(res);
             }
 
-            return len;
+            return OperationResult::SUCCESS;
         }
 
         const std::string& Connection::GetSchema() const
@@ -334,6 +346,14 @@ namespace ignite
             IGNITE_ODBC_API_CALL(InternalTransactionRollback());
         }
 
+        SqlResult::Type Connection::InternalTransactionRollback() // Rollback handler: records an "optional feature not implemented" status and fails.
+        {
+            AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED,
+                "Rollback operation is not supported.");
+
+            return SqlResult::AI_ERROR; // Unconditional: no rollback work is attempted.
+        }
+
         void Connection::GetAttribute(int attr, void* buf, SQLINTEGER bufLen, 
SQLINTEGER* valueLen)
         {
             IGNITE_ODBC_API_CALL(InternalGetAttribute(attr, buf, bufLen, 
valueLen));
@@ -343,7 +363,7 @@ namespace ignite
         {
             if (!buf)
             {
-                AddStatusRecord(SqlState::SHY000_GENERAL_ERROR, "Data buffer 
is NULL.");
+                AddStatusRecord(SqlState::SHY009_INVALID_USE_OF_NULL_POINTER, 
"Data buffer is null.");
 
                 return SqlResult::AI_ERROR;
             }
@@ -362,6 +382,18 @@ namespace ignite
                     break;
                 }
 
+                case SQL_ATTR_CONNECTION_TIMEOUT:
+                {
+                    SQLUINTEGER *val = reinterpret_cast<SQLUINTEGER*>(buf);
+
+                    *val = static_cast<SQLUINTEGER>(timeout);
+
+                    if (valueLen)
+                        *valueLen = SQL_IS_INTEGER;
+
+                    break;
+                }
+
                 default:
                 {
                     
AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED,
@@ -381,6 +413,13 @@ namespace ignite
 
         SqlResult::Type Connection::InternalSetAttribute(int attr, void* 
value, SQLINTEGER valueLen)
         {
+            if (!value)
+            {
+                AddStatusRecord(SqlState::SHY009_INVALID_USE_OF_NULL_POINTER, 
"Value pointer is null.");
+
+                return SqlResult::AI_ERROR;
+            }
+
             switch (attr)
             {
                 case SQL_ATTR_CONNECTION_DEAD:
@@ -390,6 +429,39 @@ namespace ignite
                     return SqlResult::AI_ERROR;
                 }
 
+                case SQL_ATTR_CONNECTION_TIMEOUT:
+                {
+                    SQLUINTEGER uTimeout = 
static_cast<SQLUINTEGER>(reinterpret_cast<ptrdiff_t>(value));
+
+                    if (uTimeout != 0 && connected && socket.IsBlocking())
+                    {
+                        timeout = 0;
+
+                        AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED, 
"Can not set timeout, because can not "
+                            "enable non-blocking mode on TCP connection. 
Setting to 0.");
+
+                        return SqlResult::AI_SUCCESS_WITH_INFO;
+                    }
+
+                    if (uTimeout > INT32_MAX)
+                    {
+                        timeout = INT32_MAX;
+
+                        std::stringstream ss;
+
+                        ss << "Value is too big: " << uTimeout << ", changing 
to " << timeout << ".";
+                        std::string msg = ss.str();
+
+                        AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED, 
msg);
+
+                        return SqlResult::AI_SUCCESS_WITH_INFO;
+                    }
+
+                    timeout = static_cast<int32_t>(uTimeout);
+
+                    break;
+                }
+
                 default:
                 {
                     
AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED,
@@ -402,14 +474,6 @@ namespace ignite
             return SqlResult::AI_SUCCESS;
         }
 
-        SqlResult::Type Connection::InternalTransactionRollback()
-        {
-            AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED,
-                "Rollback operation is not supported.");
-
-            return SqlResult::AI_ERROR;
-        }
-
         SqlResult::Type Connection::MakeRequestHandshake()
         {
             bool distributedJoins = false;
@@ -451,7 +515,16 @@ namespace ignite
 
             try
             {
-                SyncMessage(req, rsp);
+                // Workaround for some Linux systems that report connection on 
non-blocking
+                // sockets as successful but fail to establish real 
connection.
+                bool sent = SyncMessage(req, rsp, 
tcp::SocketClient::CONNECT_TIMEOUT);
+
+                if (!sent)
+                {
+                    AddStatusRecord(SqlState::S08001_CANNOT_CONNECT, "Failed 
to establish connection with the host.");
+
+                    return SqlResult::AI_ERROR;
+                }
             }
             catch (const OdbcError& err)
             {

http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record.cpp 
b/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record.cpp
index 0a02310..7fa7669 100644
--- a/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record.cpp
+++ b/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record.cpp
@@ -133,6 +133,9 @@ namespace
     /** SQL state HYC00 constant. */
     const std::string STATE_HYC00 = "HYC00";
 
+    /** SQL state HYT00 constant. */
+    const std::string STATE_HYT00 = "HYT00";
+
     /** SQL state HYT01 constant. */
     const std::string STATE_HYT01 = "HYT01";
 
@@ -365,6 +368,9 @@ namespace ignite
                     case SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED:
                         return STATE_HYC00;
 
+                    case SqlState::SHYT00_TIMEOUT_EXPIRED:
+                        return STATE_HYT00;
+
                     case SqlState::SHYT01_CONNECTION_TIMEOUT:
                         return STATE_HYT01;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/src/query/batch_query.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/query/batch_query.cpp 
b/modules/platforms/cpp/odbc/src/query/batch_query.cpp
index 07d42d4..a9db8d8 100644
--- a/modules/platforms/cpp/odbc/src/query/batch_query.cpp
+++ b/modules/platforms/cpp/odbc/src/query/batch_query.cpp
@@ -153,7 +153,17 @@ namespace ignite
 
                 try
                 {
-                    connection.SyncMessage(req, rsp);
+                    // Setting connection timeout to 1 second more than query 
timeout itself.
+                    int32_t connectionTimeout = timeout ? timeout + 1 : 0;
+
+                    bool success = connection.SyncMessage(req, rsp, 
connectionTimeout);
+
+                    if (!success)
+                    {
+                        diag.AddStatusRecord(SqlState::SHYT00_TIMEOUT_EXPIRED, 
"Query timeout expired");
+
+                        return SqlResult::AI_ERROR;
+                    }
                 }
                 catch (const OdbcError& err)
                 {

http://git-wip-us.apache.org/repos/asf/ignite/blob/db343b64/modules/platforms/cpp/odbc/src/query/data_query.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/query/data_query.cpp 
b/modules/platforms/cpp/odbc/src/query/data_query.cpp
index 0539af5..e7bf5a0 100644
--- a/modules/platforms/cpp/odbc/src/query/data_query.cpp
+++ b/modules/platforms/cpp/odbc/src/query/data_query.cpp
@@ -222,7 +222,17 @@ namespace ignite
 
                 try
                 {
-                    connection.SyncMessage(req, rsp);
+                    // Setting connection timeout to 1 second more than query 
timeout itself.
+                    int32_t connectionTimeout = timeout ? timeout + 1 : 0;
+
+                    bool success = connection.SyncMessage(req, rsp, 
connectionTimeout);
+
+                    if (!success)
+                    {
+                        diag.AddStatusRecord(SqlState::SHYT00_TIMEOUT_EXPIRED, 
"Query timeout expired");
+
+                        return SqlResult::AI_ERROR;
+                    }
                 }
                 catch (const OdbcError& err)
                 {

Reply via email to