DRILL-1996: Add cancel method to Drill C++ connector

This closes #602


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/83513daf
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/83513daf
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/83513daf

Branch: refs/heads/master
Commit: 83513daf0903e0d94fcaad7b1ae4e8ad6272b494
Parents: 166c4ce
Author: Laurent Goujon <[email protected]>
Authored: Tue Oct 11 16:35:18 2016 -0700
Committer: Parth Chandra <[email protected]>
Committed: Tue Nov 1 11:33:23 2016 -0700

----------------------------------------------------------------------
 .../native/client/example/querySubmitter.cpp    |   9 +-
 .../native/client/src/clientlib/drillClient.cpp |   8 +
 .../client/src/clientlib/drillClientImpl.cpp    | 174 +++++++++++--------
 .../client/src/clientlib/drillClientImpl.hpp    |   1 +
 .../client/src/include/drill/drillClient.hpp    |   7 +
 5 files changed, 124 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/83513daf/contrib/native/client/example/querySubmitter.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/example/querySubmitter.cpp 
b/contrib/native/client/example/querySubmitter.cpp
index 306db56..2eeaf35 100644
--- a/contrib/native/client/example/querySubmitter.cpp
+++ b/contrib/native/client/example/querySubmitter.cpp
@@ -415,7 +415,14 @@ int main(int argc, char* argv[]) {
                     client.submitQuery(type, *queryInpIter, 
QueryResultsListener, NULL, &qryHandle);
                     client.registerSchemaChangeListener(&qryHandle, 
SchemaListener);
                     
-                    client.waitForResults();
+                     if(bTestCancel) {
+                        // Send cancellation request after 1 second
+                        
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
+                        std::cout<< "\n Cancelling query: " << *queryInpIter 
<< "\n" << std::endl;
+                        client.cancelQuery(qryHandle);
+                    } else {
+                        client.waitForResults();
+                    }
 
                     client.freeQueryResources(&qryHandle);
                 }

http://git-wip-us.apache.org/repos/asf/drill/blob/83513daf/contrib/native/client/src/clientlib/drillClient.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClient.cpp 
b/contrib/native/client/src/clientlib/drillClient.cpp
index 20a466e..b02f993 100644
--- a/contrib/native/client/src/clientlib/drillClient.cpp
+++ b/contrib/native/client/src/clientlib/drillClient.cpp
@@ -400,6 +400,14 @@ status_t DrillClient::executeQuery(const 
PreparedStatement& pstmt, pfnQueryResul
        return QRY_SUCCESS;
 }
 
+void DrillClient::cancelQuery(QueryHandle_t handle) {
+       if (!handle) {
+               return;
+       }
+       DrillClientQueryHandle* pHandle = 
static_cast<DrillClientQueryHandle*>(handle);
+       pHandle->cancel();
+}
+
 void* DrillClient::getApplicationContext(QueryHandle_t handle){
     assert(handle!=NULL);
     return 
(static_cast<DrillClientQueryHandle*>(handle))->getApplicationContext();

http://git-wip-us.apache.org/repos/asf/drill/blob/83513daf/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp 
b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index 7ecf910..51ae1a2 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -825,7 +825,6 @@ status_t 
DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
 status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, 
const rpc::InBoundRpcMessage& msg ){
     DrillClientQueryResult* pDrillClientQueryResult=NULL;
     status_t ret=QRY_SUCCESS;
-    ::exec::shared::QueryId qid;
     // Be a good client and send ack as early as possible.
     // Drillbit pushed the query result to the client, the client should send 
ack
     // whenever it receives the message
@@ -839,7 +838,7 @@ status_t 
DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, c
         qr->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
         DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;)
 
-        qid = ::exec::shared::QueryId(qr->query_id());
+        const ::exec::shared::QueryId& qid = qr->query_id();
         if(qid.part1()==0){
             DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << 
"DrillClientImpl::processQueryData: QID=0. Ignore and return QRY_SUCCESS." << 
std::endl;)
             delete allocatedBuffer;
@@ -855,90 +854,105 @@ status_t 
DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, c
             return ret;
         }
 
