DRILL-4313: C++ Client - Thread safe Logging.  Improved Drill bit selection.
 - Update random drill bit selection. Shuffle the list initially, then round 
robin. Add Utility methods to get random numbers and to shuffle and add 
vectors. Whitespace cleanup
 - Add Git properties to build and print to log.
 - Add interface to get error based on query handle.
 - Add support for Pooled connections. Allows switching between pooled and 
unpooled connections based on environment variables


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/df0f0af3
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/df0f0af3
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/df0f0af3

Branch: refs/heads/master
Commit: df0f0af3d963c1b65eb01c3141fe84532c53f5a5
Parents: a2fec78
Author: Parth Chandra <[email protected]>
Authored: Fri Feb 12 15:42:53 2016 -0800
Committer: Parth Chandra <[email protected]>
Committed: Mon Mar 7 17:49:50 2016 -0800

----------------------------------------------------------------------
 contrib/native/client/CMakeLists.txt            |  24 +-
 .../client/cmakeModules/FindZookeeper.cmake     |   2 +-
 .../native/client/example/querySubmitter.cpp    |  25 +-
 .../native/client/src/clientlib/drillClient.cpp |  33 +-
 .../client/src/clientlib/drillClientImpl.cpp    | 600 ++++++++++++++-----
 .../client/src/clientlib/drillClientImpl.hpp    | 169 +++++-
 contrib/native/client/src/clientlib/env.h.in    |  26 +
 contrib/native/client/src/clientlib/errmsgs.cpp |   2 +
 contrib/native/client/src/clientlib/errmsgs.hpp |   4 +-
 contrib/native/client/src/clientlib/logger.cpp  | 126 ++--
 contrib/native/client/src/clientlib/logger.hpp  |  85 +--
 contrib/native/client/src/clientlib/utils.cpp   | 109 ++--
 contrib/native/client/src/clientlib/utils.hpp   | 100 +++-
 .../native/client/src/include/drill/common.hpp  |   9 +-
 .../client/src/include/drill/drillClient.hpp    |   7 +-
 15 files changed, 1001 insertions(+), 320 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/df0f0af3/contrib/native/client/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/contrib/native/client/CMakeLists.txt 
b/contrib/native/client/CMakeLists.txt
index 603586d..b22af42 100644
--- a/contrib/native/client/CMakeLists.txt
+++ b/contrib/native/client/CMakeLists.txt
@@ -22,8 +22,20 @@ project(drillclient)
 message("Project Dir = ${PROJECT_SOURCE_DIR}")
 message("Source Dir = ${CMAKE_SOURCE_DIR} ")
 
+cmake_policy(SET CMP0043 NEW)
+
 set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmakeModules/")
 
+# Get the latest git commit properties of the working branch
+execute_process(
+    COMMAND git log -1 --format="\\nCommit: %H \\nDescription: %s \\nAuthor: 
%aN Date: %ai"
+    WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}
+    OUTPUT_VARIABLE GIT_COMMIT_PROP
+    OUTPUT_STRIP_TRAILING_WHITESPACE
+    )
+add_definitions("-DGIT_COMMIT_PROP=${GIT_COMMIT_PROP}")
+
+
 
 # Find Boost
 if(MSVC)
@@ -36,7 +48,7 @@ else()
     set(Boost_USE_STATIC_RUNTIME OFF)
 endif()
 
-find_package(Boost 1.53.0 REQUIRED COMPONENTS regex system date_time chrono 
thread )
+find_package(Boost 1.53.0 REQUIRED COMPONENTS regex system date_time chrono 
thread random)
 include_directories(${Boost_INCLUDE_DIRS})
 
 if(CMAKE_COMPILER_IS_GNUCXX)
@@ -63,6 +75,16 @@ include_directories(${PROTOBUF_INCLUDE_DIR})
 #Find Zookeeper
 find_package(Zookeeper  REQUIRED )
 
+
+# Generated sources
+configure_file(
+    ${CMAKE_SOURCE_DIR}/src/clientlib/env.h.in
+    ${CMAKE_BINARY_DIR}/generated/env.h
+    )
+
+include_directories(${CMAKE_BINARY_DIR}/generated)
+
+
 #
 #   TARGETS
 #

http://git-wip-us.apache.org/repos/asf/drill/blob/df0f0af3/contrib/native/client/cmakeModules/FindZookeeper.cmake
----------------------------------------------------------------------
diff --git a/contrib/native/client/cmakeModules/FindZookeeper.cmake 
b/contrib/native/client/cmakeModules/FindZookeeper.cmake
index fd8247f..151c05c 100644
--- a/contrib/native/client/cmakeModules/FindZookeeper.cmake
+++ b/contrib/native/client/cmakeModules/FindZookeeper.cmake
@@ -40,7 +40,7 @@ if (MSVC)
         message("- CMAKE will look for zookeeper library files in 
$ZOOKEEPER_HOME/src/c/Debug or $ZOOKEEPER_HOME/src/c/Release.")
     else()
         FILE(TO_CMAKE_PATH ${ZOOKEEPER_HOME} Zookeeper_HomePath)
-        set(Zookeeper_LIB_PATHS 
${Zookeeper_HomePath}/src/c/${ZK_BuildOutputDir})
+        set(Zookeeper_LIB_PATHS 
${Zookeeper_HomePath}/src/c/${ZK_BuildOutputDir} 
${Zookeeper_HomePath}/src/c/x64/${ZK_BuildOutputDir} )
 
         find_path(ZK_INCLUDE_DIR zookeeper.h 
${Zookeeper_HomePath}/src/c/include)
         find_path(ZK_INCLUDE_DIR_GEN zookeeper.jute.h 
${Zookeeper_HomePath}/src/c/generated)

http://git-wip-us.apache.org/repos/asf/drill/blob/df0f0af3/contrib/native/client/example/querySubmitter.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/example/querySubmitter.cpp 
b/contrib/native/client/example/querySubmitter.cpp
index 960ff4f..d507d1b 100644
--- a/contrib/native/client/example/querySubmitter.cpp
+++ b/contrib/native/client/example/querySubmitter.cpp
@@ -20,6 +20,7 @@
 #include <iostream>
 #include <stdio.h>
 #include <stdlib.h>
+#include <boost/thread.hpp>
 #include "drill/drillc.hpp"
 
 int nOptions=13;
@@ -65,11 +66,13 @@ Drill::status_t SchemaListener(void* ctx, 
Drill::FieldDefPtr fields, Drill::Dril
     }
 }
 
