Github user parthchandra commented on a diff in the pull request:
https://github.com/apache/drill/pull/396#discussion_r55117425
--- Diff: contrib/native/client/src/clientlib/drillClientImpl.cpp ---
@@ -1392,6 +1387,198 @@ void DrillClientQueryResult::clearAndDestroy(){
}
}
+
+connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){
+ connectionStatus_t stat = CONN_SUCCESS;
+ std::string pathToDrill, protocol, hostPortStr;
+ std::string host;
+ std::string port;
+ m_connectStr=connStr;
+ Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
+ if(!strcmp(protocol.c_str(), "zk")){
+ // Get a list of drillbits
+ ZookeeperImpl zook;
+ std::vector<std::string> drillbits;
+ int err = zook.getAllDrillbits(hostPortStr.c_str(),
pathToDrill.c_str(), drillbits);
+ if(!err){
+ Utils::shuffle(drillbits);
+ // The original shuffled order is maintained if we shuffle
first and then add any missing elements
+ Utils::add(m_drillbits, drillbits);
+ exec::DrillbitEndpoint e;
+ size_t nextIndex=0;
+ boost::lock_guard<boost::mutex> cLock(m_cMutex);
+ m_lastConnection++;
+ nextIndex = (m_lastConnection)%(getDrillbitCount());
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Pooled Connection"
+ << "(" << (void*)this << ")"
+ << ": Current counter is: "
+ << m_lastConnection << std::endl;)
+ err=zook.getEndPoint(m_drillbits, nextIndex, e);
+ if(!err){
+ host=boost::lexical_cast<std::string>(e.address());
+ port=boost::lexical_cast<std::string>(e.user_port());
+ }
+ }
+ if(err){
+ return handleConnError(CONN_ZOOKEEPER_ERROR,
getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
+ }
+ zook.close();
+ m_bIsDirectConnection=false;
+ }else if(!strcmp(protocol.c_str(), "local")){
+ char tempStr[MAX_CONNECT_STR+1];
+ strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR);
tempStr[MAX_CONNECT_STR]=0;
+ host=strtok(tempStr, ":");
+ port=strtok(NULL, "");
+ m_bIsDirectConnection=true;
+ }else{
+ return handleConnError(CONN_INVALID_INPUT,
getMessage(ERR_CONN_UNKPROTO, protocol.c_str()));
+ }
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: (Pooled)
" << host << ":" << port << std::endl;)
+ DrillClientImpl* pDrillClientImpl = new DrillClientImpl();
+ stat = pDrillClientImpl->connect(host.c_str(), port.c_str());
+ if(stat == CONN_SUCCESS){
+ m_clientConnections.push_back(pDrillClientImpl);
+ }else{
+ DrillClientError* pErr = pDrillClientImpl->getError();
+ handleConnError((connectionStatus_t)pErr->status, pErr->msg);
+ delete pDrillClientImpl;
+ }
+ return stat;
+}
+
+connectionStatus_t
PooledDrillClientImpl::validateHandshake(DrillUserProperties* props){
+ // Assume there is one valid connection to at least one drillbit
+ connectionStatus_t stat=CONN_FAILURE;
+ // Keep a copy of the user properties
+ if(props!=NULL){
+ m_pUserProperties = new DrillUserProperties;
+ for(size_t i=0; i<props->size(); i++){
+ m_pUserProperties->setProperty(
+ props->keyAt(i),
+ props->valueAt(i)
+ );
+ }
+ }
+ DrillClientImpl* pDrillClientImpl = getOneConnection();
+ if(pDrillClientImpl != NULL){
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Validating handshake:
(Pooled) " << pDrillClientImpl->m_connectedHost << std::endl;)
+ stat=pDrillClientImpl->validateHandshake(m_pUserProperties);
+ }
+ else{
+ stat = handleConnError(CONN_NOTCONNECTED,
getMessage(ERR_CONN_NOCONN));
+ }
+ return stat;
+}
+
+DrillClientQueryResult*
PooledDrillClientImpl::SubmitQuery(::exec::shared::QueryType t, const
std::string& plan, pfnQueryResultsListener listener, void* listenerCtx){
+ DrillClientQueryResult* pDrillClientQueryResult = NULL;
+ DrillClientImpl* pDrillClientImpl = NULL;
+ pDrillClientImpl = getOneConnection();
+ if(pDrillClientImpl != NULL){
+
pDrillClientQueryResult=pDrillClientImpl->SubmitQuery(t,plan,listener,listenerCtx);
+ m_queriesExecuted++;
+ }
+ return pDrillClientQueryResult;
+}
+
+void PooledDrillClientImpl::freeQueryResources(DrillClientQueryResult*
pQryResult){
+ // Nothing to do. If this class ever keeps track of executing queries
then it will need
+ // to implement this call to free any query specific resources the
pool might have
+ // allocated
+ return;
+}
+
+bool PooledDrillClientImpl::Active(){
+ for(std::vector<DrillClientImpl*>::iterator it =
m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
+ if((*it)->Active()){
+ return true;
+ }
+ }
+ return false;
+}
+
+void PooledDrillClientImpl::Close() {
+ for(std::vector<DrillClientImpl*>::iterator it =
m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
+ (*it)->Close();
--- End diff --
Nope. This simply shuts down the socket. The shutdown can return one of the
following errors (none of which is a problem):
EBADF: socket is not a valid file descriptor.
ENOTSOCK: socket is not a socket.
ENOTCONN: socket is not connected.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---