-        //Validate the RPC message
-        std::string valErr;
-        if( (ret=validateDataMessage(msg, *qr, valErr)) != QRY_SUCCESS){
-            delete allocatedBuffer;
-            delete qr;
-            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << 
"DrillClientImpl::processQueryData: ERR_QRY_INVRPC.\n";)
-            pDrillClientQueryResult->setQueryStatus(ret);
-            return handleQryError(ret, getMessage(ERR_QRY_INVRPC, 
valErr.c_str()), pDrillClientQueryResult);
-        }
-
-        //Build Record Batch here
-        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Building record batch for Query 
Id - " << debugPrintQid(qr->query_id()) << std::endl;)
-
-        pRecordBatch= new RecordBatch(qr, allocatedBuffer,  msg.m_dbody);
-        pDrillClientQueryResult->m_numBatches++;
-
-        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << 
(void*)pRecordBatch << std::endl;)
-        pRecordBatch->build();
-        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << 
debugPrintQid(qr->query_id())<<"recordBatch.numRecords "
-            << pRecordBatch->getNumRecords()  << std::endl;)
-        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << 
debugPrintQid(qr->query_id())<<"recordBatch.numFields "
-            << pRecordBatch->getNumFields()  << std::endl;)
-
-        ret=pDrillClientQueryResult->setupColumnDefs(qr);
-        if(ret==QRY_SUCCESS_WITH_INFO){
-            pRecordBatch->schemaChanged(true);
-        }
-
-        pDrillClientQueryResult->setIsQueryPending(true);
-        if(pDrillClientQueryResult->m_bIsLastChunk){
-            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << 
debugPrintQid(*pDrillClientQueryResult->m_pQueryId)
-                <<  "Received last batch. " << std::endl;)
-            ret=QRY_NO_MORE_DATA;
+        // check if query has been cancelled
+        if (pDrillClientQueryResult->isCancelled()) {
+            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query 
cancellation " << std::endl;)
+               delete qr;
+               delete allocatedBuffer;
+               ret =  QRY_CANCEL;
+        } else {
+               //Validate the RPC message
+               std::string valErr;
+               if( (ret=validateDataMessage(msg, *qr, valErr)) != QRY_SUCCESS){
+                       delete allocatedBuffer;
+                       delete qr;
+                       DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << 
"DrillClientImpl::processQueryData: ERR_QRY_INVRPC.\n";)
+                       pDrillClientQueryResult->setQueryStatus(ret);
+                       return handleQryError(ret, getMessage(ERR_QRY_INVRPC, 
valErr.c_str()), pDrillClientQueryResult);
+               }
+
+               //Build Record Batch here
+               DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Building record batch for 
Query Id - " << debugPrintQid(qid) << std::endl;)
+
+               pRecordBatch= new RecordBatch(qr, allocatedBuffer,  
msg.m_dbody);
+               pDrillClientQueryResult->m_numBatches++;
+
+               DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Allocated new Record 
batch." << (void*)pRecordBatch << std::endl;)
+               pRecordBatch->build();
+               DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << 
debugPrintQid(qid)<<"recordBatch.numRecords "
+                               << pRecordBatch->getNumRecords()  << std::endl;)
+               DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << 
debugPrintQid(qid)<<"recordBatch.numFields "
+                               << pRecordBatch->getNumFields()  << std::endl;)
+
+                                       
ret=pDrillClientQueryResult->setupColumnDefs(qr);
+               if(ret==QRY_SUCCESS_WITH_INFO){
+                       pRecordBatch->schemaChanged(true);
+               }
+
+               pDrillClientQueryResult->setIsQueryPending(true);
+               if(pDrillClientQueryResult->m_bIsLastChunk){
+                       DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qid)
+                                       <<  "Received last batch. " << 
std::endl;)
+                       ret=QRY_NO_MORE_DATA;
+               }
+               pDrillClientQueryResult->setQueryStatus(ret);
+               ret = pDrillClientQueryResult->notifyListener(pRecordBatch, 
NULL);
         }
-        pDrillClientQueryResult->setQueryStatus(ret);
-        ret = pDrillClientQueryResult->notifyListener(pRecordBatch, NULL);
     } // release lock