+boost::mutex listenerMutex;
 Drill::status_t QueryResultsListener(void* ctx, Drill::RecordBatch* b, 
Drill::DrillClientError* err){
     // Invariant:
     // (received an record batch and err is NULL)
     // or
     // (received query state message passed by `err` and b is NULL)
+    boost::lock_guard<boost::mutex> listenerLock(listenerMutex);
     if(!err){
         if(b!=NULL){
             b->print(std::cout, 0); // print all rows
@@ -317,16 +320,24 @@ int main(int argc, char* argv[]) {
         std::vector<Drill::QueryHandle_t*>::iterator queryHandleIter;
 
         Drill::DrillClient client;
-        // To log to file
-        //DrillClient::initLogging("/var/log/drill/", l);
+#if defined _WIN32 || defined _WIN64
+        TCHAR tempPath[MAX_PATH];
+        GetTempPath(MAX_PATH, tempPath);
+               char logpathPrefix[MAX_PATH + 128];
+               strcpy(logpathPrefix,tempPath);
+               strcat(logpathPrefix, "\\drillclient");
+#else
+               char* logpathPrefix = "/var/log/drill/drillclient";
+#endif
+               // To log to file
+        Drill::DrillClient::initLogging(logpathPrefix, l);
         // To log to stderr
-        Drill::DrillClient::initLogging(NULL, l);
-        //Drill::DrillClientConfig::setBufferLimit(2*1024*1024); // 2MB. 
Allows us to hold at least two record batches.
-        int nQueries=queryInputs.size();
-        Drill::DrillClientConfig::setBufferLimit(nQueries*2*1024*1024); // 2MB 
per query. Allows us to hold at least two record batches.
+        //Drill::DrillClient::initLogging(NULL, l);
 
+        int nQueries=queryInputs.size();
+        Drill::DrillClientConfig::setBufferLimit(nQueries*2*1024*1024); // 2MB 
per query. The size of a record batch may vary, but is unlikely to exceed the 
256 MB which is the default. 
 
-        if (!hshakeTimeout.empty()){
+        if(!hshakeTimeout.empty()){
             
Drill::DrillClientConfig::setHandshakeTimeout(atoi(hshakeTimeout.c_str()));
         }
         if (!queryTimeout.empty()){

http://git-wip-us.apache.org/repos/asf/drill/blob/df0f0af3/contrib/native/client/src/clientlib/drillClient.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClient.cpp 
b/contrib/native/client/src/clientlib/drillClient.cpp
index 7087938..92c5194 100644
--- a/contrib/native/client/src/clientlib/drillClient.cpp
+++ b/contrib/native/client/src/clientlib/drillClient.cpp
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-
+#include <stdlib.h>
 #include <boost/assign.hpp>
 #include "drill/common.hpp"
 #include "drill/drillClient.hpp"
@@ -56,21 +56,22 @@ int32_t DrillClientConfig::s_heartbeatFrequency=15; // 15 
seconds
 boost::mutex DrillClientConfig::s_mutex;
 
 DrillClientConfig::DrillClientConfig(){
-    initLogging(NULL);
+    // Do not initialize logging. The Logger object is static and may 
+    // not have been initialized yet
+    //initLogging(NULL);
 }
 
 DrillClientConfig::~DrillClientConfig(){
-    Logger::close();
 }
 
 void DrillClientConfig::initLogging(const char* path){
-    Logger::init(path);
+    getLogger().init(path);
 }
 
 void DrillClientConfig::setLogLevel(logLevel_t l){
     boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
     s_logLevel=l;
-    Logger::s_level=l;
+    getLogger().m_level=l;
     //boost::log::core::get()->set_filter(boost::log::trivial::severity >= 
s_logLevel);
 }
 
@@ -163,7 +164,7 @@ RecordIterator::~RecordIterator(){
     delete this->m_pQueryResult;
     this->m_pQueryResult=NULL;
     if(this->m_pCurrentRecordBatch!=NULL){
-        DRILL_LOG(LOG_TRACE) << "Deleted last Record batch " << (void*) 
m_pCurrentRecordBatch << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deleted last Record batch " << 
(void*) m_pCurrentRecordBatch << std::endl;)
         delete this->m_pCurrentRecordBatch; this->m_pCurrentRecordBatch=NULL;
     }
 }
@@ -224,7 +225,7 @@ status_t RecordIterator::next(){
         if(this->m_pCurrentRecordBatch==NULL || 
this->m_currentRecord==this->m_pCurrentRecordBatch->getNumRecords()){
             boost::lock_guard<boost::mutex> 
bufferLock(this->m_recordBatchMutex);
             if(this->m_pCurrentRecordBatch !=NULL){
-                DRILL_LOG(LOG_TRACE) << "Deleted old Record batch " << (void*) 
m_pCurrentRecordBatch << std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deleted old Record batch 
" << (void*) m_pCurrentRecordBatch << std::endl;)
                 delete this->m_pCurrentRecordBatch; //free the previous record 
batch
                 this->m_pCurrentRecordBatch=NULL;
             }
@@ -235,12 +236,12 @@ status_t RecordIterator::next(){
             }
             this->m_pCurrentRecordBatch=this->m_pQueryResult->getNext();
             if(this->m_pCurrentRecordBatch != NULL){
-                DRILL_LOG(LOG_TRACE) << "Fetched new Record batch " << 
std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Fetched new Record batch 
" << std::endl;)
             }else{
-                DRILL_LOG(LOG_TRACE) << "No new Record batch found " << 
std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No new Record batch 
found " << std::endl;)
             }
             if(this->m_pCurrentRecordBatch==NULL || 
this->m_pCurrentRecordBatch->getNumRecords()==0){
-                DRILL_LOG(LOG_TRACE) << "No more data." << std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more data." << 
std::endl;)
                 ret = QRY_NO_MORE_DATA;
             }else if(this->m_pCurrentRecordBatch->hasSchemaChanged()){
                 ret=QRY_SUCCESS_WITH_INFO;
@@ -315,7 +316,12 @@ void DrillClient::initLogging(const char* path, logLevel_t 
l){
 }
 
 DrillClient::DrillClient(){
-    this->m_pImpl=new DrillClientImpl;
+    const char* enablePooledClient=std::getenv(ENABLE_CONNECTION_POOL_ENV);
+    if(enablePooledClient!=NULL && atoi(enablePooledClient)!=0){
+        this->m_pImpl=new PooledDrillClientImpl;
+    }else{
+        this->m_pImpl=new DrillClientImpl;
+    }
 }
 
 DrillClient::~DrillClient(){
@@ -378,10 +384,12 @@ RecordIterator* DrillClient::submitQuery(Drill::QueryType 
t, const std::string&
 }
 
 void* DrillClient::getApplicationContext(QueryHandle_t handle){
+    assert(handle!=NULL);
     return ((DrillClientQueryResult*)handle)->getListenerContext();
 }
 
 status_t DrillClient::getQueryStatus(QueryHandle_t handle){
+    assert(handle!=NULL);
     return ((DrillClientQueryResult*)handle)->getQueryStatus();
 }
 
@@ -389,6 +397,9 @@ std::string& DrillClient::getError(){
     return m_pImpl->getError()->msg;
 }
 
+const std::string& DrillClient::getError(QueryHandle_t handle){
+    return ((DrillClientQueryResult*)handle)->getError()->msg;
+}
 
 void DrillClient::waitForResults(){
     this->m_pImpl->waitForResults();

http://git-wip-us.apache.org/repos/asf/drill/blob/df0f0af3/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp 
b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index d4e9ed9..3ec01f5 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -78,47 +78,46 @@ void setSocketTimeout(boost::asio::ip::tcp::socket& socket, 
int32_t timeout){
 #endif
 }
 
-
-void DrillClientImpl::parseConnectStr(const char* connectStr,
-        std::string& pathToDrill,
-        std::string& protocol,
-        std::string& hostPortStr){
-    char u[MAX_CONNECT_STR+1];
-    strncpy(u,connectStr, MAX_CONNECT_STR); u[MAX_CONNECT_STR]=0;
-    char* z=strtok(u, "=");
-    char* c=strtok(NULL, "/");
-    char* p=strtok(NULL, "");
-
-    if(p!=NULL) pathToDrill=std::string("/")+p;
-    protocol=z; hostPortStr=c;
-    return;
-}
-
 connectionStatus_t DrillClientImpl::connect(const char* connStr){
     std::string pathToDrill, protocol, hostPortStr;
     std::string host;
     std::string port;
     if(!this->m_bIsConnected){
-        parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
+        m_connectStr=connStr;
+        Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
         if(!strcmp(protocol.c_str(), "zk")){
             ZookeeperImpl zook;
-            if(zook.connectToZookeeper(hostPortStr.c_str(), 
pathToDrill.c_str())!=0){
+            std::vector<std::string> drillbits;
+            int err = zook.getAllDrillbits(hostPortStr.c_str(), 
pathToDrill.c_str(), drillbits);
+            if(!err){
+                Utils::shuffle(drillbits);
+                exec::DrillbitEndpoint endpoint;
+                err = zook.getEndPoint(drillbits, drillbits.size()-1, 
endpoint);// get the last one in the list
+                if(!err){
+                    host=boost::lexical_cast<std::string>(endpoint.address());
+                    
port=boost::lexical_cast<std::string>(endpoint.user_port());
+                }
+            }
+            if(err){
                 return handleConnError(CONN_ZOOKEEPER_ERROR, 
getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
             }
-            zook.debugPrint();
-            exec::DrillbitEndpoint e=zook.getEndPoint();
-            host=boost::lexical_cast<std::string>(e.address());
-            port=boost::lexical_cast<std::string>(e.user_port());
             zook.close();
+            m_bIsDirectConnection=true;  
         }else if(!strcmp(protocol.c_str(), "local")){
+            boost::lock_guard<boost::mutex> lock(m_dcMutex);//strtok is not 
reentrant
             char tempStr[MAX_CONNECT_STR+1];
             strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); 
tempStr[MAX_CONNECT_STR]=0;
             host=strtok(tempStr, ":");
             port=strtok(NULL, "");
+            m_bIsDirectConnection=false;  
         }else{
             return handleConnError(CONN_INVALID_INPUT, 
getMessage(ERR_CONN_UNKPROTO, protocol.c_str()));
         }
-        return this->connect(host.c_str(), port.c_str());
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: " << 
host << ":" << port << std::endl;)
+        connectionStatus_t ret = this->connect(host.c_str(), port.c_str());
+        return ret;
+    }else if(std::strcmp(connStr, m_connectStr.c_str())){ // tring to connect 
to a different address is not allowed if already connected
+        return handleConnError(CONN_ALREADYCONNECTED, 
getMessage(ERR_CONN_ALREADYCONN));
     }
     return CONN_SUCCESS;
 }
@@ -133,7 +132,7 @@ connectionStatus_t DrillClientImpl::connect(const char* 
host, const char* port){
         tcp::resolver::iterator end;
         while (iter != end){
             endpoint = *iter++;
-            DRILL_LOG(LOG_TRACE) << endpoint << std::endl;
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << endpoint << std::endl;)
         }
         boost::system::error_code ec;
         m_socket.connect(endpoint, ec);
@@ -149,6 +148,7 @@ connectionStatus_t DrillClientImpl::connect(const char* 
host, const char* port){
         return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_EXCEPT, 
e.what()));
     }
 
+    m_bIsConnected=true;
     // set socket keep alive
     boost::asio::socket_base::keep_alive keepAlive(true);
     m_socket.set_option(keepAlive);
@@ -156,35 +156,34 @@ connectionStatus_t DrillClientImpl::connect(const char* 
host, const char* port){
     boost::asio::ip::tcp::no_delay noDelay(true);
     m_socket.set_option(noDelay);
 
-    //
-    // We put some OS dependent code here for timing out a socket. Mostly, 
this appears to
-    // do nothing. Should we leave it in there?
-    //
-    setSocketTimeout(m_socket, DrillClientConfig::getSocketTimeout());
-
+    std::ostringstream connectedHost;
+    connectedHost << "id: " << m_socket.native_handle() << " address: " << 
host << ":" << port;
+    m_connectedHost = connectedHost.str();
+    DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << "Connected to endpoint: " << 
m_connectedHost << std::endl;)
+    
     return CONN_SUCCESS;
 }
 
 void DrillClientImpl::startHeartbeatTimer(){
-    DRILL_LOG(LOG_TRACE) << "Started new heartbeat timer with "
-        << DrillClientConfig::getHeartbeatFrequency() << " seconds." << 
std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new heartbeat timer with "
+        << DrillClientConfig::getHeartbeatFrequency() << " seconds." << 
std::endl;)
     
m_heartbeatTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getHeartbeatFrequency()));
     m_heartbeatTimer.async_wait(boost::bind(
                 &DrillClientImpl::handleHeartbeatTimeout,
                 this,
                 boost::asio::placeholders::error
                 ));
-           startMessageListener(); // start this thread early so we don't have 
the timer blocked
+        startMessageListener(); // start this thread early so we don't have 
the timer blocked
 }
 
 connectionStatus_t DrillClientImpl::sendHeartbeat(){
-       connectionStatus_t status=CONN_SUCCESS;
+    connectionStatus_t status=CONN_SUCCESS;
     exec::rpc::Ack ack;
     ack.set_ok(true);
     OutBoundRpcMessage heartbeatMsg(exec::rpc::PING, exec::user::ACK/*can be 
anything */, 0, &ack);
-       boost::lock_guard<boost::mutex> prLock(this->m_prMutex);
-       boost::lock_guard<boost::mutex> lock(m_dcMutex);
-    DRILL_LOG(LOG_TRACE) << "Heartbeat sent." << std::endl;
+    boost::lock_guard<boost::mutex> prLock(this->m_prMutex);
+    boost::lock_guard<boost::mutex> lock(m_dcMutex);
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Heartbeat sent." << std::endl;)
     status=sendSync(heartbeatMsg);
     status=status==CONN_SUCCESS?status:CONN_DEAD;
     //If the server sends responses to a heartbeat, we need to increment the 
pending requests counter.
@@ -196,21 +195,19 @@ connectionStatus_t DrillClientImpl::sendHeartbeat(){
 
 void DrillClientImpl::resetHeartbeatTimer(){
     m_heartbeatTimer.cancel();
-    DRILL_LOG(LOG_TRACE) << "Reset Heartbeat timer." << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Reset Heartbeat timer." << 
std::endl;)
     startHeartbeatTimer();
 }
 
-
-
 void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & 
err){
-    DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: Heartbeat timer expired." << 
std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: Heartbeat timer 
expired." << std::endl;)
     if(err != boost::asio::error::operation_aborted){
         // Check whether the deadline has passed.
-        DRILL_LOG(LOG_TRACE) << "DrillClientImpl::Heartbeat Timer -  Expires 
at: " 
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::Heartbeat Timer 
-  Expires at: " 
             << to_simple_string(m_heartbeatTimer.expires_at())
             << " and time now is: "
             << 
to_simple_string(boost::asio::deadline_timer::traits_type::now())
-            << std::endl;
+            << std::endl;)
             ;
         if (m_heartbeatTimer.expires_at() <= 
boost::asio::deadline_timer::traits_type::now()){
             // The deadline has passed.
@@ -219,7 +216,7 @@ void DrillClientImpl::handleHeartbeatTimeout(const 
boost::system::error_code & e
                 startHeartbeatTimer();
             }else{
                 // Close connection.
-                DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: No heartbeat. 
Closing connection.";
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: No 
heartbeat. Closing connection.";)
                 shutdownSocket();
             }
         }
@@ -227,7 +224,6 @@ void DrillClientImpl::handleHeartbeatTimeout(const 
boost::system::error_code & e
     return;
 }
 
-
 void DrillClientImpl::Close() {
     shutdownSocket();
 }
@@ -257,8 +253,8 @@ connectionStatus_t DrillClientImpl::recvHandshake(){
                     this,
                     boost::asio::placeholders::error
                     ));
-        DRILL_LOG(LOG_TRACE) << "Started new handshake wait timer with "
-                << DrillClientConfig::getHandshakeTimeout() << " seconds." << 
std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new handshake wait timer 
with "
+                << DrillClientConfig::getHandshakeTimeout() << " seconds." << 
std::endl;)
     }
 
     async_read(
@@ -271,7 +267,7 @@ connectionStatus_t DrillClientImpl::recvHandshake(){
                 boost::asio::placeholders::error,
                 boost::asio::placeholders::bytes_transferred)
             );
