Github user sudheeshkatkam commented on a diff in the pull request:

    https://github.com/apache/drill/pull/396#discussion_r55112213
  
    --- Diff: contrib/native/client/src/clientlib/drillClientImpl.cpp ---
    @@ -1392,6 +1387,198 @@ void DrillClientQueryResult::clearAndDestroy(){
         }
     }
     
    +
    +connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){
    +    connectionStatus_t stat = CONN_SUCCESS;
    +    std::string pathToDrill, protocol, hostPortStr;
    +    std::string host;
    +    std::string port;
    +    m_connectStr=connStr;
    +    Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
    +    if(!strcmp(protocol.c_str(), "zk")){
    +        // Get a list of drillbits
    +        ZookeeperImpl zook;
    +        std::vector<std::string> drillbits;
    +        int err = zook.getAllDrillbits(hostPortStr.c_str(), 
pathToDrill.c_str(), drillbits);
    +        if(!err){
    +            Utils::shuffle(drillbits);
    +            // The original shuffled order is maintained if we shuffle 
first and then add any missing elements
    +            Utils::add(m_drillbits, drillbits);
    +            exec::DrillbitEndpoint e;
    +            size_t nextIndex=0;
    +            boost::lock_guard<boost::mutex> cLock(m_cMutex);
    +            m_lastConnection++;
    +            nextIndex = (m_lastConnection)%(getDrillbitCount());
    +            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Pooled Connection"
    +                    << "(" << (void*)this << ")"
    +                    << ": Current counter is: " 
    +                    << m_lastConnection << std::endl;)
    +                err=zook.getEndPoint(m_drillbits, nextIndex, e);
    +            if(!err){
    +                host=boost::lexical_cast<std::string>(e.address());
    +                port=boost::lexical_cast<std::string>(e.user_port());
    +            }
    +        }
    +        if(err){
    +            return handleConnError(CONN_ZOOKEEPER_ERROR, 
getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
    +        }
    +        zook.close();
    +        m_bIsDirectConnection=false;
    +    }else if(!strcmp(protocol.c_str(), "local")){
    +        char tempStr[MAX_CONNECT_STR+1];
    +        strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); 
tempStr[MAX_CONNECT_STR]=0;
    +        host=strtok(tempStr, ":");
    +        port=strtok(NULL, "");
    +        m_bIsDirectConnection=true;
    +    }else{
    +        return handleConnError(CONN_INVALID_INPUT, 
getMessage(ERR_CONN_UNKPROTO, protocol.c_str()));
    +    }
    +    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: (Pooled) 
" << host << ":" << port << std::endl;)
    +        DrillClientImpl* pDrillClientImpl = new DrillClientImpl();
    +    stat =  pDrillClientImpl->connect(host.c_str(), port.c_str());
    +    if(stat == CONN_SUCCESS){
    +        m_clientConnections.push_back(pDrillClientImpl);
    +    }else{
    +        DrillClientError* pErr = pDrillClientImpl->getError();
    +        handleConnError((connectionStatus_t)pErr->status, pErr->msg);
    +        delete pDrillClientImpl;
    +    }
    +    return stat;
    +}
    +
    +connectionStatus_t 
PooledDrillClientImpl::validateHandshake(DrillUserProperties* props){
    +    // Assume there is one valid connection to at least one drillbit
    +    connectionStatus_t stat=CONN_FAILURE;
    +    // Keep a copy of the user properties
    +    if(props!=NULL){
    +        m_pUserProperties = new DrillUserProperties;
    +        for(size_t i=0; i<props->size(); i++){
    +            m_pUserProperties->setProperty(
    +                    props->keyAt(i),
    +                    props->valueAt(i)
    +                    );
    +        }
    +    }
    +    DrillClientImpl* pDrillClientImpl = getOneConnection();
    +    if(pDrillClientImpl != NULL){
    +        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Validating handshake: 
(Pooled) " << pDrillClientImpl->m_connectedHost << std::endl;)
    +        stat=pDrillClientImpl->validateHandshake(m_pUserProperties);
    +    }
    +    else{
    +        stat =  handleConnError(CONN_NOTCONNECTED, 
getMessage(ERR_CONN_NOCONN));
    +    }
    +    return stat;
    +}
    +
    +DrillClientQueryResult* 
PooledDrillClientImpl::SubmitQuery(::exec::shared::QueryType t, const 
std::string& plan, pfnQueryResultsListener listener, void* listenerCtx){
    +    DrillClientQueryResult* pDrillClientQueryResult = NULL;
    +    DrillClientImpl* pDrillClientImpl = NULL;
    +    pDrillClientImpl = getOneConnection();
    +    if(pDrillClientImpl != NULL){
    +        
pDrillClientQueryResult=pDrillClientImpl->SubmitQuery(t,plan,listener,listenerCtx);
    +        m_queriesExecuted++;
    +    }
    +    return pDrillClientQueryResult;
    +}
    +
    +void PooledDrillClientImpl::freeQueryResources(DrillClientQueryResult* 
pQryResult){
    +    // Nothing to do. If this class ever keeps track of executing queries 
then it will need 
    +    // to implement this call to free any query specific resources the 
pool might have 
    +    // allocated
    +    return;
    +}
    +
    +bool PooledDrillClientImpl::Active(){
    +    for(std::vector<DrillClientImpl*>::iterator it = 
m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
    +        if((*it)->Active()){
    +            return true;
    +        }
    +    }
    +    return false;
    +}
    +
    +void PooledDrillClientImpl::Close() {
    +    for(std::vector<DrillClientImpl*>::iterator it = 
m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
    +        (*it)->Close();
    --- End diff --
    
    Can this throw an exception?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA
ticket with INFRA.
---

Reply via email to