DRILL-1444: Fix cancel in C++ client library

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/f1fd9a9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/f1fd9a9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/f1fd9a9a

Branch: refs/heads/master
Commit: f1fd9a9ab61cb9f4ce53be68f82ad7906b58cea4
Parents: ebf4785
Author: Parth Chandra <pchan...@maprtech.com>
Authored: Mon Sep 29 17:27:48 2014 -0700
Committer: Steven Phillips <sphill...@maprtech.com>
Committed: Mon Sep 29 18:21:45 2014 -0700

----------------------------------------------------------------------
 .../native/client/example/querySubmitter.cpp    | 55 +++++++++++++-------
 .../client/src/clientlib/drillClientImpl.cpp    | 21 ++++----
 .../client/src/clientlib/drillClientImpl.hpp    |  4 +-
 3 files changed, 47 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1fd9a9a/contrib/native/client/example/querySubmitter.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/example/querySubmitter.cpp 
b/contrib/native/client/example/querySubmitter.cpp
index 17bec3c..9feb61d 100644
--- a/contrib/native/client/example/querySubmitter.cpp
+++ b/contrib/native/client/example/querySubmitter.cpp
@@ -22,6 +22,28 @@
 #include <stdlib.h>
 #include "drill/drillc.hpp"
 
+int nOptions=8;
+
+struct Option{
+    char name[32];
+    char desc[128];
+    bool required;
+}qsOptions[]= {
+    {"plan", "Plan files separated by semicolons", false},
+    {"query", "Query strings, separated by semicolons", false},
+    {"type", "Query type [physical|logical|sql]", true},
+    {"connectStr", "Connect string", true},
+    {"schema", "Default schema", false},
+    {"api", "API type [sync|async]", true},
+    {"logLevel", "Logging level [trace|debug|info|warn|error|fatal]", false},
+    {"testCancel", "Cancel the query afterthe first record batch.", false}
+};
+
+std::map<std::string, std::string> qsOptionValues;
+
+bool bTestCancel=false;
+
+
 Drill::status_t SchemaListener(void* ctx, Drill::FieldDefPtr fields, 
Drill::DrillClientError* err){
     if(!err){
         printf("SCHEMA CHANGE DETECTED:\n");
@@ -41,7 +63,11 @@ Drill::status_t QueryResultsListener(void* ctx, 
Drill::RecordBatch* b, Drill::Dr
     if(!err){
         b->print(std::cout, 0); // print all rows
         delete b; // we're done with this batch, we can delete it
-        return Drill::QRY_SUCCESS ;
+        if(bTestCancel){
+            return Drill::QRY_FAILURE;
+        }else{
+            return Drill::QRY_SUCCESS ;
+        }
     }else{
         std::cerr<< "ERROR: " << err->msg << std::endl;
         return Drill::QRY_FAILURE;
@@ -93,24 +119,6 @@ void print(const Drill::FieldMetadata* pFieldMetadata, 
void* buf, size_t sz){
     return;
 }
 
-int nOptions=6;
-
-struct Option{
-    char name[32];
-    char desc[128];
-    bool required;
-}qsOptions[]= {
-    {"plan", "Plan files separated by semicolons", false},
-    {"query", "Query strings, separated by semicolons", false},
-    {"type", "Query type [physical|logical|sql]", true},
-    {"connectStr", "Connect string", true},
-    {"schema", "Default schema", false},
-    {"api", "API type [sync|async]", true},
-    {"logLevel", "Logging level [trace|debug|info|warn|error|fatal]", false}
-};
-
-std::map<std::string, std::string> qsOptionValues;
-
 void printUsage(){
     std::cerr<<"Usage: querySubmitter ";
     for(int j=0; j<nOptions ;j++){
@@ -243,6 +251,7 @@ int main(int argc, char* argv[]) {
         std::string api=qsOptionValues["api"];
         std::string type_str=qsOptionValues["type"];
         std::string logLevel=qsOptionValues["logLevel"];
+        std::string testCancel=qsOptionValues["testCancel"];
 
         exec::shared::QueryType type;
 
@@ -267,6 +276,8 @@ int main(int argc, char* argv[]) {
             type=exec::shared::SQL;
         }
 
+        bTestCancel = !strcmp(testCancel.c_str(), "true")?true:false;
+
         std::vector<std::string>::iterator queryInpIter;
 
         std::vector<Drill::RecordIterator*> recordIterators;
@@ -320,8 +331,12 @@ int main(int argc, char* argv[]) {
                         print(fields->at(i), pBuf, sz);
                     }
                     printf("\n");
+                    if(bTestCancel && row%100==1){
+                        pRecIter->cancel();
+                        printf("Application canceled the query.\n");
+                    }
                 }
-                if(ret!=Drill::QRY_NO_MORE_DATA){
+                if(ret!=Drill::QRY_NO_MORE_DATA && ret!=Drill::QRY_CANCEL){
                     std::cerr<< pRecIter->getError() << std::endl;
                 }
                 client.freeQueryIterator(&pRecIter);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1fd9a9a/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp 
b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index f1a165d..77795ed 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -440,6 +440,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
 status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr  
allocatedBuffer, InBoundRpcMessage& msg ){
     DrillClientQueryResult* pDrillClientQueryResult=NULL;
     status_t ret=QRY_SUCCESS;
+    exec::shared::QueryId qid;
     {
         boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
         exec::shared::QueryResult* qr = new exec::shared::QueryResult; 
//Record Batch will own this object and free it up.
@@ -450,7 +451,6 @@ status_t 
DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
 
         DRILL_LOG(LOG_DEBUG) << "Searching for Query Id - " << 
debugPrintQid(qr->query_id()) << std::endl;
 
-        exec::shared::QueryId qid;
         qid.CopyFrom(qr->query_id());
         std::map<exec::shared::QueryId*, DrillClientQueryResult*, 
compareQueryId>::iterator it;
         it=this->m_queryResults.find(&qid);
@@ -511,7 +511,7 @@ status_t 
DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
         }
     } // release lock
     if(ret==QRY_FAILURE){
-        sendCancel(msg);
+        sendCancel(&qid);
         {
             boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
             m_pendingRequests--;
@@ -530,10 +530,10 @@ status_t 
DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
                 << "Pending requests: " << m_pendingRequests <<"." << 
std::endl;
         }
         ret=QRY_NO_MORE_DATA;
-        sendAck(msg);
+        sendAck(msg, true);
         return ret;
     }
-    sendAck(msg);
+    sendAck(msg, true);
     return ret;
 }
 
@@ -739,21 +739,20 @@ void 
DrillClientImpl::clearMapEntries(DrillClientQueryResult* pQueryResult){
     }
 }
 
-void DrillClientImpl::sendAck(InBoundRpcMessage& msg){
+void DrillClientImpl::sendAck(InBoundRpcMessage& msg, bool isOk){
     exec::rpc::Ack ack;
-    ack.set_ok(true);
+    ack.set_ok(isOk);
     OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::ACK, 
msg.m_coord_id, &ack);
     boost::lock_guard<boost::mutex> lock(m_dcMutex);
     sendSync(ack_msg);
     DRILL_LOG(LOG_TRACE) << "ACK sent" << std::endl;
 }
 
-void DrillClientImpl::sendCancel(InBoundRpcMessage& msg){
-    exec::rpc::Ack ack;
-    ack.set_ok(true);
-    OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::CANCEL_QUERY, 
msg.m_coord_id, &ack);
+void DrillClientImpl::sendCancel(exec::shared::QueryId* pQueryId){
     boost::lock_guard<boost::mutex> lock(m_dcMutex);
-    sendSync(ack_msg);
+    uint64_t coordId = this->getNextCoordinationId();
+    OutBoundRpcMessage cancel_msg(exec::rpc::REQUEST, 
exec::user::CANCEL_QUERY, coordId, pQueryId);
+    sendSync(cancel_msg);
     DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1fd9a9a/contrib/native/client/src/clientlib/drillClientImpl.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp 
b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index 5ac158f..d690aad 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -266,8 +266,8 @@ class DrillClientImpl{
                 DrillClientQueryResult* pQueryResult);
         void broadcastError(DrillClientError* pErr);
         void clearMapEntries(DrillClientQueryResult* pQueryResult);
-        void sendAck(InBoundRpcMessage& msg);
-        void sendCancel(InBoundRpcMessage& msg);
+        void sendAck(InBoundRpcMessage& msg, bool isOk);
+        void sendCancel(exec::shared::QueryId* pQueryId);
 
 
         static RpcEncoder s_encoder;

Reply via email to