-    DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::recvHandshake: async read 
waiting for server handshake response.\n";
+    DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::recvHandshake: 
async read waiting for server handshake response.\n";)
     m_io_service.run();
     if(m_rbuf!=NULL){
         Utils::freeBuffer(m_rbuf, MAX_SOCK_RD_BUFSIZE); m_rbuf=NULL;
@@ -292,7 +288,7 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
     boost::system::error_code error=err;
     // cancel the timer
     m_deadlineTimer.cancel();
-    DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << 
std::endl;)
     if(!error){
         InBoundRpcMessage msg;
         uint32_t length = 0;
@@ -306,14 +302,14 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
                         boost::asio::buffer(b, bytesToRead),
                         error);
                 if(err) break;
-                DRILL_LOG(LOG_TRACE) << "Handshake Message: actual bytes read 
= " << dataBytesRead << std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Handshake Message: 
actual bytes read = " << dataBytesRead << std::endl;)
                 if(dataBytesRead==bytesToRead) break;
                 bytesToRead-=dataBytesRead;
                 b+=dataBytesRead;
             }
             DrillClientImpl::s_decoder.Decode(m_rbuf+bytes_read, length, msg);
         }else{
-            DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: 
ERR_CONN_RDFAIL. No handshake.\n";
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << 
"DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. No handshake.\n";)
             handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "No 
handshake"));
             return;
         }
@@ -344,7 +340,7 @@ void DrillClientImpl::handleHShakeReadTimeout(const 
boost::system::error_code &
         if (m_deadlineTimer.expires_at() <= 
boost::asio::deadline_timer::traits_type::now()){
             // The deadline has passed.
             m_deadlineTimer.expires_at(boost::posix_time::pos_infin);
-            DRILL_LOG(LOG_TRACE) << "DrillClientImpl::HandleHShakeReadTimeout: 
Deadline timer expired; ERR_CONN_HSHAKETIMOUT.\n";
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << 
"DrillClientImpl::HandleHShakeReadTimeout: Deadline timer expired; 
ERR_CONN_HSHAKETIMOUT.\n";)
             handleConnError(CONN_HANDSHAKE_TIMEOUT, 
getMessage(ERR_CONN_HSHAKETIMOUT));
             m_io_service.stop();
             boost::system::error_code ignorederr;
@@ -356,7 +352,7 @@ void DrillClientImpl::handleHShakeReadTimeout(const 
boost::system::error_code &
 
 connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* 
properties){
 
-    DRILL_LOG(LOG_TRACE) << "validateHandShake\n";
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "validateHandShake\n";)
 
     exec::user::UserToBitHandshake u2b;
     u2b.set_channel(exec::shared::USER);
@@ -368,7 +364,7 @@ connectionStatus_t 
DrillClientImpl::validateHandshake(DrillUserProperties* prope
         std::string username;
         std::string err;
         if(!properties->validate(err)){
-            DRILL_LOG(LOG_INFO) << "Invalid user input:" << err << std::endl;
+            DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << "Invalid user input:" << err 
<< std::endl;)
         }
         exec::user::UserProperties* userProperties = u2b.mutable_properties();
 
@@ -376,8 +372,8 @@ connectionStatus_t 
DrillClientImpl::validateHandshake(DrillUserProperties* prope
         for(size_t i=0; i<properties->size(); i++){
             std::map<std::string,uint32_t>::const_iterator 
it=DrillUserProperties::USER_PROPERTIES.find(properties->keyAt(i));
             if(it==DrillUserProperties::USER_PROPERTIES.end()){
-                DRILL_LOG(LOG_WARNING) << "Connection property ("<< 
properties->keyAt(i) 
-                    << ") is unknown and is being skipped" << std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_WARNING) << "Connection property 
("<< properties->keyAt(i) 
+                    << ") is unknown and is being skipped" << std::endl;)
                 continue;
             }
             if(IS_BITSET((*it).second,USERPROP_FLAGS_SERVERPROP)){
@@ -392,9 +388,9 @@ connectionStatus_t 
DrillClientImpl::validateHandshake(DrillUserProperties* prope
                     //u2b.set_credentials(&creds);
                 }
                 if(IS_BITSET((*it).second,USERPROP_FLAGS_PASSWORD)){
-                    DRILL_LOG(LOG_INFO) <<  properties->keyAt(i) << ": 
********** " << std::endl;
+                    DRILL_MT_LOG(DRILL_LOG(LOG_INFO) <<  properties->keyAt(i) 
<< ": ********** " << std::endl;)
                 }else{
-                    DRILL_LOG(LOG_INFO) << properties->keyAt(i) << ":" << 
properties->valueAt(i) << std::endl;
+                    DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << properties->keyAt(i) 
<< ":" << properties->valueAt(i) << std::endl;)
                 }
             }// Server properties
         }
@@ -406,7 +402,7 @@ connectionStatus_t 
DrillClientImpl::validateHandshake(DrillUserProperties* prope
 
         OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::HANDSHAKE, 
coordId, &u2b);
         sendSync(out_msg);
-        DRILL_LOG(LOG_TRACE) << "Sent handshake request message. Coordination 
id: " << coordId << "\n";
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Sent handshake request message. 
Coordination id: " << coordId << "\n";)
     }
 
     connectionStatus_t ret = recvHandshake();
@@ -416,21 +412,21 @@ connectionStatus_t 
DrillClientImpl::validateHandshake(DrillUserProperties* prope
     if(this->m_handshakeStatus != exec::user::SUCCESS){
         switch(this->m_handshakeStatus){
             case exec::user::RPC_VERSION_MISMATCH:
-                DRILL_LOG(LOG_TRACE) << "Invalid rpc version.  Expected "
-                    << DRILL_RPC_VERSION << ", actual "<< m_handshakeVersion 
<< "." << std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Invalid rpc version.  
Expected "
+                    << DRILL_RPC_VERSION << ", actual "<< m_handshakeVersion 
<< "." << std::endl;)
                 return handleConnError(CONN_BAD_RPC_VER,
                         getMessage(ERR_CONN_BAD_RPC_VER, DRILL_RPC_VERSION,
                             m_handshakeVersion,
                             this->m_handshakeErrorId.c_str(),
                             this->m_handshakeErrorMsg.c_str()));
             case exec::user::AUTH_FAILED:
-                DRILL_LOG(LOG_TRACE) << "Authentication failed." << std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Authentication failed." 
<< std::endl;)
                 return handleConnError(CONN_AUTH_FAILED,
                         getMessage(ERR_CONN_AUTHFAIL,
                             this->m_handshakeErrorId.c_str(),
                             this->m_handshakeErrorMsg.c_str()));
             case exec::user::UNKNOWN_FAILURE:
-                DRILL_LOG(LOG_TRACE) << "Unknown error during handshake." << 
std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Unknown error during 
handshake." << std::endl;)
                 return handleConnError(CONN_HANDSHAKE_FAILED,
                         getMessage(ERR_CONN_UNKNOWN_ERR,
                             this->m_handshakeErrorId.c_str(),
@@ -451,14 +447,14 @@ void DrillClientImpl::startMessageListener() {
     if(this->m_pListenerThread==NULL){
         // Stopping the io_service from running out-of-work
         if(m_io_service.stopped()){
-            DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: 
io_service is stopped. Restarting." <<std::endl;
+            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << 
"DrillClientImpl::startMessageListener: io_service is stopped. Restarting." 
<<std::endl;)
             m_io_service.reset();
         }
         this->m_pWork = new boost::asio::io_service::work(m_io_service);
         this->m_pListenerThread = new 
boost::thread(boost::bind(&boost::asio::io_service::run,
                     &this->m_io_service));
-        DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: 
Starting listener thread: "
-            << this->m_pListenerThread << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << 
"DrillClientImpl::startMessageListener: Starting listener thread: "
+            << this->m_pListenerThread << std::endl;)
     }
 }
 
@@ -480,22 +476,23 @@ DrillClientQueryResult* 
DrillClientImpl::SubmitQuery(::exec::shared::QueryType t
         OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::RUN_QUERY, 
coordId, &query);
         sendSync(out_msg);
 
-        pQuery = new DrillClientQueryResult(this, coordId);
+        pQuery = new DrillClientQueryResult(this, coordId, plan);
         pQuery->registerListener(l, lCtx);
         bool sendRequest=false;
         this->m_queryIds[coordId]=pQuery;
 
-        DRILL_LOG(LOG_DEBUG)  << "Sent query request. Coordination id = " << 
coordId << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)  << "Sent query request. " << "[" << 
m_connectedHost << "]"  << "Coordination id = " << coordId << std::endl;)
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)  << "Sent query " <<  "Coordination 
id = " << coordId << " query: " << plan << std::endl;)
 
         if(m_pendingRequests++==0){
             sendRequest=true;
         }else{
-            DRILL_LOG(LOG_DEBUG) << "Queueing query request to server" << 
std::endl;
-            DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << 
m_pendingRequests << std::endl;
+            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queueing query request to 
server" << std::endl;)
+            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Number of pending requests = 
" << m_pendingRequests << std::endl;)
         }
         if(sendRequest){
-            DRILL_LOG(LOG_DEBUG) << "Sending query request. Number of pending 
requests = "
-                << m_pendingRequests << std::endl;
+            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sending query request. 
Number of pending requests = "
+                << m_pendingRequests << std::endl;)
             getNextResult(); // async wait for results
         }
     }
