[
https://issues.apache.org/jira/browse/DRILL-4313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15181620#comment-15181620
]
ASF GitHub Bot commented on DRILL-4313:
---------------------------------------
Github user parthchandra commented on a diff in the pull request:
https://github.com/apache/drill/pull/396#discussion_r55117425
--- Diff: contrib/native/client/src/clientlib/drillClientImpl.cpp ---
@@ -1392,6 +1387,198 @@ void DrillClientQueryResult::clearAndDestroy(){
}
}
+
+connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){
+ connectionStatus_t stat = CONN_SUCCESS;
+ std::string pathToDrill, protocol, hostPortStr;
+ std::string host;
+ std::string port;
+ m_connectStr=connStr;
+ Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
+ if(!strcmp(protocol.c_str(), "zk")){
+ // Get a list of drillbits
+ ZookeeperImpl zook;
+ std::vector<std::string> drillbits;
+ int err = zook.getAllDrillbits(hostPortStr.c_str(),
pathToDrill.c_str(), drillbits);
+ if(!err){
+ Utils::shuffle(drillbits);
+ // The original shuffled order is maintained if we shuffle
first and then add any missing elements
+ Utils::add(m_drillbits, drillbits);
+ exec::DrillbitEndpoint e;
+ size_t nextIndex=0;
+ boost::lock_guard<boost::mutex> cLock(m_cMutex);
+ m_lastConnection++;
+ nextIndex = (m_lastConnection)%(getDrillbitCount());
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Pooled Connection"
+ << "(" << (void*)this << ")"
+ << ": Current counter is: "
+ << m_lastConnection << std::endl;)
+ err=zook.getEndPoint(m_drillbits, nextIndex, e);
+ if(!err){
+ host=boost::lexical_cast<std::string>(e.address());
+ port=boost::lexical_cast<std::string>(e.user_port());
+ }
+ }
+ if(err){
+ return handleConnError(CONN_ZOOKEEPER_ERROR,
getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
+ }
+ zook.close();
+ m_bIsDirectConnection=false;
+ }else if(!strcmp(protocol.c_str(), "local")){
+ char tempStr[MAX_CONNECT_STR+1];
+ strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR);
tempStr[MAX_CONNECT_STR]=0;
+ host=strtok(tempStr, ":");
+ port=strtok(NULL, "");
+ m_bIsDirectConnection=true;
+ }else{
+ return handleConnError(CONN_INVALID_INPUT,
getMessage(ERR_CONN_UNKPROTO, protocol.c_str()));
+ }
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: (Pooled)
" << host << ":" << port << std::endl;)
+ DrillClientImpl* pDrillClientImpl = new DrillClientImpl();
+ stat = pDrillClientImpl->connect(host.c_str(), port.c_str());
+ if(stat == CONN_SUCCESS){
+ m_clientConnections.push_back(pDrillClientImpl);
+ }else{
+ DrillClientError* pErr = pDrillClientImpl->getError();
+ handleConnError((connectionStatus_t)pErr->status, pErr->msg);
+ delete pDrillClientImpl;
+ }
+ return stat;
+}
+
+connectionStatus_t
PooledDrillClientImpl::validateHandshake(DrillUserProperties* props){
+ // Assume there is one valid connection to at least one drillbit
+ connectionStatus_t stat=CONN_FAILURE;
+ // Keep a copy of the user properties
+ if(props!=NULL){
+ m_pUserProperties = new DrillUserProperties;
+ for(size_t i=0; i<props->size(); i++){
+ m_pUserProperties->setProperty(
+ props->keyAt(i),
+ props->valueAt(i)
+ );
+ }
+ }
+ DrillClientImpl* pDrillClientImpl = getOneConnection();
+ if(pDrillClientImpl != NULL){
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Validating handshake:
(Pooled) " << pDrillClientImpl->m_connectedHost << std::endl;)
+ stat=pDrillClientImpl->validateHandshake(m_pUserProperties);
+ }
+ else{
+ stat = handleConnError(CONN_NOTCONNECTED,
getMessage(ERR_CONN_NOCONN));
+ }
+ return stat;
+}
+
+DrillClientQueryResult*
PooledDrillClientImpl::SubmitQuery(::exec::shared::QueryType t, const
std::string& plan, pfnQueryResultsListener listener, void* listenerCtx){
+ DrillClientQueryResult* pDrillClientQueryResult = NULL;
+ DrillClientImpl* pDrillClientImpl = NULL;
+ pDrillClientImpl = getOneConnection();
+ if(pDrillClientImpl != NULL){
+
pDrillClientQueryResult=pDrillClientImpl->SubmitQuery(t,plan,listener,listenerCtx);
+ m_queriesExecuted++;
+ }
+ return pDrillClientQueryResult;
+}
+
+void PooledDrillClientImpl::freeQueryResources(DrillClientQueryResult*
pQryResult){
+ // Nothing to do. If this class ever keeps track of executing queries
then it will need
+ // to implement this call to free any query specific resources the
pool might have
+ // allocated
+ return;
+}
+
+bool PooledDrillClientImpl::Active(){
+ for(std::vector<DrillClientImpl*>::iterator it =
m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
+ if((*it)->Active()){
+ return true;
+ }
+ }
+ return false;
+}
+
+void PooledDrillClientImpl::Close() {
+ for(std::vector<DrillClientImpl*>::iterator it =
m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
+ (*it)->Close();
--- End diff --
Nope. This simply shuts down the socket. The shutdown can return one of the
following errors (none of which is a problem):
EBADF - socket is not a valid file descriptor.
ENOTSOCK - socket is not a socket.
ENOTCONN - socket is not connected.
> C++ client - Improve method of drillbit selection from cluster
> --------------------------------------------------------------
>
> Key: DRILL-4313
> URL: https://issues.apache.org/jira/browse/DRILL-4313
> Project: Apache Drill
> Issue Type: Improvement
> Reporter: Parth Chandra
> Assignee: Parth Chandra
> Fix For: 1.6.0
>
>
> The current C++ client handles multiple parallel queries over the same
> connection, but that creates a bottleneck as the queries get sent to the same
> drillbit.
> The client can manage this more effectively by choosing from a configurable
> pool of connections and distributing queries across them in round-robin order.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)