-    if(ret==QRY_FAILURE){
-        sendCancel(&qid);
-        // Do not decrement pending requests here. We have sent a cancel and 
we may still receive results that are
-        // pushed on the wire before the cancel is processed.
-        pDrillClientQueryResult->setIsQueryPending(false);
-        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << 
std::endl;)
-        pDrillClientQueryResult->setQueryStatus(ret);
-        removeQueryHandle(pDrillClientQueryResult);
-        removeQueryResult(pDrillClientQueryResult);
-        return ret;
+    if((ret==QRY_FAILURE || ret==QRY_CANCELED) && pDrillClientQueryResult != 
NULL){
+        return handleQryCancellation(ret, pDrillClientQueryResult);
     }
     return ret;
 }
 
 status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, 
const rpc::InBoundRpcMessage& msg ){
     DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with 
coordination id:" << msg.m_coord_id << std::endl;)
+       DrillClientQueryResult* pDrillClientQueryResult=NULL;
     status_t ret=QRY_SUCCESS;
 
     // make sure to deallocate buffer
     boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer);
-    boost::lock_guard<boost::mutex> lock(m_dcMutex);
-    for(std::map< ::exec::shared::QueryId*, 
DrillClientQueryResult*>::const_iterator 
it=this->m_queryResults.begin();it!=this->m_queryResults.end();it++){
-        DrillClientQueryResult* pDrillClientQueryResult=it->second;
-        std::string qidString = 
(pDrillClientQueryResult->m_pQueryId!=NULL)?debugPrintQid(*pDrillClientQueryResult->m_pQueryId):std::string("NULL");
-        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: 
m_queryIds: coordinationId: " << pDrillClientQueryResult->m_coordinationId
-        << " QueryId: "<< qidString << std::endl;)
-    }
-    if(msg.m_coord_id==0){
-        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: 
m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
-        return QRY_SUCCESS;
-    }
-    std::map<int, DrillClientQueryHandle*>::const_iterator it;
-    it=this->m_queryHandles.find(msg.m_coord_id);
-    if(it!=this->m_queryHandles.end()){
-        DrillClientQueryResult* 
pDrillClientQueryResult=dynamic_cast<DrillClientQueryResult*>((*it).second);
-        if (!pDrillClientQueryResult) {
-            return handleQryError(QRY_INTERNAL_ERROR, 
getMessage(ERR_QRY_INVQUERYID), NULL);
-        }
-        exec::shared::QueryId *qid = new exec::shared::QueryId;
-        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE)  << "Received Query Handle " << 
msg.m_pbody.size() << std::endl;)
-        qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
-        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Query Id - " << 
debugPrintQid(*qid) << std::endl;)
-        m_queryResults[qid]=pDrillClientQueryResult;
-        //save queryId allocated here so we can free it later
-        pDrillClientQueryResult->setQueryId(qid);
-    }else{
-        return handleQryError(QRY_INTERNAL_ERROR, 
getMessage(ERR_QRY_INVQUERYID), NULL);
+    {
+       boost::lock_guard<boost::mutex> lock(m_dcMutex);
+
+       if(msg.m_coord_id==0){
+               DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << 
"DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." 
<< std::endl;)
+               return QRY_SUCCESS;
+       }
+
+       for(std::map< ::exec::shared::QueryId*, 
DrillClientQueryResult*>::const_iterator 
it=this->m_queryResults.begin();it!=this->m_queryResults.end();it++){
+               DrillClientQueryResult* pQueryResult=it->second;
+               std::string qidString = 
(pQueryResult->m_pQueryId!=NULL)?debugPrintQid(*pQueryResult->m_pQueryId):std::string("NULL");
+               DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << 
"DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << 
pQueryResult->m_coordinationId
+                               << " QueryId: "<< qidString << std::endl;)
+       }
+
+       std::map<int, DrillClientQueryHandle*>::const_iterator it;
+       it=this->m_queryHandles.find(msg.m_coord_id);
+       if(it==this->m_queryHandles.end()){
+               return handleQryError(QRY_INTERNAL_ERROR, 
getMessage(ERR_QRY_INVQUERYID), NULL);
+       }
+       
pDrillClientQueryResult=dynamic_cast<DrillClientQueryResult*>((*it).second);
+       if (!pDrillClientQueryResult) {
+               return handleQryError(QRY_INTERNAL_ERROR, 
getMessage(ERR_QRY_INVQUERYID), NULL);
+       }
+
+       // Check for cancellation to notify
+       if (pDrillClientQueryResult->isCancelled()) {
+               ret = QRY_CANCELED;
+       }
+       else {
+               exec::shared::QueryId *qid = new exec::shared::QueryId;
+               DRILL_MT_LOG(DRILL_LOG(LOG_TRACE)  << "Received Query Handle " 
<< msg.m_pbody.size() << std::endl;)
+               qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
+               DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Query Id - " << 
debugPrintQid(*qid) << std::endl;)
+               m_queryResults[qid]=pDrillClientQueryResult;
+               //save queryId allocated here so we can free it later
+               pDrillClientQueryResult->setQueryId(qid);
+       }
+    }
+    if (ret == QRY_CANCELED && pDrillClientQueryResult != NULL) {
+       return handleQryCancellation(ret, pDrillClientQueryResult);
     }
     return ret;
 }
