DRILL-1444: Fix cancel in C++ client library
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/f1fd9a9a Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/f1fd9a9a Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/f1fd9a9a Branch: refs/heads/master Commit: f1fd9a9ab61cb9f4ce53be68f82ad7906b58cea4 Parents: ebf4785 Author: Parth Chandra <pchan...@maprtech.com> Authored: Mon Sep 29 17:27:48 2014 -0700 Committer: Steven Phillips <sphill...@maprtech.com> Committed: Mon Sep 29 18:21:45 2014 -0700 ---------------------------------------------------------------------- .../native/client/example/querySubmitter.cpp | 55 +++++++++++++------- .../client/src/clientlib/drillClientImpl.cpp | 21 ++++---- .../client/src/clientlib/drillClientImpl.hpp | 4 +- 3 files changed, 47 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1fd9a9a/contrib/native/client/example/querySubmitter.cpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/example/querySubmitter.cpp b/contrib/native/client/example/querySubmitter.cpp index 17bec3c..9feb61d 100644 --- a/contrib/native/client/example/querySubmitter.cpp +++ b/contrib/native/client/example/querySubmitter.cpp @@ -22,6 +22,28 @@ #include <stdlib.h> #include "drill/drillc.hpp" +int nOptions=8; + +struct Option{ + char name[32]; + char desc[128]; + bool required; +}qsOptions[]= { + {"plan", "Plan files separated by semicolons", false}, + {"query", "Query strings, separated by semicolons", false}, + {"type", "Query type [physical|logical|sql]", true}, + {"connectStr", "Connect string", true}, + {"schema", "Default schema", false}, + {"api", "API type [sync|async]", true}, + {"logLevel", "Logging level [trace|debug|info|warn|error|fatal]", false}, + {"testCancel", "Cancel the query afterthe first record batch.", false} +}; + +std::map<std::string, std::string> qsOptionValues; + +bool bTestCancel=false; + + Drill::status_t SchemaListener(void* ctx, Drill::FieldDefPtr fields, Drill::DrillClientError* err){ if(!err){ printf("SCHEMA CHANGE DETECTED:\n"); @@ -41,7 +63,11 @@ Drill::status_t QueryResultsListener(void* ctx, Drill::RecordBatch* b, Drill::Dr if(!err){ b->print(std::cout, 0); // print all rows delete b; // we're done with this batch, we can delete it - return Drill::QRY_SUCCESS ; + if(bTestCancel){ + return Drill::QRY_FAILURE; + }else{ + return Drill::QRY_SUCCESS ; + } }else{ std::cerr<< "ERROR: " << err->msg << std::endl; return Drill::QRY_FAILURE; @@ -93,24 +119,6 @@ void print(const Drill::FieldMetadata* pFieldMetadata, void* buf, size_t sz){ return; } -int nOptions=6; - -struct Option{ - char name[32]; - char desc[128]; - bool required; -}qsOptions[]= { - {"plan", "Plan files separated by semicolons", false}, - {"query", "Query strings, separated by semicolons", false}, - {"type", "Query type [physical|logical|sql]", true}, - {"connectStr", "Connect string", true}, - {"schema", "Default schema", false}, - {"api", "API type [sync|async]", true}, - {"logLevel", "Logging level [trace|debug|info|warn|error|fatal]", false} -}; - -std::map<std::string, std::string> qsOptionValues; - void printUsage(){ std::cerr<<"Usage: querySubmitter "; for(int j=0; j<nOptions ;j++){ @@ -243,6 +251,7 @@ int main(int argc, char* argv[]) { std::string api=qsOptionValues["api"]; std::string type_str=qsOptionValues["type"]; std::string logLevel=qsOptionValues["logLevel"]; + std::string testCancel=qsOptionValues["testCancel"]; exec::shared::QueryType type; @@ -267,6 +276,8 @@ int main(int argc, char* argv[]) { type=exec::shared::SQL; } + bTestCancel = !strcmp(testCancel.c_str(), "true")?true:false; + std::vector<std::string>::iterator queryInpIter; std::vector<Drill::RecordIterator*> recordIterators; @@ -320,8 +331,12 @@ int main(int argc, char* argv[]) { print(fields->at(i), pBuf, sz); } printf("\n"); + if(bTestCancel && row%100==1){ + pRecIter->cancel(); + printf("Application canceled the query.\n"); + } } - if(ret!=Drill::QRY_NO_MORE_DATA){ + if(ret!=Drill::QRY_NO_MORE_DATA && ret!=Drill::QRY_CANCEL){ std::cerr<< pRecIter->getError() << std::endl; } client.freeQueryIterator(&pRecIter); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1fd9a9a/contrib/native/client/src/clientlib/drillClientImpl.cpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp index f1a165d..77795ed 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.cpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp @@ -440,6 +440,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ){ DrillClientQueryResult* pDrillClientQueryResult=NULL; status_t ret=QRY_SUCCESS; + exec::shared::QueryId qid; { boost::lock_guard<boost::mutex> lock(this->m_dcMutex); exec::shared::QueryResult* qr = new exec::shared::QueryResult; //Record Batch will own this object and free it up. @@ -450,7 +451,6 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer DRILL_LOG(LOG_DEBUG) << "Searching for Query Id - " << debugPrintQid(qr->query_id()) << std::endl; - exec::shared::QueryId qid; qid.CopyFrom(qr->query_id()); std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::iterator it; it=this->m_queryResults.find(&qid); @@ -511,7 +511,7 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer } } // release lock if(ret==QRY_FAILURE){ - sendCancel(msg); + sendCancel(&qid); { boost::lock_guard<boost::mutex> lock(this->m_dcMutex); m_pendingRequests--; @@ -530,10 +530,10 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer << "Pending requests: " << m_pendingRequests <<"." << std::endl; } ret=QRY_NO_MORE_DATA; - sendAck(msg); + sendAck(msg, true); return ret; } - sendAck(msg); + sendAck(msg, true); return ret; } @@ -739,21 +739,20 @@ void DrillClientImpl::clearMapEntries(DrillClientQueryResult* pQueryResult){ } } -void DrillClientImpl::sendAck(InBoundRpcMessage& msg){ +void DrillClientImpl::sendAck(InBoundRpcMessage& msg, bool isOk){ exec::rpc::Ack ack; - ack.set_ok(true); + ack.set_ok(isOk); OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::ACK, msg.m_coord_id, &ack); boost::lock_guard<boost::mutex> lock(m_dcMutex); sendSync(ack_msg); DRILL_LOG(LOG_TRACE) << "ACK sent" << std::endl; } -void DrillClientImpl::sendCancel(InBoundRpcMessage& msg){ - exec::rpc::Ack ack; - ack.set_ok(true); - OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::CANCEL_QUERY, msg.m_coord_id, &ack); +void DrillClientImpl::sendCancel(exec::shared::QueryId* pQueryId){ boost::lock_guard<boost::mutex> lock(m_dcMutex); - sendSync(ack_msg); + uint64_t coordId = this->getNextCoordinationId(); + OutBoundRpcMessage cancel_msg(exec::rpc::REQUEST, exec::user::CANCEL_QUERY, coordId, pQueryId); + sendSync(cancel_msg); DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1fd9a9a/contrib/native/client/src/clientlib/drillClientImpl.hpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp index 5ac158f..d690aad 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.hpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp @@ -266,8 +266,8 @@ class DrillClientImpl{ DrillClientQueryResult* pQueryResult); void broadcastError(DrillClientError* pErr); void clearMapEntries(DrillClientQueryResult* pQueryResult); - void sendAck(InBoundRpcMessage& msg); - void sendCancel(InBoundRpcMessage& msg); + void sendAck(InBoundRpcMessage& msg, bool isOk); + void sendCancel(exec::shared::QueryId* pQueryId); static RpcEncoder s_encoder;