Repository: drill Updated Branches: refs/heads/master 9878170d0 -> ba2280612
DRILL-4647: C++ client fails to propagate a dead connection error to the application. This closes #493 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6c67f458 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6c67f458 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6c67f458 Branch: refs/heads/master Commit: 6c67f458b9bb1a6a84f901561bb7686315b0207f Parents: 9878170 Author: Parth Chandra <[email protected]> Authored: Fri Apr 29 17:38:44 2016 -0700 Committer: Parth Chandra <[email protected]> Committed: Fri Jul 15 11:51:14 2016 -0700 ---------------------------------------------------------------------- .../native/client/src/clientlib/drillClient.cpp | 4 ++ .../client/src/clientlib/drillClientImpl.cpp | 46 ++++++++++++++------ 2 files changed, 36 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/6c67f458/contrib/native/client/src/clientlib/drillClient.cpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/clientlib/drillClient.cpp b/contrib/native/client/src/clientlib/drillClient.cpp index 92c5194..1251058 100644 --- a/contrib/native/client/src/clientlib/drillClient.cpp +++ b/contrib/native/client/src/clientlib/drillClient.cpp @@ -369,6 +369,10 @@ status_t DrillClient::submitQuery(Drill::QueryType t, const std::string& plan, p ::exec::shared::QueryType castedType = static_cast< ::exec::shared::QueryType> (t); DrillClientQueryResult* pResult=this->m_pImpl->SubmitQuery(castedType, plan, listener, listenerCtx); + if(pResult==NULL){ + *qHandle=NULL; + return (status_t)this->m_pImpl->getError()->status; + } *qHandle=(QueryHandle_t)pResult; return QRY_SUCCESS; } http://git-wip-us.apache.org/repos/asf/drill/blob/6c67f458/contrib/native/client/src/clientlib/drillClientImpl.cpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp index 3ec01f5..b5d5a31 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.cpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp @@ -218,6 +218,8 @@ void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & e // Close connection. DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: No heartbeat. Closing connection.";) shutdownSocket(); + //broadcast to any executing queries + handleConnError(CONN_FAILURE, getMessage(ERR_QRY_COMMERR, "Connection to drillbit lost.")); } } } @@ -469,34 +471,50 @@ DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t uint64_t coordId; DrillClientQueryResult* pQuery=NULL; + connectionStatus_t cStatus=CONN_SUCCESS; { boost::lock_guard<boost::mutex> prLock(this->m_prMutex); boost::lock_guard<boost::mutex> dcLock(this->m_dcMutex); coordId = this->getNextCoordinationId(); OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::RUN_QUERY, coordId, &query); - sendSync(out_msg); + // Create the result object and register the listener before we send the query + // because sometimes the caller is not checking the status of the submitQuery call. + // This way, the broadcast error call will cause the results listener to be called + // with a COMM_ERROR status. pQuery = new DrillClientQueryResult(this, coordId, plan); pQuery->registerListener(l, lCtx); - bool sendRequest=false; this->m_queryIds[coordId]=pQuery; - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent query request. " << "[" << m_connectedHost << "]" << "Coordination id = " << coordId << std::endl;) - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent query " << "Coordination id = " << coordId << " query: " << plan << std::endl;) + connectionStatus_t cStatus=sendSync(out_msg); + if(cStatus == CONN_SUCCESS){ + bool sendRequest=false; - if(m_pendingRequests++==0){ - sendRequest=true; - }else{ - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queueing query request to server" << std::endl;) - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl;) - } - if(sendRequest){ - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sending query request. Number of pending requests = " - << m_pendingRequests << std::endl;) - getNextResult(); // async wait for results + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent query request. " << "[" << m_connectedHost << "]" << "Coordination id = " << coordId << std::endl;) + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent query " << "Coordination id = " << coordId << " query: " << plan << std::endl;) + + if(m_pendingRequests++==0){ + sendRequest=true; + }else{ + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queueing query request to server" << std::endl;) + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl;) + } + if(sendRequest){ + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sending query request. Number of pending requests = " + << m_pendingRequests << std::endl;) + getNextResult(); // async wait for results + } } + + } + if(cStatus!=CONN_SUCCESS){ + this->m_queryIds.erase(coordId); + delete pQuery; + return NULL; } + + //run this in a new thread startMessageListener();
