Repository: drill
Updated Branches:
  refs/heads/master 9878170d0 -> ba2280612


DRILL-4647: C++ client fails to propagate a dead connection error to the application.
This closes #493


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6c67f458
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6c67f458
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6c67f458

Branch: refs/heads/master
Commit: 6c67f458b9bb1a6a84f901561bb7686315b0207f
Parents: 9878170
Author: Parth Chandra <[email protected]>
Authored: Fri Apr 29 17:38:44 2016 -0700
Committer: Parth Chandra <[email protected]>
Committed: Fri Jul 15 11:51:14 2016 -0700

----------------------------------------------------------------------
 .../native/client/src/clientlib/drillClient.cpp |  4 ++
 .../client/src/clientlib/drillClientImpl.cpp    | 46 ++++++++++++++------
 2 files changed, 36 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/6c67f458/contrib/native/client/src/clientlib/drillClient.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClient.cpp 
b/contrib/native/client/src/clientlib/drillClient.cpp
index 92c5194..1251058 100644
--- a/contrib/native/client/src/clientlib/drillClient.cpp
+++ b/contrib/native/client/src/clientlib/drillClient.cpp
@@ -369,6 +369,10 @@ status_t DrillClient::submitQuery(Drill::QueryType t, 
const std::string& plan, p
 
     ::exec::shared::QueryType castedType = static_cast< 
::exec::shared::QueryType> (t);
     DrillClientQueryResult* pResult=this->m_pImpl->SubmitQuery(castedType, 
plan, listener, listenerCtx);
+    if(pResult==NULL){
+        *qHandle=NULL;
+        return (status_t)this->m_pImpl->getError()->status;
+    }
     *qHandle=(QueryHandle_t)pResult;
     return QRY_SUCCESS;
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/6c67f458/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp 
b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index 3ec01f5..b5d5a31 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -218,6 +218,8 @@ void DrillClientImpl::handleHeartbeatTimeout(const 
boost::system::error_code & e
                 // Close connection.
                 DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: No 
heartbeat. Closing connection.";)
                 shutdownSocket();
+                //broadcast to any executing queries
+                handleConnError(CONN_FAILURE, getMessage(ERR_QRY_COMMERR, 
"Connection to drillbit lost."));
             }
         }
     }
@@ -469,34 +471,50 @@ DrillClientQueryResult* 
DrillClientImpl::SubmitQuery(::exec::shared::QueryType t
 
     uint64_t coordId;
     DrillClientQueryResult* pQuery=NULL;
+    connectionStatus_t cStatus=CONN_SUCCESS;
     {
         boost::lock_guard<boost::mutex> prLock(this->m_prMutex);
         boost::lock_guard<boost::mutex> dcLock(this->m_dcMutex);
         coordId = this->getNextCoordinationId();
         OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::RUN_QUERY, 
coordId, &query);
-        sendSync(out_msg);
 
+        // Create the result object and register the listener before we send the query
+        // because sometimes the caller is not checking the status of the submitQuery call.
+        // This way, the broadcast error call will cause the results listener to be called
+        // with a COMM_ERROR status.
         pQuery = new DrillClientQueryResult(this, coordId, plan);
         pQuery->registerListener(l, lCtx);
-        bool sendRequest=false;
         this->m_queryIds[coordId]=pQuery;
 
-        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)  << "Sent query request. " << "[" << 
m_connectedHost << "]"  << "Coordination id = " << coordId << std::endl;)
-        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)  << "Sent query " <<  "Coordination 
id = " << coordId << " query: " << plan << std::endl;)
+        connectionStatus_t cStatus=sendSync(out_msg);
+        if(cStatus == CONN_SUCCESS){
+            bool sendRequest=false;
 
-        if(m_pendingRequests++==0){
-            sendRequest=true;
-        }else{
-            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queueing query request to 
server" << std::endl;)
-            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Number of pending requests = 
" << m_pendingRequests << std::endl;)
-        }
-        if(sendRequest){
-            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sending query request. 
Number of pending requests = "
-                << m_pendingRequests << std::endl;)
-            getNextResult(); // async wait for results
+            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)  << "Sent query request. " << 
"[" << m_connectedHost << "]"  << "Coordination id = " << coordId << std::endl;)
+                DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)  << "Sent query " <<  
"Coordination id = " << coordId << " query: " << plan << std::endl;)
+
+                if(m_pendingRequests++==0){
+                    sendRequest=true;
+                }else{
+                    DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queueing query 
request to server" << std::endl;)
+                        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Number of 
pending requests = " << m_pendingRequests << std::endl;)
+                }
+            if(sendRequest){
+                DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sending query request. 
Number of pending requests = "
+                        << m_pendingRequests << std::endl;)
+                    getNextResult(); // async wait for results
+            }
         }
+
+    }
+    if(cStatus!=CONN_SUCCESS){
+        this->m_queryIds.erase(coordId);
+        delete pQuery;
+        return NULL;
     }
 
+
+
     //run this in a new thread
     startMessageListener();
 

Reply via email to