@@ -513,7 +510,7 @@ void DrillClientImpl::getNextResult(){
 
     {
         boost::unique_lock<boost::mutex> 
memLock(AllocatedBuffer::s_memCVMutex);
-        DRILL_LOG(LOG_TRACE) << "Read blocked waiting for memory." << 
std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Read blocked waiting for 
memory." << std::endl;)
         while(AllocatedBuffer::s_isBufferLimitReached){
             AllocatedBuffer::s_memCV.wait(memLock);
         }
@@ -522,8 +519,8 @@ void DrillClientImpl::getNextResult(){
     //use free, not delete to free
     ByteBuf_t readBuf = Utils::allocateBuffer(LEN_PREFIX_BUFLEN);
     if (DrillClientConfig::getQueryTimeout() > 0){
-        DRILL_LOG(LOG_TRACE) << "Started new query wait timer with "
-                << DrillClientConfig::getQueryTimeout() << " seconds." << 
std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new query wait timer 
with "
+                << DrillClientConfig::getQueryTimeout() << " seconds." << 
std::endl;)
         
m_deadlineTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getQueryTimeout()));
         m_deadlineTimer.async_wait(boost::bind(
             &DrillClientImpl::handleReadTimeout,
@@ -544,7 +541,7 @@ void DrillClientImpl::getNextResult(){
                 boost::asio::placeholders::error,
                 boost::asio::placeholders::bytes_transferred)
             );
-    DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::getNextResult: async_read from 
the server\n";
+    DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::getNextResult: 
async_read from the server\n";)
 }
 
 void DrillClientImpl::waitForResults(){
@@ -565,8 +562,8 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
         InBoundRpcMessage& msg,
         boost::system::error_code& error){
 
-    DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Read message from 
buffer "
-        <<  reinterpret_cast<int*>(_buf) << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Read 
message from buffer "
+        <<  reinterpret_cast<int*>(_buf) << std::endl;)
     size_t leftover=0;
     uint32_t rmsgLen;
     AllocatedBufferPtr currentBuffer;
@@ -576,15 +573,15 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
         // but we don't have to keep the lock while we decode the rest of the 
buffer.
         boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
         int bytes_read = DrillClientImpl::s_decoder.LengthDecode(_buf, 
&rmsgLen);
-        DRILL_LOG(LOG_TRACE) << "len bytes = " << bytes_read << std::endl;
-        DRILL_LOG(LOG_TRACE) << "rmsgLen = " << rmsgLen << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "len bytes = " << bytes_read << 
std::endl;)
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "rmsgLen = " << rmsgLen << 
std::endl;)
 
         if(rmsgLen>0){
             leftover = LEN_PREFIX_BUFLEN - bytes_read;
             // Allocate a buffer
             currentBuffer=new AllocatedBuffer(rmsgLen);
-            DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Allocated and 
locked buffer: [ "
-                << currentBuffer << ", size = " << rmsgLen << " ]\n";
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: 
Allocated and locked buffer: [ "
+                << currentBuffer << ", size = " << rmsgLen << " ]\n";)
             if(currentBuffer==NULL){
                 Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
                 return handleQryError(QRY_CLIENT_OUTOFMEM, 
getMessage(ERR_QRY_OUTOFMEM), NULL);
@@ -593,8 +590,8 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
             if(leftover){
                 memcpy(currentBuffer->m_pBuffer, _buf + bytes_read, leftover);
             }
-            DRILL_LOG(LOG_TRACE) << "reading data (rmsgLen - leftover) : "
-                << (rmsgLen - leftover) << std::endl;
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "reading data (rmsgLen - 
leftover) : "
+                << (rmsgLen - leftover) << std::endl;)
             ByteBuf_t b=currentBuffer->m_pBuffer + leftover;
             size_t bytesToRead=rmsgLen - leftover;
               
@@ -603,7 +600,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
                         boost::asio::buffer(b, bytesToRead),
                         error);
                 if(error) break;
-                DRILL_LOG(LOG_TRACE) << "Data Message: actual bytes read = " 
<< dataBytesRead << std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Data Message: actual 
bytes read = " << dataBytesRead << std::endl;)
                 if(dataBytesRead==bytesToRead) break;
                 bytesToRead-=dataBytesRead;
                 b+=dataBytesRead;
@@ -612,7 +609,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
             if(!error){
                 // read data successfully
                 DrillClientImpl::s_decoder.Decode(currentBuffer->m_pBuffer, 
rmsgLen, msg);
-                DRILL_LOG(LOG_TRACE) << "Done decoding chunk. Coordination id: 
" <<msg.m_coord_id<< std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Done decoding chunk. 
Coordination id: " <<msg.m_coord_id<< std::endl;)
             }else{
                 Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
                 return handleQryError(QRY_COMM_ERROR,
@@ -624,8 +621,8 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
             return handleQryError(QRY_INTERNAL_ERROR, 
getMessage(ERR_QRY_INVREADLEN), NULL);
         }
     }
-    DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Free buffer "
-        <<  reinterpret_cast<int*>(_buf) << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Free 
buffer "
+        <<  reinterpret_cast<int*>(_buf) << std::endl;)
     Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
     return QRY_SUCCESS;
 }
@@ -639,9 +636,9 @@ status_t 
DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
         boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
         exec::shared::QueryResult qr;
 
-        DRILL_LOG(LOG_DEBUG) << "Processing Query Result " << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Result " << 
std::endl;)
         qr.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
-        DRILL_LOG(LOG_TRACE) << qr.DebugString() << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr.DebugString() << std::endl;)
         
         qid.CopyFrom(qr.query_id());
         
@@ -657,7 +654,7 @@ status_t 
DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
                 std::string valErr;
                 if( (ret=validateResultMessage(msg, qr, valErr)) != 
QRY_SUCCESS){
                     delete allocatedBuffer;
-                    DRILL_LOG(LOG_TRACE) << 
"DrillClientImpl::processQueryResult: ERR_QRY_INVRPC." << std::endl;
+                    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << 
"DrillClientImpl::processQueryResult: ERR_QRY_INVRPC." << std::endl;)
                     return handleQryError(ret, getMessage(ERR_QRY_INVRPC, 
valErr.c_str()), pDrillClientQueryResult);
                 }
                 ret=processQueryStatusResult(&qr, pDrillClientQueryResult);
@@ -665,9 +662,9 @@ status_t 
DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
                 // We've received the final message for a query that has been 
cancelled
                 // or for which the resources have been freed. We no longer 
need to listen
                 // for more incoming messages for such a query.
-                DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult:" 
<< debugPrintQid(qid)<< " completed."<< std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << 
"DrillClientImpl::processQueryResult:" << debugPrintQid(qid)<< " completed."<< 
std::endl;)
                 m_pendingRequests--;
-                DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: 
pending requests is " << m_pendingRequests<< std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << 
"DrillClientImpl::processQueryResult: pending requests is " << 
m_pendingRequests<< std::endl;)
                 ret=QRY_CANCELED;
             }
             delete allocatedBuffer;
@@ -676,10 +673,10 @@ status_t 
DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
             // Normal query results come back with query_state not set.
             // Actually this is not strictly true. The query state is set to
             // 0(i.e. PENDING), but protobuf thinks this means the value is 
not set.
-            DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: 
Query State was not set.\n";
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << 
"DrillClientImpl::processQueryResult: Query State was not set.\n";)
         }
     }