@@ -1486,6 +1500,18 @@ status_t DrillClientImpl::handleQryError(status_t status,
     return status;
 }
 
+status_t DrillClientImpl::handleQryCancellation(status_t status, 
DrillClientQueryResult* pQueryHandle) {
+       sendCancel(&pQueryHandle->getQueryId());
+       // Do not decrement pending requests here. We have sent a cancel and we 
may still receive results that are
+       // pushed on the wire before the cancel is processed.
+       pQueryHandle->setIsQueryPending(false);
+       DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << 
std::endl;)
+       pQueryHandle->setQueryStatus(status);
+       removeQueryResult(pQueryHandle);
+       removeQueryHandle(pQueryHandle);
+       return status;
+}
+
 void DrillClientImpl::broadcastError(DrillClientError* pErr){
     if(pErr!=NULL){
         std::map<int, DrillClientQueryHandle*>::const_iterator iter;

http://git-wip-us.apache.org/repos/asf/drill/blob/83513daf/contrib/native/client/src/clientlib/drillClientImpl.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp 
b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index f9d0779..8da37b6 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -489,6 +489,7 @@ class DrillClientImpl : public DrillClientImplBase{
         status_t validateDataMessage(const rpc::InBoundRpcMessage& msg, const 
exec::shared::QueryData& qd, std::string& valError);
         status_t validateResultMessage(const rpc::InBoundRpcMessage& msg, 
const exec::shared::QueryResult& qr, std::string& valError);
         connectionStatus_t handleConnError(connectionStatus_t status, const 
std::string& msg);
+        status_t handleQryCancellation(status_t status, 
DrillClientQueryResult* pQueryResult);
         status_t handleQryError(status_t status, const std::string& msg, 
DrillClientQueryHandle* pQueryHandle);
         status_t handleQryError(status_t status, const 
exec::shared::DrillPBError& e, DrillClientQueryHandle* pQueryHandle);
         // handle query state indicating query is COMPLETED or CANCELED

http://git-wip-us.apache.org/repos/asf/drill/blob/83513daf/contrib/native/client/src/include/drill/drillClient.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/include/drill/drillClient.hpp 
b/contrib/native/client/src/include/drill/drillClient.hpp
index 5e59885..29ae6c2 100644
--- a/contrib/native/client/src/include/drill/drillClient.hpp
+++ b/contrib/native/client/src/include/drill/drillClient.hpp
@@ -1276,6 +1276,13 @@ class DECLSPEC_DRILL_CLIENT DrillClient{
         status_t executeQuery(const PreparedStatement& pstmt, 
pfnQueryResultsListener listener, void* listenerCtx, QueryHandle_t* qHandle);
 
         /*
+         * Cancel a query.
+         *
+         * @param[in] handle the handle of the query to cancel
+         */
+        void cancelQuery(QueryHandle_t handle);
+
+        /*
          * The client application should call this function to wait for 
results if it has registered a
          * listener.
          */

Reply via email to