Github user sohami commented on a diff in the pull request: https://github.com/apache/drill/pull/950#discussion_r141240324 --- Diff: contrib/native/client/src/clientlib/drillClientImpl.cpp --- @@ -65,108 +66,70 @@ struct ToRpcType: public std::unary_function<google::protobuf::int32, exec::user return static_cast<exec::user::RpcType>(i); } }; -} -connectionStatus_t DrillClientImpl::connect(const char* connStr, DrillUserProperties* props){ - std::string pathToDrill, protocol, hostPortStr; - std::string host; - std::string port; +} // anonymous - if (this->m_bIsConnected) { - if(std::strcmp(connStr, m_connectStr.c_str())){ // trying to connect to a different address is not allowed if already connected +connectionStatus_t DrillClientImpl::connect(const char* connStr, DrillUserProperties* props){ + if (this->m_bIsConnected || (this->m_pChannelContext!=NULL && this->m_pChannel!=NULL)) { + if(!std::strcmp(connStr, m_connectStr.c_str())){ + // trying to connect to a different address is not allowed if already connected return handleConnError(CONN_ALREADYCONNECTED, getMessage(ERR_CONN_ALREADYCONN)); } return CONN_SUCCESS; } + std::string val; + channelType_t type = ( props->isPropSet(USERPROP_USESSL) && + props->getProp(USERPROP_USESSL, val) =="true") ? 
+ CHANNEL_TYPE_SSLSTREAM : + CHANNEL_TYPE_SOCKET; - m_connectStr=connStr; - Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr); - if(protocol == "zk"){ - ZookeeperClient zook(pathToDrill); - std::vector<std::string> drillbits; - int err = zook.getAllDrillbits(hostPortStr, drillbits); - if(!err){ - if (drillbits.empty()){ - return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_ZKNODBIT)); - } - Utils::shuffle(drillbits); - exec::DrillbitEndpoint endpoint; - err = zook.getEndPoint(drillbits[drillbits.size() -1], endpoint);// get the last one in the list - if(!err){ - host=boost::lexical_cast<std::string>(endpoint.address()); - port=boost::lexical_cast<std::string>(endpoint.user_port()); - } - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" << (drillbits.size() - 1) << ">. Selected " << endpoint.DebugString() << std::endl;) - - } - if(err){ - return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str())); - } - zook.close(); - m_bIsDirectConnection=true; - }else if(protocol == "local"){ - boost::lock_guard<boost::mutex> lock(m_dcMutex);//strtok is not reentrant - char tempStr[MAX_CONNECT_STR+1]; - strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0; - host=strtok(tempStr, ":"); - port=strtok(NULL, ""); - m_bIsDirectConnection=false; - }else{ - return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str())); - } - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: " << host << ":" << port << std::endl;) - std::string serviceHost; - for (size_t i = 0; i < props->size(); i++) { - if (props->keyAt(i) == USERPROP_SERVICE_HOST) { - serviceHost = props->valueAt(i); - } + connectionStatus_t ret = CONN_SUCCESS; + m_pChannelContext = ChannelContextFactory::getChannelContext(type, props); + m_pChannel= ChannelFactory::getChannel(type, m_io_service, connStr); + ret=m_pChannel->init(m_pChannelContext); + if(ret!=CONN_SUCCESS){ + 
handleConnError(m_pChannel->getError()); + return ret; } - if (serviceHost.empty()) { - props->setProperty(USERPROP_SERVICE_HOST, host); + ret= m_pChannel->connect(); + if(ret!=CONN_SUCCESS){ + handleConnError(m_pChannel->getError()); + return ret; } - connectionStatus_t ret = this->connect(host.c_str(), port.c_str()); + props->setProperty(USERPROP_SERVICE_HOST, m_pChannel->getEndpoint()->getHost()); --- End diff -- We should set `this->m_bIsConnected = true` here once the connection is successful.
---