-    DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: " << 
m_pendingRequests << " requests pending." << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: 
" << m_pendingRequests << " requests pending." << std::endl;)
     if(m_pendingRequests==0){
         // signal any waiting client that it can exit because there are no 
more any query results to arrive.
         // We keep the heartbeat going though.
@@ -701,21 +698,21 @@ status_t 
DrillClientImpl::processQueryData(AllocatedBufferPtr  allocatedBuffer,
         boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
         exec::shared::QueryData* qr = new exec::shared::QueryData; //Record 
Batch will own this object and free it up.
 
-        DRILL_LOG(LOG_DEBUG) << "Processing Query Data " << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Data " << 
std::endl;)
         qr->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
-        DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;)
 
         qid.CopyFrom(qr->query_id());
         if(qid.part1()==0){
-            DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: QID=0. 
Ignore and return QRY_SUCCESS." << std::endl;
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << 
"DrillClientImpl::processQueryData: QID=0. Ignore and return QRY_SUCCESS." << 
std::endl;)
             delete allocatedBuffer;
             return QRY_SUCCESS;
         }
 
         pDrillClientQueryResult=findQueryResult(qid);
         if(pDrillClientQueryResult==NULL){
-            DRILL_LOG(LOG_TRACE) << "Cleaning up resources allocated for 
canceled query (" 
-                                 << debugPrintQid(qid) << ")." << std::endl;
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Cleaning up resources 
allocated for canceled query (" 
+                                 << debugPrintQid(qid) << ")." << std::endl;)
             delete qr;
             delete allocatedBuffer;
             return ret;
@@ -726,23 +723,23 @@ status_t 
DrillClientImpl::processQueryData(AllocatedBufferPtr  allocatedBuffer,
         if( (ret=validateDataMessage(msg, *qr, valErr)) != QRY_SUCCESS){
             delete allocatedBuffer;
             delete qr;
-            DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: 
ERR_QRY_INVRPC.\n";
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << 
"DrillClientImpl::processQueryData: ERR_QRY_INVRPC.\n";)
             pDrillClientQueryResult->setQueryStatus(ret);
             return handleQryError(ret, getMessage(ERR_QRY_INVRPC, 
valErr.c_str()), pDrillClientQueryResult);
         }
 
         //Build Record Batch here
-        DRILL_LOG(LOG_DEBUG) << "Building record batch for Query Id - " << 
debugPrintQid(qr->query_id()) << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Building record batch for Query 
Id - " << debugPrintQid(qr->query_id()) << std::endl;)
 
         pRecordBatch= new RecordBatch(qr, allocatedBuffer,  msg.m_dbody);
         pDrillClientQueryResult->m_numBatches++;
 
-        DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << 
(void*)pRecordBatch << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << 
(void*)pRecordBatch << std::endl;)
         pRecordBatch->build();
-        DRILL_LOG(LOG_DEBUG) << 
debugPrintQid(qr->query_id())<<"recordBatch.numRecords "
-            << pRecordBatch->getNumRecords()  << std::endl;
-        DRILL_LOG(LOG_DEBUG) << 
debugPrintQid(qr->query_id())<<"recordBatch.numFields "
-            << pRecordBatch->getNumFields()  << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << 
debugPrintQid(qr->query_id())<<"recordBatch.numRecords "
+            << pRecordBatch->getNumRecords()  << std::endl;)
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << 
debugPrintQid(qr->query_id())<<"recordBatch.numFields "
+            << pRecordBatch->getNumFields()  << std::endl;)
 
         ret=pDrillClientQueryResult->setupColumnDefs(qr);
         if(ret==QRY_SUCCESS_WITH_INFO){
@@ -752,8 +749,8 @@ status_t 
DrillClientImpl::processQueryData(AllocatedBufferPtr  allocatedBuffer,
         pDrillClientQueryResult->setIsQueryPending(true);
         pfnQueryResultsListener 
pResultsListener=pDrillClientQueryResult->m_pResultsListener;
         if(pDrillClientQueryResult->m_bIsLastChunk){
-            DRILL_LOG(LOG_DEBUG) << 
debugPrintQid(*pDrillClientQueryResult->m_pQueryId)
-                <<  "Received last batch. " << std::endl;
+            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << 
debugPrintQid(*pDrillClientQueryResult->m_pQueryId)
+                <<  "Received last batch. " << std::endl;)
             ret=QRY_NO_MORE_DATA;
         }
         pDrillClientQueryResult->setQueryStatus(ret);
@@ -770,7 +767,7 @@ status_t 
DrillClientImpl::processQueryData(AllocatedBufferPtr  allocatedBuffer,
         // Do not decrement pending requests here. We have sent a cancel and 
we may still receive results that are
         // pushed on the wire before the cancel is processed.
         pDrillClientQueryResult->setIsQueryPending(false);
-        DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << 
std::endl;)
         pDrillClientQueryResult->setQueryStatus(ret);
         clearMapEntries(pDrillClientQueryResult);
         return ret;
@@ -780,27 +777,27 @@ status_t 
DrillClientImpl::processQueryData(AllocatedBufferPtr  allocatedBuffer,
 
 status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, 
InBoundRpcMessage& msg ){
     DrillClientQueryResult* pDrillClientQueryResult=NULL;
-    DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << 
msg.m_coord_id << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with 
coordination id:" << msg.m_coord_id << std::endl;)
     status_t ret=QRY_SUCCESS;
 
     boost::lock_guard<boost::mutex> lock(m_dcMutex);
     std::map<int,DrillClientQueryResult*>::iterator it;
     for(it=this->m_queryIds.begin();it!=this->m_queryIds.end();it++){
         std::string qidString = 
it->second->m_pQueryId!=NULL?debugPrintQid(*it->second->m_pQueryId):std::string("NULL");
-        DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: 
coordinationId: " << it->first
-        << " QueryId: "<< qidString << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: 
m_queryIds: coordinationId: " << it->first
+        << " QueryId: "<< qidString << std::endl;)
     }
     if(msg.m_coord_id==0){
-        DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: 
m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: 
m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
         return QRY_SUCCESS;
     }
     it=this->m_queryIds.find(msg.m_coord_id);
     if(it!=this->m_queryIds.end()){
         pDrillClientQueryResult=(*it).second;
         exec::shared::QueryId *qid = new exec::shared::QueryId;
-        DRILL_LOG(LOG_TRACE)  << "Received Query Handle " << 
msg.m_pbody.size() << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE)  << "Received Query Handle " << 
msg.m_pbody.size() << std::endl;)
         qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
-        DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(*qid) << 
std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Query Id - " << 
debugPrintQid(*qid) << std::endl;)
         m_queryResults[qid]=pDrillClientQueryResult;
         //save queryId allocated here so we can free it later
         pDrillClientQueryResult->setQueryId(qid);
@@ -814,20 +811,20 @@ status_t 
DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InB
 
 DrillClientQueryResult* 
DrillClientImpl::findQueryResult(exec::shared::QueryId& qid){
     DrillClientQueryResult* pDrillClientQueryResult=NULL;
-    DRILL_LOG(LOG_DEBUG) << "Searching for Query Id - " << debugPrintQid(qid) 
<< std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Searching for Query Id - " << 
debugPrintQid(qid) << std::endl;)
     std::map<exec::shared::QueryId*, DrillClientQueryResult*, 
compareQueryId>::iterator it;
-    DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: 
m_queryResults size: " << m_queryResults.size() << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: 
m_queryResults size: " << m_queryResults.size() << std::endl;)
     if(m_queryResults.size() != 0){
         for(it=m_queryResults.begin(); it!=m_queryResults.end(); it++){
-            DRILL_LOG(LOG_TRACE) << "DrillClientImpl::findQueryResult: 
m_QueryResult ids: [" << it->first->part1() << ":"
-                << it->first->part2() << "]\n";
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << 
"DrillClientImpl::findQueryResult: m_QueryResult ids: [" << it->first->part1() 
<< ":"
+                << it->first->part2() << "]\n";)
         }
     }
     it=this->m_queryResults.find(&qid);
     if(it!=this->m_queryResults.end()){
         pDrillClientQueryResult=(*it).second;
-        DRILL_LOG(LOG_DEBUG) << "Drill Client Query Result Query Id - " <<
-            debugPrintQid(*pDrillClientQueryResult->m_pQueryId) << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Drill Client Query Result Query 
Id - " <<
+            debugPrintQid(*pDrillClientQueryResult->m_pQueryId) << std::endl;)
     }
     return pDrillClientQueryResult;
 }
@@ -870,7 +867,7 @@ status_t 
DrillClientImpl::processQueryStatusResult(exec::shared::QueryResult* qr
             break;
         default:
             {
-                DRILL_LOG(LOG_TRACE) << 
"DrillClientImpl::processQueryStatusResult: Unknown Query State.\n";
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << 
"DrillClientImpl::processQueryStatusResult: Unknown Query State.\n";)
                 ret=handleQryError(QRY_INTERNAL_ERROR,
                         getMessage(ERR_QRY_UNKQRYSTATE),
                         pDrillClientQueryResult);
@@ -887,7 +884,7 @@ void DrillClientImpl::handleReadTimeout(const 
boost::system::error_code & err){
         // Check whether the deadline has passed.
         if (m_deadlineTimer.expires_at() <= 
boost::asio::deadline_timer::traits_type::now()){
             // The deadline has passed.
-            DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleReadTimeout: 
Deadline timer expired; ERR_QRY_TIMOUT. \n";
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << 
"DrillClientImpl::handleReadTimeout: Deadline timer expired; ERR_QRY_TIMOUT. 
\n";)
             handleQryError(QRY_TIMEOUT, getMessage(ERR_QRY_TIMOUT), NULL);
             // There is no longer an active deadline. The expiry is set to 
positive
             // infinity so that the timer never expires until a new deadline 
is set.
@@ -913,18 +910,18 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
         const boost::system::error_code& err,
         size_t bytes_transferred) {
     boost::system::error_code error=err;
-    DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handle Read from 
buffer "
-        <<  reinterpret_cast<int*>(_buf) << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handle 
Read from buffer "
+        <<  reinterpret_cast<int*>(_buf) << std::endl;)
     if(DrillClientConfig::getQueryTimeout() > 0){
         // Cancel the timeout if handleRead is called
-        DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Cancel deadline 
timer.\n";
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: 
Cancel deadline timer.\n";)
         m_deadlineTimer.cancel();
     }
     if(!error){
         InBoundRpcMessage msg;
         boost::lock_guard<boost::mutex> lock(this->m_prMutex);
 
-        DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Getting new message" << 
std::endl;)
         AllocatedBufferPtr allocatedBuffer=NULL;
 
         if(readMsg(_buf, &allocatedBuffer, msg, error)!=QRY_SUCCESS){
@@ -938,14 +935,14 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
         if(!error && msg.m_mode==exec::rpc::PONG){ //heartbeat response. Throw 
it away
             m_pendingRequests--;
             delete allocatedBuffer;
-            DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. " <<  
std::endl;
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received heartbeat from 
server. " <<  std::endl;)
             if(m_pendingRequests!=0){
                 boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
                 getNextResult();
             }else{
-                               boost::unique_lock<boost::mutex> 
cvLock(this->m_dcMutex);
-                DRILL_LOG(LOG_TRACE) << "No more results expected from server. 
" <<  std::endl;
-                               m_cv.notify_one();
+                boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex);
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more results expected 
from server. " <<  std::endl;)
+                m_cv.notify_one();
             }
             return;
         }else if(!error && msg.m_rpc_type==exec::user::QUERY_RESULT){
@@ -988,7 +985,7 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
                 // We have a socket read error, but we do not know which query 
this is for.
                 // Signal ALL pending queries that they should stop waiting.
                 delete allocatedBuffer;
-                DRILL_LOG(LOG_TRACE) << "read error: " << error << std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "read error: " << error 
<< std::endl;)
                 handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, 
error.message().c_str()), NULL);
                 return;
             }else{
@@ -997,20 +994,20 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
                 // We should properly handle these handshake requests/responses
                 if(msg.has_rpc_type() && 
msg.m_rpc_type==exec::user::HANDSHAKE){
                     if(msg.has_mode() && msg.m_mode==exec::rpc::REQUEST){
-                        DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: 
Handshake request from server. Send response.\n";
+                        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << 
"DrillClientImpl::handleRead: Handshake request from server. Send response.\n";)
                         exec::user::UserToBitHandshake u2b;
                         u2b.set_channel(exec::shared::USER);
                         u2b.set_rpc_version(DRILL_RPC_VERSION);
                         u2b.set_support_listening(true);
                         OutBoundRpcMessage out_msg(exec::rpc::RESPONSE, 
exec::user::HANDSHAKE, msg.m_coord_id, &u2b);
                         sendSync(out_msg);
-                        DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: 
Handshake response sent.\n";
+                        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << 
"DrillClientImpl::handleRead: Handshake response sent.\n";)
                     }else{
-                        DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: 
Handshake response from server. Ignore.\n";
+                        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << 
"DrillClientImpl::handleRead: Handshake response from server. Ignore.\n";)
                     }
                 }else{
-                    DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: 
ERR_QRY_INVRPCTYPE. "
-                        << "QueryResult returned " << msg.m_rpc_type << 
std::endl;
+                    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << 
"DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
+                        << "QueryResult returned " << msg.m_rpc_type << 
std::endl;)
                     handleQryError(QRY_INTERNAL_ERROR, 
getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
                 }
                 delete allocatedBuffer;
