DRILL-1305: C++ Client. Consume QueryState message from the Drillbit.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/ae2790ed Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/ae2790ed Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/ae2790ed Branch: refs/heads/master Commit: ae2790ed8b2bf2293e85b79ce4344ba3f8910cf9 Parents: b7aebbd Author: Xiao Meng <xiaom...@gmail.com> Authored: Wed Sep 17 12:39:48 2014 -0700 Committer: Parth Chandra <pchan...@maprtech.com> Committed: Tue Oct 21 19:32:11 2014 -0700 ---------------------------------------------------------------------- .../native/client/example/querySubmitter.cpp | 17 ++- .../client/src/clientlib/drillClientImpl.cpp | 140 +++++++++++++++---- .../client/src/clientlib/drillClientImpl.hpp | 14 +- contrib/native/client/src/clientlib/errmsgs.cpp | 6 +- contrib/native/client/src/clientlib/errmsgs.hpp | 4 +- .../native/client/src/include/drill/common.hpp | 10 +- 6 files changed, 148 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ae2790ed/contrib/native/client/example/querySubmitter.cpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/example/querySubmitter.cpp b/contrib/native/client/example/querySubmitter.cpp index 7154dda..040f9d7 100644 --- a/contrib/native/client/example/querySubmitter.cpp +++ b/contrib/native/client/example/querySubmitter.cpp @@ -60,7 +60,12 @@ Drill::status_t SchemaListener(void* ctx, Drill::FieldDefPtr fields, Drill::Dril } Drill::status_t QueryResultsListener(void* ctx, Drill::RecordBatch* b, Drill::DrillClientError* err){ + // Invariant: + // (received an record batch and err is NULL) + // or + // (received query state message passed by `err` and b is NULL) if(!err){ + assert(b!=NULL); b->print(std::cout, 0); // print all rows delete b; // we're done with this batch, we 
can delete it if(bTestCancel){ @@ -69,8 +74,16 @@ Drill::status_t QueryResultsListener(void* ctx, Drill::RecordBatch* b, Drill::Dr return Drill::QRY_SUCCESS ; } }else{ - std::cerr<< "ERROR: " << err->msg << std::endl; - return Drill::QRY_FAILURE; + assert(b==NULL); + switch(err->status) { + case Drill::QRY_COMPLETED: + case Drill::QRY_CANCELED: + std::cerr<< "INFO: " << err->msg << std::endl; + return Drill::QRY_SUCCESS; + default: + std::cerr<< "ERROR: " << err->msg << std::endl; + return Drill::QRY_FAILURE; + } } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ae2790ed/contrib/native/client/src/clientlib/drillClientImpl.cpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp index 77795ed..2f27b48 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.cpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp @@ -30,6 +30,7 @@ #else #include <zookeeper/zookeeper.h> #endif +#include <boost/assign.hpp> #include "drill/drillClient.hpp" #include "drill/recordBatch.hpp" @@ -46,6 +47,14 @@ namespace Drill{ +static std::map<exec::shared::QueryResult_QueryState, status_t> QUERYSTATE_TO_STATUS_MAP = boost::assign::map_list_of + (exec::shared::QueryResult_QueryState_PENDING, QRY_PENDING) + (exec::shared::QueryResult_QueryState_RUNNING, QRY_RUNNING) + (exec::shared::QueryResult_QueryState_COMPLETED, QRY_COMPLETED) + (exec::shared::QueryResult_QueryState_CANCELED, QRY_CANCELED) + (exec::shared::QueryResult_QueryState_FAILED, QRY_FAILED) + (exec::shared::QueryResult_QueryState_UNKNOWN_QUERY, QRY_UNKNOWN_QUERY); + RpcEncoder DrillClientImpl::s_encoder; RpcDecoder DrillClientImpl::s_decoder; @@ -178,7 +187,7 @@ connectionStatus_t DrillClientImpl::recvHandshake(){ boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred) ); - DRILL_LOG(LOG_DEBUG) << "Sent handshake read request to 
server" << std::endl; + DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::recvHandshake: async read waiting for server handshake response.\n"; m_io_service.run(); if(m_rbuf!=NULL){ Utils::freeBuffer(m_rbuf, MAX_SOCK_RD_BUFSIZE); m_rbuf=NULL; @@ -265,6 +274,7 @@ bool DrillClientImpl::validateHandShake(const char* defaultSchema){ OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::HANDSHAKE, coordId, &u2b); sendSync(out_msg); + DRILL_LOG(LOG_TRACE) << "Sent handshake request message. Coordination id: " << coordId << "\n"; } recvHandshake(); @@ -307,16 +317,16 @@ DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t bool sendRequest=false; this->m_queryIds[coordId]=pQuery; - DRILL_LOG(LOG_DEBUG) << "Submit Query Request. Coordination id = " << coordId << std::endl; + DRILL_LOG(LOG_DEBUG) << "Sent query request. Coordination id = " << coordId << std::endl; if(m_pendingRequests++==0){ sendRequest=true; }else{ - DRILL_LOG(LOG_DEBUG) << "Queueing read request to server" << std::endl; + DRILL_LOG(LOG_DEBUG) << "Queueing query request to server" << std::endl; DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl; } if(sendRequest){ - DRILL_LOG(LOG_DEBUG) << "Sending read request. Number of pending requests = " + DRILL_LOG(LOG_DEBUG) << "Sending query request. Number of pending requests = " << m_pendingRequests << std::endl; getNextResult(); // async wait for results } @@ -325,9 +335,12 @@ DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t //run this in a new thread { if(this->m_pListenerThread==NULL){ - DRILL_LOG(LOG_DEBUG) << "Starting listener thread." 
<< std::endl; - this->m_pListenerThread= new boost::thread(boost::bind(&boost::asio::io_service::run, - &this->m_io_service)); + // reset io_service before running + m_io_service.reset(); + this->m_pListenerThread = new boost::thread(boost::bind(&boost::asio::io_service::run, + &this->m_io_service)); + DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::SubmitQuery: Starting listener thread: " + << this->m_pListenerThread << std::endl; } } return pQuery; @@ -366,19 +379,23 @@ void DrillClientImpl::getNextResult(){ boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred) ); - DRILL_LOG(LOG_DEBUG) << "Sent read request to server" << std::endl; + DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::getNextResult: async_read from the server\n"; } void DrillClientImpl::waitForResults(){ this->m_pListenerThread->join(); - DRILL_LOG(LOG_DEBUG) << "Listener thread exited." << std::endl; + DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::waitForResults: Listener thread " + << this->m_pListenerThread << " exited." << std::endl; delete this->m_pListenerThread; this->m_pListenerThread=NULL; } -status_t DrillClientImpl::readMsg(ByteBuf_t _buf, - AllocatedBufferPtr* allocatedBuffer, - InBoundRpcMessage& msg, +status_t DrillClientImpl::readMsg(ByteBuf_t _buf, + AllocatedBufferPtr* allocatedBuffer, + InBoundRpcMessage& msg, boost::system::error_code& error){ + + DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Read message from buffer " + << reinterpret_cast<int*>(_buf) << std::endl; size_t leftover=0; uint32_t rmsgLen; AllocatedBufferPtr currentBuffer; @@ -394,8 +411,9 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, if(rmsgLen>0){ leftover = LEN_PREFIX_BUFLEN - bytes_read; // Allocate a buffer - DRILL_LOG(LOG_TRACE) << "Allocated and locked buffer." 
<< std::endl; currentBuffer=new AllocatedBuffer(rmsgLen); + DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Allocated and locked buffer: [ " + << currentBuffer << ", size = " << rmsgLen << " ]\n"; if(currentBuffer==NULL){ Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN); return handleQryError(QRY_CLIENT_OUTOFMEM, getMessage(ERR_QRY_OUTOFMEM), NULL); @@ -433,6 +451,8 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVREADLEN), NULL); } } + DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Free buffer " + << reinterpret_cast<int*>(_buf) << std::endl; Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN); return QRY_SUCCESS; } @@ -440,7 +460,11 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ){ DrillClientQueryResult* pDrillClientQueryResult=NULL; status_t ret=QRY_SUCCESS; - exec::shared::QueryId qid; + exec::shared::QueryId qid; + // Be a good client and send ack as early as possible. + // Drillbit pushed the query result to the client, the client should send ack + // whenever it receives the message + sendAck(msg, true); { boost::lock_guard<boost::mutex> lock(this->m_dcMutex); exec::shared::QueryResult* qr = new exec::shared::QueryResult; //Record Batch will own this object and free it up. @@ -464,20 +488,61 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer return ret; } DRILL_LOG(LOG_DEBUG) << "Drill Client Query Result Query Id - " << - debugPrintQid(*pDrillClientQueryResult->m_pQueryId) - << std::endl; - //Check QueryResult.queryState. QueryResult could have an error. 
- if(qr->query_state() == exec::shared::QueryResult_QueryState_FAILED){ - status_t ret=handleQryError(QRY_FAILURE, qr->error(0), pDrillClientQueryResult); - delete allocatedBuffer; - delete qr; + debugPrintQid(*pDrillClientQueryResult->m_pQueryId) << std::endl; + + // Drillbit may send query state message which does not contain any + // record batch. + if (qr->has_query_state()) { + ret = QUERYSTATE_TO_STATUS_MAP[qr->query_state()]; + pDrillClientQueryResult->setQueryStatus(ret); + switch(qr->query_state()) { + case exec::shared::QueryResult_QueryState_FAILED: + case exec::shared::QueryResult_QueryState_UNKNOWN_QUERY: + // get the error message from protobuf and handle errors + ret=handleQryError(ret, qr->error(0), pDrillClientQueryResult); + delete allocatedBuffer; + delete qr; + break; + + case exec::shared::QueryResult_QueryState_PENDING: + case exec::shared::QueryResult_QueryState_RUNNING: + // Ignore these state messages since they mean the query is not completed. + // I have not observed those messages in testing though. 
+ break; + // m_pendingRequests should be decremented when the query is + // cancelled or completed + case exec::shared::QueryResult_QueryState_CANCELED: + ret=handleTerminatedQryState(ret, + getMessage(ERR_QRY_CANCELED), + pDrillClientQueryResult); + case exec::shared::QueryResult_QueryState_COMPLETED: + ret=handleTerminatedQryState(ret, + getMessage(ERR_QRY_COMPLETED), + pDrillClientQueryResult); + // in both cases, fall back to free memory + delete allocatedBuffer; + delete qr; + break; + + default: + DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: Unknown Query State.\n"; + ret=handleQryError(QRY_INTERNAL_ERROR, + getMessage(ERR_QRY_UNKQRYSTATE), + pDrillClientQueryResult); + delete allocatedBuffer; + delete qr; + break; + } return ret; } + //Validate the RPC message std::string valErr; if( (ret=validateMessage(msg, *qr, valErr)) != QRY_SUCCESS){ delete allocatedBuffer; delete qr; + DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: ERR_QRY_INVRPC.\n"; + pDrillClientQueryResult->setQueryStatus(ret); return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult); } @@ -518,22 +583,17 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer } pDrillClientQueryResult->m_bIsQueryPending=false; DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl; + pDrillClientQueryResult->setQueryStatus(ret); return ret; } if(pDrillClientQueryResult->m_bIsLastChunk){ - { - boost::lock_guard<boost::mutex> lock(this->m_dcMutex); - m_pendingRequests--; - DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId) + DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId) << "Received last batch. " << std::endl; - DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId) - << "Pending requests: " << m_pendingRequests <<"." 
<< std::endl; - } ret=QRY_NO_MORE_DATA; - sendAck(msg, true); + pDrillClientQueryResult->setQueryStatus(ret); return ret; } - sendAck(msg, true); + pDrillClientQueryResult->setQueryStatus(ret); return ret; } @@ -590,6 +650,8 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf, size_t bytes_transferred) { boost::system::error_code error=err; // cancel the timer + DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handle Read from buffer " + << reinterpret_cast<int*>(_buf) << std::endl; m_deadlineTimer.cancel(); DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl; if(!error){ @@ -607,7 +669,8 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf, } if(!error && msg.m_rpc_type==exec::user::QUERY_RESULT){ - if(processQueryResult(allocatedBuffer, msg)!=QRY_SUCCESS){ + status_t s = processQueryResult(allocatedBuffer, msg); + if(s !=QRY_SUCCESS && s!= QRY_NO_MORE_DATA){ if(m_pendingRequests!=0){ boost::lock_guard<boost::mutex> lock(this->m_dcMutex); getNextResult(); @@ -646,6 +709,8 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf, // boost error Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN); boost::lock_guard<boost::mutex> lock(this->m_dcMutex); + DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. 
" + "Boost Communication Error: " << error.message() << std::endl; handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL); return; } @@ -716,6 +781,19 @@ void DrillClientImpl::broadcastError(DrillClientError* pErr){ } return; } +// The implementation is similar to handleQryError +status_t DrillClientImpl::handleTerminatedQryState( + status_t status, + std::string msg, + DrillClientQueryResult* pQueryResult){ + assert(pQueryResult!=NULL); + DrillClientError* pErr = new DrillClientError(status, DrillClientError::QRY_ERROR_START+status, msg); + if(m_pError!=NULL){ delete m_pError; m_pError=NULL;} + m_pError=pErr; + m_pendingRequests--; + pQueryResult->signalError(pErr); + return status; +} void DrillClientImpl::clearMapEntries(DrillClientQueryResult* pQueryResult){ std::map<int, DrillClientQueryResult*>::iterator iter; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ae2790ed/contrib/native/client/src/clientlib/drillClientImpl.hpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp index d690aad..8e2f437 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.hpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp @@ -123,6 +123,8 @@ class DrillClientQueryResult{ bool hasError(){ return m_bHasError;} status_t getErrorStatus(){ return m_pError!=NULL?(status_t)m_pError->status:QRY_SUCCESS;} const DrillClientError* getError(){ return m_pError;} + void setQueryStatus(status_t s){ m_status = s;} + status_t getQueryStatus(){ return m_status;} private: status_t setupColumnDefs(exec::shared::QueryResult* pQueryResult); @@ -163,6 +165,7 @@ class DrillClientQueryResult{ const DrillClientError* m_pError; exec::shared::QueryId* m_pQueryId; + status_t m_status; // Schema change listener pfnSchemaListener m_pSchemaListener; @@ -250,9 +253,9 @@ class DrillClientImpl{ // 
Query results void getNextResult(); status_t readMsg( - ByteBuf_t _buf, - AllocatedBufferPtr* allocatedBuffer, - InBoundRpcMessage& msg, + ByteBuf_t _buf, + AllocatedBufferPtr* allocatedBuffer, + InBoundRpcMessage& msg, boost::system::error_code& error); status_t processQueryResult(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg); status_t processQueryId(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ); @@ -264,6 +267,11 @@ class DrillClientImpl{ status_t handleQryError(status_t status, const exec::shared::DrillPBError& e, DrillClientQueryResult* pQueryResult); + // handle query state indicating query is COMPLETED or CANCELED + // (i.e., COMPLETED or CANCELED) + status_t handleTerminatedQryState(status_t status, + std::string msg, + DrillClientQueryResult* pQueryResult); void broadcastError(DrillClientError* pErr); void clearMapEntries(DrillClientQueryResult* pQueryResult); void sendAck(InBoundRpcMessage& msg, bool isOk); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ae2790ed/contrib/native/client/src/clientlib/drillClientImpl.hpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/clientlib/errmsgs.cpp b/contrib/native/client/src/clientlib/errmsgs.cpp index 966cfc2..7a7fa6a 100644 --- a/contrib/native/client/src/clientlib/errmsgs.cpp +++ b/contrib/native/client/src/clientlib/errmsgs.cpp @@ -51,15 +51,15 @@ static Drill::ErrorMessages errorMessages[]={ {ERR_QRY_FAILURE, ERR_CATEGORY_QRY, 0, "Query execution error. 
Details:[ \n%s\n]"}, {ERR_QRY_SELVEC2, ERR_CATEGORY_QRY, 0, "Receiving a selection_vector_2 from the server came as a complete surprise at this point"}, {ERR_QRY_RESPFAIL, ERR_CATEGORY_QRY, 0, "Got a RESPONSE_FAILURE from the server and don't know what to do"}, - {ERR_QRY_12, ERR_CATEGORY_QRY, 0, "Query Failed."}, + {ERR_QRY_UNKQRYSTATE, ERR_CATEGORY_QRY, 0, "Got an unknown query state message from the server."}, {ERR_QRY_UNKQRY, ERR_CATEGORY_QRY, 0, "The server didn't find this query"}, {ERR_QRY_CANCELED, ERR_CATEGORY_QRY, 0, "The server says this query has been cancelled"}, - {ERR_QRY_15, ERR_CATEGORY_QRY, 0, "Query Failed."}, + {ERR_QRY_COMPLETED, ERR_CATEGORY_QRY, 0, "Received query_state: COMPLETED."}, {ERR_QRY_16, ERR_CATEGORY_QRY, 0, "Query Failed."}, {ERR_QRY_17, ERR_CATEGORY_QRY, 0, "Query Failed."}, {ERR_QRY_18, ERR_CATEGORY_QRY, 0, "Query Failed."}, {ERR_QRY_19, ERR_CATEGORY_QRY, 0, "Query Failed."}, - {ERR_QRY_20, ERR_CATEGORY_QRY, 0, "Query Failed."} + {ERR_QRY_20, ERR_CATEGORY_QRY, 0, "Query Failed."}, }; std::string getMessage(uint32_t msgId, ...){ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ae2790ed/contrib/native/client/src/clientlib/errmsgs.hpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/clientlib/errmsgs.hpp b/contrib/native/client/src/clientlib/errmsgs.hpp index 437335c..9a69f21 100644 --- a/contrib/native/client/src/clientlib/errmsgs.hpp +++ b/contrib/native/client/src/clientlib/errmsgs.hpp @@ -58,10 +58,10 @@ namespace Drill{ #define ERR_QRY_FAILURE ERR_CONN_MAX+9 #define ERR_QRY_SELVEC2 ERR_CONN_MAX+10 #define ERR_QRY_RESPFAIL ERR_CONN_MAX+11 -#define ERR_QRY_12 ERR_CONN_MAX+12 +#define ERR_QRY_UNKQRYSTATE ERR_CONN_MAX+12 #define ERR_QRY_UNKQRY ERR_CONN_MAX+13 #define ERR_QRY_CANCELED ERR_CONN_MAX+14 -#define ERR_QRY_15 ERR_CONN_MAX+15 +#define ERR_QRY_COMPLETED ERR_CONN_MAX+15 #define ERR_QRY_16 ERR_CONN_MAX+16 #define ERR_QRY_17 ERR_CONN_MAX+17 #define 
ERR_QRY_18 ERR_CONN_MAX+18 http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ae2790ed/contrib/native/client/src/include/drill/common.hpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/include/drill/common.hpp b/contrib/native/client/src/include/drill/common.hpp index 151d698..c49240e 100644 --- a/contrib/native/client/src/include/drill/common.hpp +++ b/contrib/native/client/src/include/drill/common.hpp @@ -42,7 +42,7 @@ #define MAX_SOCK_RD_BUFSIZE 1024 #define MEM_CHUNK_SIZE 64*1024; // 64K -#define MAX_MEM_ALLOC_SIZE 256*1024*1024; // 256 MB +#define MAX_MEM_ALLOC_SIZE 256*1024*1024; // 256 MB #ifdef _DEBUG #define EXTRA_DEBUGGING @@ -71,7 +71,13 @@ typedef enum{ QRY_OUT_OF_BOUNDS=5, QRY_CLIENT_OUTOFMEM=6, QRY_INTERNAL_ERROR=7, - QRY_COMM_ERROR=8 + QRY_COMM_ERROR=8, + QRY_PENDING = 9, + QRY_RUNNING = 10, + QRY_COMPLETED = 11, + QRY_CANCELED = 12, + QRY_FAILED = 13, + QRY_UNKNOWN_QUERY = 14 } status_t; typedef enum{