@@ -1025,8 +1022,8 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
         // boost error
         Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
         boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
-        DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. 
"
-            "Boost Communication Error: " << error.message() << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: 
ERR_QRY_COMMERR. "
+            "Boost Communication Error: " << error.message() << std::endl;)
         handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, 
error.message().c_str()), NULL);
         return;
     }
@@ -1066,6 +1063,7 @@ connectionStatus_t 
DrillClientImpl::handleConnError(connectionStatus_t status, s
     }else{
         if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
         m_pError=pErr;
+        shutdownSocket();
     }
     return status;
 }
@@ -1158,7 +1156,7 @@ void DrillClientImpl::sendAck(InBoundRpcMessage& msg, 
bool isOk){
     OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::ACK, 
msg.m_coord_id, &ack);
     boost::lock_guard<boost::mutex> lock(m_dcMutex);
     sendSync(ack_msg);
-    DRILL_LOG(LOG_TRACE) << "ACK sent" << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "ACK sent" << std::endl;)
 }
 
 void DrillClientImpl::sendCancel(exec::shared::QueryId* pQueryId){
@@ -1166,7 +1164,7 @@ void DrillClientImpl::sendCancel(exec::shared::QueryId* 
pQueryId){
     uint64_t coordId = this->getNextCoordinationId();
     OutBoundRpcMessage cancel_msg(exec::rpc::REQUEST, 
exec::user::CANCEL_QUERY, coordId, pQueryId);
     sendSync(cancel_msg);
-    DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl;)
 }
 
 void DrillClientImpl::shutdownSocket(){
@@ -1174,7 +1172,7 @@ void DrillClientImpl::shutdownSocket(){
     boost::system::error_code ignorederr;
     m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr);
     m_bIsConnected=false;
-    DRILL_LOG(LOG_TRACE) << "Socket shutdown" << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Socket shutdown" << std::endl;)
 }
 
 // This COPIES the FieldMetadata definition for the record batch.  ColumnDefs 
held by this
@@ -1236,7 +1234,7 @@ status_t 
DrillClientQueryResult::defaultQueryResultsListener(void* ctx,
         RecordBatch* b,
         DrillClientError* err) {
     //ctx; // unused, we already have the this pointer
-    DRILL_LOG(LOG_TRACE) << "Query result listener called" << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Query result listener called" << 
std::endl;)
     //check if the query has been canceled. IF so then return FAILURE. Caller 
will send cancel to the server.
     if(this->m_bCancel){
         if(b!=NULL) delete b;
@@ -1247,8 +1245,8 @@ status_t 
DrillClientQueryResult::defaultQueryResultsListener(void* ctx,
         {
             if(b!=NULL){
 #ifdef DEBUG
-                
DRILL_LOG(LOG_DEBUG)<<debugPrintQid(b->getQueryResult()->query_id())
-                    << "Query result listener saved result to queue." << 
std::endl;
+                
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)<<debugPrintQid(b->getQueryResult()->query_id())
+                    << "Query result listener saved result to queue." << 
std::endl;)
 #endif
                 boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex);
                 this->m_recordBatches.push(b);
@@ -1267,7 +1265,7 @@ RecordBatch*  DrillClientQueryResult::peekNext(){
     boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex);
     //if no more data, return NULL;
     if(!m_bIsQueryPending) return NULL;
-    DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." 
<< std::endl;)
     while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending) {
         this->m_cv.wait(cvLock);
     }
@@ -1281,14 +1279,14 @@ RecordBatch*  DrillClientQueryResult::getNext() {
     boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex);
     //if no more data, return NULL;
     if(!m_bIsQueryPending){
-        DRILL_LOG(LOG_TRACE) << "Query is done." << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Query is done." << std::endl;)
         if(!m_recordBatches.empty()){
-            DRILL_LOG(LOG_TRACE) << " But there is a Record batch left 
behind." << std::endl;
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << " But there is a Record batch 
left behind." << std::endl;)
         }
         return NULL;
     }
 
-    DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." 
<< std::endl;)
     while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending){
         this->m_cv.wait(cvLock);
     }
@@ -1367,7 +1365,7 @@ void DrillClientQueryResult::clearAndDestroy(){
         m_columnDefs->clear();
     }
     if(this->m_pQueryId!=NULL){
-        DRILL_LOG(LOG_TRACE) << "Clearing state for Query Id - " << 
debugPrintQid(*this->m_pQueryId) << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Clearing state for Query Id - " 
<< debugPrintQid(*this->m_pQueryId) << std::endl;)
     }
     //Tell the parent to remove this from its lists
     m_pClient->clearMapEntries(this);
@@ -1379,7 +1377,7 @@ void DrillClientQueryResult::clearAndDestroy(){
     if(!m_recordBatches.empty()){
         // When multiple qwueries execute in parallel we sometimes get an 
empty record batch back from the server _after_
         // the last chunk has been received. We eventually delete it.
-        DRILL_LOG(LOG_TRACE) << "Freeing Record batch(es) left behind "<< 
std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Freeing Record batch(es) left 
behind "<< std::endl;)
         RecordBatch* pR=NULL;
         while(!m_recordBatches.empty()){
             pR=m_recordBatches.front();
@@ -1392,6 +1390,210 @@ void DrillClientQueryResult::clearAndDestroy(){
     }
 }
 
+
+connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){
+    connectionStatus_t stat = CONN_SUCCESS;
+    std::string pathToDrill, protocol, hostPortStr;
+    std::string host;
+    std::string port;
+    m_connectStr=connStr;
+    Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
+    if(!strcmp(protocol.c_str(), "zk")){
+        // Get a list of drillbits
+        ZookeeperImpl zook;
+        std::vector<std::string> drillbits;
+        int err = zook.getAllDrillbits(hostPortStr.c_str(), 
pathToDrill.c_str(), drillbits);
+        if(!err){
+            Utils::shuffle(drillbits);
+            // The original shuffled order is maintained if we shuffle first 
and then add any missing elements
+            Utils::add(m_drillbits, drillbits);
+            exec::DrillbitEndpoint e;
+            size_t nextIndex=0;
+            {
+                boost::lock_guard<boost::mutex> cLock(m_cMutex);
+                m_lastConnection++;
+                nextIndex = (m_lastConnection)%(getDrillbitCount());
+            }
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Pooled Connection"
+                    << "(" << (void*)this << ")"
+                    << ": Current counter is: " 
+                    << m_lastConnection << std::endl;)
+                err=zook.getEndPoint(m_drillbits, nextIndex, e);
+            if(!err){
+                host=boost::lexical_cast<std::string>(e.address());
+                port=boost::lexical_cast<std::string>(e.user_port());
+            }
+        }
+        if(err){
+            return handleConnError(CONN_ZOOKEEPER_ERROR, 
getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
+        }
+        zook.close();
+        m_bIsDirectConnection=false;
+    }else if(!strcmp(protocol.c_str(), "local")){
+        char tempStr[MAX_CONNECT_STR+1];
+        strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); 
tempStr[MAX_CONNECT_STR]=0;
+        host=strtok(tempStr, ":");
+        port=strtok(NULL, "");
+        m_bIsDirectConnection=true;
+    }else{
+        return handleConnError(CONN_INVALID_INPUT, 
getMessage(ERR_CONN_UNKPROTO, protocol.c_str()));
+    }
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: (Pooled) " 
<< host << ":" << port << std::endl;)
+        DrillClientImpl* pDrillClientImpl = new DrillClientImpl();
+    stat =  pDrillClientImpl->connect(host.c_str(), port.c_str());
+    if(stat == CONN_SUCCESS){
+        boost::lock_guard<boost::mutex> lock(m_poolMutex);
+        m_clientConnections.push_back(pDrillClientImpl);
+    }else{
+        DrillClientError* pErr = pDrillClientImpl->getError();
+        handleConnError((connectionStatus_t)pErr->status, pErr->msg);
+        delete pDrillClientImpl;
+    }
+    return stat;
+}
+
+connectionStatus_t 
PooledDrillClientImpl::validateHandshake(DrillUserProperties* props){
+    // Assume there is one valid connection to at least one drillbit
+    connectionStatus_t stat=CONN_FAILURE;
+    // Keep a copy of the user properties
+    if(props!=NULL){
+        m_pUserProperties = new DrillUserProperties;
+        for(size_t i=0; i<props->size(); i++){
+            m_pUserProperties->setProperty(
+                    props->keyAt(i),
+                    props->valueAt(i)
+                    );
+        }
+    }
+    DrillClientImpl* pDrillClientImpl = getOneConnection();
+    if(pDrillClientImpl != NULL){
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Validating handshake: (Pooled) " 
<< pDrillClientImpl->m_connectedHost << std::endl;)
+        stat=pDrillClientImpl->validateHandshake(m_pUserProperties);
+    }
+    else{
+        stat =  handleConnError(CONN_NOTCONNECTED, 
getMessage(ERR_CONN_NOCONN));
+    }
+    return stat;
+}
+
+DrillClientQueryResult* 
PooledDrillClientImpl::SubmitQuery(::exec::shared::QueryType t, const 
std::string& plan, pfnQueryResultsListener listener, void* listenerCtx){
+    DrillClientQueryResult* pDrillClientQueryResult = NULL;
+    DrillClientImpl* pDrillClientImpl = NULL;
+    pDrillClientImpl = getOneConnection();
+    if(pDrillClientImpl != NULL){
+        
pDrillClientQueryResult=pDrillClientImpl->SubmitQuery(t,plan,listener,listenerCtx);
+        m_queriesExecuted++;
+    }
+    return pDrillClientQueryResult;
+}
+
+void PooledDrillClientImpl::freeQueryResources(DrillClientQueryResult* 
pQryResult){
+    // Nothing to do. If this class ever keeps track of executing queries then 
it will need 
+    // to implement this call to free any query specific resources the pool 
might have 
+    // allocated
+    return;
+}
+
+bool PooledDrillClientImpl::Active(){
+    boost::lock_guard<boost::mutex> lock(m_poolMutex);
+    for(std::vector<DrillClientImpl*>::iterator it = 
m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
+        if((*it)->Active()){
+            return true;
+        }
+    }
+    return false;
+}
+
+void PooledDrillClientImpl::Close() {
+    boost::lock_guard<boost::mutex> lock(m_poolMutex);
+    for(std::vector<DrillClientImpl*>::iterator it = 
m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
+        (*it)->Close();
+        delete *it;
+    }
+    m_clientConnections.clear();
+    if(m_pUserProperties!=NULL){ delete m_pUserProperties; 
m_pUserProperties=NULL;}
+    if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
+    m_lastConnection=-1;
+    m_queriesExecuted=0;
+}
+
+DrillClientError* PooledDrillClientImpl::getError(){
+    std::string errMsg;
+    std::string nl="";
+    uint32_t stat;
+    boost::lock_guard<boost::mutex> lock(m_poolMutex);
+    for(std::vector<DrillClientImpl*>::iterator it = 
m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
+        if((*it)->getError() != NULL){
+            errMsg+=nl+"Query"/*+(*it)->queryId() 
+*/":"+(*it)->getError()->msg;
+            stat=(*it)->getError()->status;
+        }
+    }
+    if(errMsg.length()>0){
+        if(m_pError!=NULL){ delete m_pError; m_pError=NULL; }
+        m_pError = new DrillClientError(stat, 
DrillClientError::QRY_ERROR_START+stat, errMsg);
+    }
+    return m_pError;
+}
+
+//Waits as long as any one drillbit connection has results pending
+void PooledDrillClientImpl::waitForResults(){
+    boost::lock_guard<boost::mutex> lock(m_poolMutex);
+    for(std::vector<DrillClientImpl*>::iterator it = 
m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
+        (*it)->waitForResults();
+    }
+    return;
+}
+
+connectionStatus_t PooledDrillClientImpl::handleConnError(connectionStatus_t 
status, std::string msg){
+    DrillClientError* pErr = new DrillClientError(status, 
DrillClientError::CONN_ERROR_START+status, msg);
+    DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Connection Error: (Pooled) " << 
pErr->msg << std::endl;)
+    if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
+    m_pError=pErr;
+    return status;
+}
+
+DrillClientImpl* PooledDrillClientImpl::getOneConnection(){
+    DrillClientImpl* pDrillClientImpl = NULL;
+    while(pDrillClientImpl==NULL){
+        if(m_queriesExecuted == 0){
+            // First query ever sent can use the connection already 
established to authenticate the user
+            boost::lock_guard<boost::mutex> lock(m_poolMutex);
+            pDrillClientImpl=m_clientConnections[0];// There should be one 
connection in the list when the first query is executed
+        }else if(m_clientConnections.size() == m_maxConcurrentConnections){
+            // Pool is full. Use one of the already established connections
+            boost::lock_guard<boost::mutex> lock(m_poolMutex);
+            pDrillClientImpl = 
m_clientConnections[m_queriesExecuted%m_maxConcurrentConnections];
+            if(!pDrillClientImpl->Active()){
+                Utils::eraseRemove(m_clientConnections, pDrillClientImpl);
+                pDrillClientImpl=NULL;
+            }
+        }else{
+            int tries=0;
+            connectionStatus_t ret=CONN_SUCCESS;
+            while(pDrillClientImpl==NULL && tries++ < 3){
+                if((ret=connect(m_connectStr.c_str()))==CONN_SUCCESS){
+                    boost::lock_guard<boost::mutex> lock(m_poolMutex);
+                    pDrillClientImpl=m_clientConnections.back();
+                    ret=pDrillClientImpl->validateHandshake(m_pUserProperties);
+                    if(ret!=CONN_SUCCESS){
+                        delete pDrillClientImpl; pDrillClientImpl=NULL;
+                        m_clientConnections.erase(m_clientConnections.end());
+                    }
+                }
+            } // try a few times
+            if(ret!=CONN_SUCCESS){
+                break;
+            }
+        } // need a new connection 
+    }// while
+
+    if(pDrillClientImpl==NULL){
+        connectionStatus_t status = CONN_NOTCONNECTED;
+        handleConnError(status, getMessage(status));
+    }
+    return pDrillClientImpl;
+}
+
 char ZookeeperImpl::s_drillRoot[]="/drill/";
 char ZookeeperImpl::s_defaultCluster[]="drillbits1";
 
@@ -1427,6 +1629,96 @@ ZooLogLevel ZookeeperImpl::getZkLogLevel(){
     return ZOO_LOG_LEVEL_ERROR;
 }
 
+int ZookeeperImpl::getAllDrillbits(const char* connectStr, const char* 
pathToDrill, std::vector<std::string>& drillbits){
+    uint32_t waitTime=30000; // 10 seconds
+    zoo_set_debug_level(getZkLogLevel());
+    zoo_deterministic_conn_order(1); // enable deterministic order
+    struct String_vector* pDrillbits=NULL;
+    m_zh = zookeeper_init(connectStr, watcher, waitTime, 0, this, 0);
+    if(!m_zh) {
+        m_err = getMessage(ERR_CONN_ZKFAIL);
+        zookeeper_close(m_zh);
+        return -1;
+    }else{
+        m_err="";
+        //Wait for the completion handler to signal successful connection
+        boost::unique_lock<boost::mutex> bufferLock(this->m_cvMutex);
+        boost::system_time const timeout=boost::get_system_time()+ 
boost::posix_time::milliseconds(waitTime);
+        while(this->m_bConnecting) {
+            if(!this->m_cv.timed_wait(bufferLock, timeout)){
+                m_err = getMessage(ERR_CONN_ZKTIMOUT);
+                zookeeper_close(m_zh);
+                return -1;
+            }
+        }
+    }
+    if(m_state!=ZOO_CONNECTED_STATE){
+        zookeeper_close(m_zh);
+        return -1;
+    }
+    int rc = ZOK;
+    if(pathToDrill==NULL || strlen(pathToDrill)==0){
+        m_rootDir=s_drillRoot;
+        m_rootDir += s_defaultCluster;
+    }else{
+        m_rootDir=pathToDrill;
+    }
+
+    pDrillbits = new String_vector;
+    rc=zoo_get_children(m_zh, m_rootDir.c_str(), 0, pDrillbits);
+    if(rc!=ZOK){
+        delete pDrillbits;
+        m_err=getMessage(ERR_CONN_ZKERR, rc);
+        zookeeper_close(m_zh);
+        return -1;
+    }
+    if(pDrillbits && pDrillbits->count > 0){
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Found " << pDrillbits->count << 
" drillbits in cluster (" 
+                << connectStr << "/" << pathToDrill
+                << ")." <<std::endl;)
+            for(int i=0; i<pDrillbits->count; i++){
+                drillbits.push_back(pDrillbits->data[i]);
+            }
+        for(int i=0; i<drillbits.size(); i++){
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "\t Unshuffled Drillbit id: " 
<< drillbits[i] << std::endl;)
+        }
+    }
+    delete pDrillbits;
+    return 0;
+}
+
+int ZookeeperImpl::getEndPoint(std::vector<std::string>& drillbits, size_t 
index, exec::DrillbitEndpoint& endpoint){
+    int rc = ZOK;
+    exec::DrillServiceInstance drillServiceInstance;
+    if( drillbits.size() >0){
+        // pick the drillbit at 'index'
+        const char * bit=drillbits[index].c_str();
+        std::string s;
+        s=m_rootDir +  std::string("/") + bit;
+        int buffer_len=MAX_CONNECT_STR;
+        char buffer[MAX_CONNECT_STR+1];
+        struct Stat stat;
+        buffer[MAX_CONNECT_STR]=0;
+        rc= zoo_get(m_zh, s.c_str(), 0, buffer,  &buffer_len, &stat);
+        if(rc!=ZOK){
+            m_err=getMessage(ERR_CONN_ZKDBITERR, rc);
+            zookeeper_close(m_zh);
+            return -1;
+        }
+        exec::DrillServiceInstance drillServiceInstance;
+        drillServiceInstance.ParseFromArray(buffer, buffer_len);
+        endpoint=drillServiceInstance.endpoint();
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" <<index << 
">. Selected " << drillServiceInstance.DebugString() << std::endl;)
+    }else{
+
+        m_err=getMessage(ERR_CONN_ZKNODBIT);
+        zookeeper_close(m_zh);
+        return -1;
+    }
+    return 0;
+}
+
+// Deprecated
 int ZookeeperImpl::connectToZookeeper(const char* connectStr, const char* 
pathToDrill){
     uint32_t waitTime=30000; // 10 seconds
     zoo_set_debug_level(getZkLogLevel());
@@ -1525,7 +1817,7 @@ void ZookeeperImpl::watcher(zhandle_t *zzh, int type, int 
state, const char *pat
     // signal the cond var
     {
         if (state == ZOO_CONNECTED_STATE){
-            DRILL_LOG(LOG_TRACE) << "Connected to Zookeeper." << std::endl;
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connected to Zookeeper." << 
std::endl;)
         }
         boost::lock_guard<boost::mutex> bufferLock(self->m_cvMutex);
         self->m_bConnecting=false;
@@ -1535,7 +1827,7 @@ void ZookeeperImpl::watcher(zhandle_t *zzh, int type, int 
state, const char *pat
 
 void ZookeeperImpl:: debugPrint(){
     if(m_zh!=NULL && m_state==ZOO_CONNECTED_STATE){
-        DRILL_LOG(LOG_TRACE) << m_drillServiceInstance.DebugString() << 
std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << 
m_drillServiceInstance.DebugString() << std::endl;)
     }
 }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/df0f0af3/contrib/native/client/src/clientlib/drillClientImpl.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp 
b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index f19a015..06f37e0 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -34,13 +34,18 @@
 #include <queue>
 #include <vector>
 #include <boost/asio.hpp>
-#include <boost/asio/deadline_timer.hpp>
-#include <boost/thread.hpp>
-#ifdef _WIN32
+
+#if defined _WIN32  || defined _WIN64
 #include <zookeeper.h>
+//Windows header files redefine 'random'
+#ifdef random
+#undef random
+#endif
 #else
 #include <zookeeper/zookeeper.h>
 #endif
+#include <boost/asio/deadline_timer.hpp>
+#include <boost/thread.hpp>
 
 #include "drill/drillClient.hpp"
 #include "rpcEncoder.hpp"
@@ -58,12 +63,50 @@ class RecordBatch;
 class RpcEncoder;
 class RpcDecoder;
 
+/*
+ * Defines the interface used by DrillClient and implemented by 
DrillClientImpl and PooledDrillClientImpl
+ * */
+class DrillClientImplBase{
+    public:
+        DrillClientImplBase(){
+        }
+
+        virtual ~DrillClientImplBase(){
+        }
+
+        //Connect via Zookeeper or directly.
+        //Makes an initial connection to a drillbit. successful connect adds 
the first drillbit to the pool.
+        virtual connectionStatus_t connect(const char* connStr)=0;
+
+        // Test whether the client is active. Returns true if any one of the 
underlying connections is active
+        virtual bool Active()=0;
+
+        // Closes all open connections. 
+        virtual void Close()=0;
+
+        // Returns the last error encountered by any of the underlying 
executing queries or connections
+        virtual DrillClientError* getError()=0;
+
+        // Submits a query to a drillbit. 
+        virtual DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType 
t, const std::string& plan, pfnQueryResultsListener listener, void* 
listenerCtx)=0;
+
+        //Waits as a connection has results pending
+        virtual void waitForResults()=0;
+
+        //Validates handshake at connect time.
+        virtual connectionStatus_t validateHandshake(DrillUserProperties* 
props)=0;
+
+        virtual void freeQueryResources(DrillClientQueryResult* pQryResult)=0;
+
+};
+
 class DrillClientQueryResult{
     friend class DrillClientImpl;
     public:
-    DrillClientQueryResult(DrillClientImpl * pClient, uint64_t coordId):
+    DrillClientQueryResult(DrillClientImpl * pClient, uint64_t coordId, const 
std::string& query):
         m_pClient(pClient),
         m_coordinationId(coordId),
+        m_query(query),
         m_numBatches(0),
         m_columnDefs(new std::vector<Drill::FieldMetadata*>),
         m_bIsQueryPending(true),
@@ -116,6 +159,7 @@ class DrillClientQueryResult{
     bool isCancelled(){return this->m_bCancel;};
     bool hasSchemaChanged(){return this->m_bHasSchemaChanged;};
     int32_t getCoordinationId(){ return this->m_coordinationId;}
+    const std::string&  getQuery(){ return this->m_query;}
 
     void setQueryId(exec::shared::QueryId* q){this->m_pQueryId=q;}
     void* getListenerContext() {return this->m_pListenerCtx;}
@@ -147,6 +191,8 @@ class DrillClientQueryResult{
     DrillClientImpl* m_pClient;
 
     int32_t m_coordinationId;
+    const std::string& m_query;
+
     size_t m_numBatches; // number of record batches received so far
 
     // Vector of Buffers holding data returned by the server
@@ -189,7 +235,7 @@ class DrillClientQueryResult{
     void * m_pListenerCtx;
 };
 
-class DrillClientImpl{
+class DrillClientImpl : public DrillClientImplBase{
     public:
         DrillClientImpl():
             m_coordinationId(1),
@@ -256,9 +302,14 @@ class DrillClientImpl{
         DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const 
std::string& plan, pfnQueryResultsListener listener, void* listenerCtx);
         void waitForResults();
         connectionStatus_t validateHandshake(DrillUserProperties* props);
+        void freeQueryResources(DrillClientQueryResult* pQryResult){
+            // Doesn't need to do anything
+            return;
+        };
 
     private:
         friend class DrillClientQueryResult;
+        friend class PooledDrillClientImpl;
 
         struct compareQueryId{
             bool operator()(const exec::shared::QueryId* q1, const 
exec::shared::QueryId* q2) const {
@@ -275,7 +326,6 @@ class DrillClientImpl{
         void handleHeartbeatTimeout(const boost::system::error_code & err); // 
send a heartbeat. If send fails, broadcast error, close connection and bail out.
 
         int32_t getNextCoordinationId(){ return ++m_coordinationId; };
-        void parseConnectStr(const char* connectStr, std::string& pathToDrill, 
std::string& protocol, std::string& hostPortStr);
         // send synchronous messages
         //connectionStatus_t recvSync(InBoundRpcMessage& msg);
         connectionStatus_t sendSync(OutBoundRpcMessage& msg);
@@ -331,6 +381,9 @@ class DrillClientImpl{
         std::string m_handshakeErrorMsg;
         bool m_bIsConnected;
 
+        std::string m_connectStr; 
+
+        // 
         // number of outstanding read requests.
         // handleRead will keep asking for more results as long as this number 
is not zero.
         size_t m_pendingRequests;
@@ -356,6 +409,8 @@ class DrillClientImpl{
         boost::asio::deadline_timer m_deadlineTimer; // to timeout async 
queries that never return
         boost::asio::deadline_timer m_heartbeatTimer; // to send heartbeat 
messages
 
+        std::string m_connectedHost; // The hostname and port the socket is 
connected to.
+
         //for synchronous messages, like validate handshake
         ByteBuf_t m_rbuf; // buffer for receiving synchronous messages
         DataBuf m_wbuf; // buffer for sending synchronous message
@@ -372,12 +427,106 @@ class DrillClientImpl{
         // Condition variable to signal completion of all queries. 
         boost::condition_variable m_cv;
 
+        bool m_bIsDirectConnection;
 };
 
 inline bool DrillClientImpl::Active() {
     return this->m_bIsConnected;;
 }
 
+
+/* *
+ *  Provides the same public interface as a DrillClientImpl but holds a pool 
of DrillClientImpls.
+ *  Every submitQuery uses a different DrillClientImpl to distribute the load.
+ *  DrillClient can use this class instead of DrillClientImpl to get better 
load balancing.
+ * */
+class PooledDrillClientImpl : public DrillClientImplBase{
+    public:
+        PooledDrillClientImpl(){
+            m_bIsDirectConnection=false;
+            m_maxConcurrentConnections = DEFAULT_MAX_CONCURRENT_CONNECTIONS;
+            char* maxConn=std::getenv(MAX_CONCURRENT_CONNECTIONS_ENV);
+            if(maxConn!=NULL){
+                m_maxConcurrentConnections=atoi(maxConn);
+            }
+            m_lastConnection=-1;
+            m_pError=NULL;
+            m_queriesExecuted=0;
+            m_pUserProperties=NULL;
+        }
+
+        ~PooledDrillClientImpl(){
+            for(std::vector<DrillClientImpl*>::iterator it = 
m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
+                delete *it;
+            }
+            m_clientConnections.clear();
+            if(m_pUserProperties!=NULL){ delete m_pUserProperties; 
m_pUserProperties=NULL;}
+            if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
+        }
+
+        //Connect via Zookeeper or directly.
+        //Makes an initial connection to a drillbit. successful connect adds 
the first drillbit to the pool.
+        connectionStatus_t connect(const char* connStr);
+
+        // Test whether the client is active. Returns true if any one of the 
underlying connections is active
+        bool Active();
+
+        // Closes all open connections. 
+        void Close() ;
+
+        // Returns the last error encountered by any of the underlying 
executing queries or connections
+        DrillClientError* getError();
+
+        // Submits a query to a drillbit. If more than one query is to be 
sent, we may choose a
+        // a different drillbit in the pool. No more than 
m_maxConcurrentConnections will be allowed.
+        // Connections once added to the pool will be removed only when the 
DrillClient is closed.
+        DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const 
std::string& plan, pfnQueryResultsListener listener, void* listenerCtx);
+
+        //Waits as long as any one drillbit connection has results pending
+        void waitForResults();
+
+        //Validates handshake only against the first drillbit connected to.
+        connectionStatus_t validateHandshake(DrillUserProperties* props);
+
+        void freeQueryResources(DrillClientQueryResult* pQryResult);
+
+        int getDrillbitCount(){ return m_drillbits.size();};
+
+    private:
+        
+        std::string m_connectStr; 
+        std::string m_lastQuery;
+        
+        // A list of all the current client connections. We choose a new one 
for every query. 
+        // When picking a drillClientImpl to use, we see how many queries each 
drillClientImpl
+        // is currently executing. If none,  
+        std::vector<DrillClientImpl*> m_clientConnections; 
+               boost::mutex m_poolMutex; // protect access to the vector
+        
+        //ZookeeperImpl zook;
+        
+        // Use this to decide which drillbit to select next from the list of 
drillbits.
+        size_t m_lastConnection;
+               boost::mutex m_cMutex;
+
+        // Number of queries executed so far. Can be used to select a new 
Drillbit from the pool.
+        size_t m_queriesExecuted;
+
+        size_t m_maxConcurrentConnections;
+
+        bool m_bIsDirectConnection;
+
+        DrillClientError* m_pError;
+
+        connectionStatus_t handleConnError(connectionStatus_t status, 
std::string msg);
+        // get a connection from the pool or create a new one. Return NULL if 
none is found
+        DrillClientImpl* getOneConnection();
+
+        std::vector<std::string> m_drillbits;
+
+        DrillUserProperties* m_pUserProperties;//Keep a copy of user properties
+};
+
 class ZookeeperImpl{
     public:
         ZookeeperImpl();
@@ -385,12 +534,17 @@ class ZookeeperImpl{
         static ZooLogLevel getZkLogLevel();
         // comma separated host:port pairs, each corresponding to a zk
         // server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002
-        int connectToZookeeper(const char* connectStr, const char* 
pathToDrill);
+        DEPRECATED int connectToZookeeper(const char* connectStr, const char* 
pathToDrill);
         void close();
         static void watcher(zhandle_t *zzh, int type, int state, const char 
*path, void* context);
         void debugPrint();
         std::string& getError(){return m_err;}
         const exec::DrillbitEndpoint& getEndPoint(){ return 
m_drillServiceInstance.endpoint();}
+        // return unshuffled list of drillbits
+        int getAllDrillbits(const char* connectStr, const char* pathToDrill, 
std::vector<std::string>& drillbits);
+        // picks the index drillbit and returns the corresponding endpoint 
object
+        int getEndPoint(std::vector<std::string>& drillbits, size_t index, 
exec::DrillbitEndpoint& endpoint);
+        
 
     private:
         static char s_drillRoot[];
@@ -407,6 +561,7 @@ class ZookeeperImpl{
         boost::condition_variable m_cv;
         bool m_bConnecting;
         exec::DrillServiceInstance m_drillServiceInstance;
+        std::string m_rootDir;
 };
 
 } // namespace Drill

http://git-wip-us.apache.org/repos/asf/drill/blob/df0f0af3/contrib/native/client/src/clientlib/env.h.in
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/env.h.in 
b/contrib/native/client/src/clientlib/env.h.in
new file mode 100644
index 0000000..a32f152
--- /dev/null
+++ b/contrib/native/client/src/clientlib/env.h.in
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ENV_H
+#define ENV_H
+
+#define GIT_COMMIT_PROP @GIT_COMMIT_PROP@
+
+#endif
+
+

Reply via email to