DRILL-998: Limit amount of memory used by drill C++ client API

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/5a7feb9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/5a7feb9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/5a7feb9d

Branch: refs/heads/master
Commit: 5a7feb9dca73bc11249210e7a241a11533f6944e
Parents: cd9eaa8
Author: Jacques Nadeau <jacq...@apache.org>
Authored: Wed Aug 27 10:31:56 2014 -0700
Committer: Jacques Nadeau <jacq...@apache.org>
Committed: Wed Aug 27 13:33:48 2014 -0700

----------------------------------------------------------------------
 .../native/client/example/querySubmitter.cpp    |  1 +
 .../native/client/src/clientlib/CMakeLists.txt  |  1 +
 .../native/client/src/clientlib/drillClient.cpp | 11 ++--
 .../client/src/clientlib/drillClientImpl.cpp    | 50 ++++++++------
 .../client/src/clientlib/drillClientImpl.hpp    | 12 ++--
 .../native/client/src/clientlib/recordBatch.cpp |  2 +-
 contrib/native/client/src/clientlib/utils.cpp   | 68 ++++++++++++++++++++
 contrib/native/client/src/clientlib/utils.hpp   | 31 ++++++---
 .../native/client/src/include/drill/common.hpp  |  6 ++
 .../client/src/include/drill/recordBatch.hpp    |  8 +--
 10 files changed, 147 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5a7feb9d/contrib/native/client/example/querySubmitter.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/example/querySubmitter.cpp 
b/contrib/native/client/example/querySubmitter.cpp
index 8e80658..17bec3c 100644
--- a/contrib/native/client/example/querySubmitter.cpp
+++ b/contrib/native/client/example/querySubmitter.cpp
@@ -280,6 +280,7 @@ int main(int argc, char* argv[]) {
         //DrillClient::initLogging("/var/log/drill/", l);
         // To log to stderr
         Drill::DrillClient::initLogging(NULL, l);
+        Drill::DrillClientConfig::setBufferLimit(2*1024*1024); // 2MB. Allows 
us to hold at least two record batches.
 
         if(client.connect(connectStr.c_str(), 
schema.c_str())!=Drill::CONN_SUCCESS){
             std::cerr<< "Failed to connect with error: "<< client.getError() 
<< " (Using:"<<connectStr<<")"<<std::endl;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5a7feb9d/contrib/native/client/src/clientlib/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/CMakeLists.txt 
b/contrib/native/client/src/clientlib/CMakeLists.txt
index 37f4734..dc8f032 100644
--- a/contrib/native/client/src/clientlib/CMakeLists.txt
+++ b/contrib/native/client/src/clientlib/CMakeLists.txt
@@ -27,6 +27,7 @@ set (CLIENTLIB_SRC_FILES
     ${CMAKE_CURRENT_SOURCE_DIR}/rpcDecoder.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/errmsgs.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/logger.cpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/utils.cpp
     )
 
 include_directories(${CMAKE_CURRENT_SOURCE_DIR} 
${CMAKE_CURRENT_SOURCE_DIR}/../include )

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5a7feb9d/contrib/native/client/src/clientlib/drillClient.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClient.cpp 
b/contrib/native/client/src/clientlib/drillClient.cpp
index a7489bc..6611332 100644
--- a/contrib/native/client/src/clientlib/drillClient.cpp
+++ b/contrib/native/client/src/clientlib/drillClient.cpp
@@ -43,7 +43,7 @@ DrillClientInitializer::~DrillClientInitializer(){
 }
 
 logLevel_t DrillClientConfig::s_logLevel=LOG_ERROR;
-uint64_t DrillClientConfig::s_bufferLimit=-1;
+uint64_t DrillClientConfig::s_bufferLimit=MAX_MEM_ALLOC_SIZE;
 int32_t DrillClientConfig::s_socketTimeout=180;
 boost::mutex DrillClientConfig::s_mutex;
 
@@ -157,10 +157,6 @@ FieldDefPtr RecordIterator::getColDefs(){
 
 status_t RecordIterator::next(){
     status_t ret=QRY_SUCCESS;
-    this->m_pQueryResult->waitForData();
-    if(m_pQueryResult->hasError()){
-        return m_pQueryResult->getErrorStatus();
-    }
     this->m_currentRecord++;
 
     if(!this->m_pQueryResult->isCancelled()){
@@ -169,8 +165,13 @@ status_t RecordIterator::next(){
             if(this->m_pCurrentRecordBatch !=NULL){
                 DRILL_LOG(LOG_TRACE) << "Deleted old Record batch " << (void*) 
m_pCurrentRecordBatch << std::endl;
                 delete this->m_pCurrentRecordBatch; //free the previous record 
batch
+                this->m_pCurrentRecordBatch=NULL;
             }
             this->m_currentRecord=0;
+            this->m_pQueryResult->waitForData();
+            if(m_pQueryResult->hasError()){
+                return m_pQueryResult->getErrorStatus();
+            }
             this->m_pCurrentRecordBatch=this->m_pQueryResult->getNext();
             if(this->m_pCurrentRecordBatch != NULL){
                 DRILL_LOG(LOG_TRACE) << "Fetched new Record batch " << 
std::endl;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5a7feb9d/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp 
b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index 54dcdd0..0ea4897 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -175,7 +175,7 @@ connectionStatus_t DrillClientImpl::recvHandshake(){
     DRILL_LOG(LOG_DEBUG) << "Sent handshake read request to server" << 
std::endl;
     m_io_service.run();
     if(m_rbuf!=NULL){
-        Utils::freeBuffer(m_rbuf); m_rbuf=NULL;
+        Utils::freeBuffer(m_rbuf, MAX_SOCK_RD_BUFSIZE); m_rbuf=NULL;
     }
     return CONN_SUCCESS;
 }
@@ -332,6 +332,13 @@ void DrillClientImpl::getNextResult(){
     // This call is always made from within a function where the mutex has 
already been acquired
     //boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
 
+    {
+        boost::unique_lock<boost::mutex> 
memLock(AllocatedBuffer::s_memCVMutex);
+        DRILL_LOG(LOG_TRACE) << "Read blocked waiting for memory." << 
std::endl;
+        while(AllocatedBuffer::s_isBufferLimitReached){
+            AllocatedBuffer::s_memCV.wait(memLock);
+        }
+    }
     //use free, not delete to free
     ByteBuf_t readBuf = Utils::allocateBuffer(LEN_PREFIX_BUFLEN);
 
@@ -362,10 +369,13 @@ void DrillClientImpl::waitForResults(){
     delete this->m_pListenerThread; this->m_pListenerThread=NULL;
 }
 
-status_t DrillClientImpl::readMsg(ByteBuf_t _buf, ByteBuf_t* allocatedBuffer, 
InBoundRpcMessage& msg, boost::system::error_code& error){
+status_t DrillClientImpl::readMsg(ByteBuf_t _buf, 
+        AllocatedBufferPtr* allocatedBuffer, 
+        InBoundRpcMessage& msg, 
+        boost::system::error_code& error){
     size_t leftover=0;
     uint32_t rmsgLen;
-    ByteBuf_t currentBuffer;
+    AllocatedBufferPtr currentBuffer;
     *allocatedBuffer=NULL;
     {
         // We need to protect the readLength and read buffer, and the pending 
requests counter,
@@ -379,18 +389,18 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, 
ByteBuf_t* allocatedBuffer, In
             leftover = LEN_PREFIX_BUFLEN - bytes_read;
             // Allocate a buffer
             DRILL_LOG(LOG_TRACE) << "Allocated and locked buffer." << 
std::endl;
-            currentBuffer=Utils::allocateBuffer(rmsgLen);
+            currentBuffer=new AllocatedBuffer(rmsgLen);
             if(currentBuffer==NULL){
-                Utils::freeBuffer(_buf);
+                Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
                 return handleQryError(QRY_CLIENT_OUTOFMEM, 
getMessage(ERR_QRY_OUTOFMEM), NULL);
             }
             *allocatedBuffer=currentBuffer;
             if(leftover){
-                memcpy(currentBuffer, _buf + bytes_read, leftover);
+                memcpy(currentBuffer->m_pBuffer, _buf + bytes_read, leftover);
             }
             DRILL_LOG(LOG_TRACE) << "reading data (rmsgLen - leftover) : "
                 << (rmsgLen - leftover) << std::endl;
-            ByteBuf_t b=currentBuffer + leftover;
+            ByteBuf_t b=currentBuffer->m_pBuffer + leftover;
             size_t bytesToRead=rmsgLen - leftover;
             while(1){
                 size_t dataBytesRead=this->m_socket.read_some(
@@ -404,24 +414,24 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, 
ByteBuf_t* allocatedBuffer, In
             }
             if(!error){
                 // read data successfully
-                DrillClientImpl::s_decoder.Decode(currentBuffer, rmsgLen, msg);
+                DrillClientImpl::s_decoder.Decode(currentBuffer->m_pBuffer, 
rmsgLen, msg);
                 DRILL_LOG(LOG_TRACE) << "Done decoding chunk. Coordination id: 
" <<msg.m_coord_id<< std::endl;
             }else{
-                Utils::freeBuffer(_buf);
+                Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
                 return handleQryError(QRY_COMM_ERROR,
                         getMessage(ERR_QRY_COMMERR, error.message().c_str()), 
NULL);
             }
         }else{
             // got a message with an invalid read length.
-            Utils::freeBuffer(_buf);
+            Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
             return handleQryError(QRY_INTERNAL_ERROR, 
getMessage(ERR_QRY_INVREADLEN), NULL);
         }
     }
-    Utils::freeBuffer(_buf);
+    Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
     return QRY_SUCCESS;
 }
 
-status_t DrillClientImpl::processQueryResult(ByteBuf_t allocatedBuffer, 
InBoundRpcMessage& msg ){
+status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr  
allocatedBuffer, InBoundRpcMessage& msg ){
     DrillClientQueryResult* pDrillClientQueryResult=NULL;
     status_t ret=QRY_SUCCESS;
     {
@@ -453,14 +463,14 @@ status_t DrillClientImpl::processQueryResult(ByteBuf_t 
allocatedBuffer, InBoundR
         //Check QueryResult.queryState. QueryResult could have an error.
         if(qr->query_state() == exec::shared::QueryResult_QueryState_FAILED){
             status_t ret=handleQryError(QRY_FAILURE, qr->error(0), 
pDrillClientQueryResult);
-            Utils::freeBuffer(allocatedBuffer);
+            delete allocatedBuffer;
             delete qr;
             return ret;
         }
         //Validate the RPC message
         std::string valErr;
         if( (ret=validateMessage(msg, *qr, valErr)) != QRY_SUCCESS){
-            Utils::freeBuffer(allocatedBuffer);
+            delete allocatedBuffer;
             delete qr;
             return handleQryError(ret, getMessage(ERR_QRY_INVRPC, 
valErr.c_str()), pDrillClientQueryResult);
         }
@@ -521,7 +531,7 @@ status_t DrillClientImpl::processQueryResult(ByteBuf_t 
allocatedBuffer, InBoundR
     return ret;
 }
 
-status_t DrillClientImpl::processQueryId(ByteBuf_t allocatedBuffer, 
InBoundRpcMessage& msg ){
+status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, 
InBoundRpcMessage& msg ){
     DrillClientQueryResult* pDrillClientQueryResult=NULL;
     DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << 
msg.m_coord_id << std::endl;
     status_t ret=QRY_SUCCESS;
@@ -539,10 +549,10 @@ status_t DrillClientImpl::processQueryId(ByteBuf_t 
allocatedBuffer, InBoundRpcMe
         //save queryId allocated here so we can free it later
         pDrillClientQueryResult->setQueryId(qid);
     }else{
-        Utils::freeBuffer(allocatedBuffer);
+        delete allocatedBuffer;
         return handleQryError(QRY_INTERNAL_ERROR, 
getMessage(ERR_QRY_INVQUERYID), NULL);
     }
-    Utils::freeBuffer(allocatedBuffer);
+    delete allocatedBuffer;
     return ret;
 }
 
@@ -580,7 +590,7 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
         InBoundRpcMessage msg;
 
         DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;
-        ByteBuf_t allocatedBuffer=NULL;
+        AllocatedBufferPtr allocatedBuffer=NULL;
 
         if(readMsg(_buf, &allocatedBuffer, msg, error)!=QRY_SUCCESS){
             if(m_pendingRequests!=0){
@@ -628,7 +638,7 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
         }
     }else{
         // boost error
-        Utils::freeBuffer(_buf);
+        Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
         boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
         handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, 
error.message().c_str()), NULL);
         return;
@@ -828,7 +838,7 @@ status_t 
DrillClientQueryResult::defaultQueryResultsListener(void* ctx,
     return QRY_SUCCESS;
 }
 
-RecordBatch*  DrillClientQueryResult::peekNext() {
+RecordBatch*  DrillClientQueryResult::peekNext(){
     RecordBatch* pRecordBatch=NULL;
     //if no more data, return NULL;
     if(!m_bIsQueryPending) return NULL;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5a7feb9d/contrib/native/client/src/clientlib/drillClientImpl.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp 
b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index e40b214..3ac0b20 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -198,7 +198,7 @@ class DrillClientImpl{
             m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_send, 
ignorederr);
             m_socket.close();
             if(m_rbuf!=NULL){
-                Utils::freeBuffer(m_rbuf); m_rbuf=NULL;
+                Utils::freeBuffer(m_rbuf, MAX_SOCK_RD_BUFSIZE); m_rbuf=NULL;
             }
             if(m_pError!=NULL){
                 delete m_pError; m_pError=NULL;
@@ -244,9 +244,13 @@ class DrillClientImpl{
         void handleHShakeReadTimeout(const boost::system::error_code & err);
         // Query results
         void getNextResult();
-        status_t readMsg(ByteBuf_t _buf, ByteBuf_t* allocatedBuffer, 
InBoundRpcMessage& msg, boost::system::error_code& error);
-        status_t processQueryResult(ByteBuf_t allocatedBuffer, 
InBoundRpcMessage& msg);
-        status_t processQueryId(ByteBuf_t allocatedBuffer, InBoundRpcMessage& 
msg );
+        status_t readMsg(
+                ByteBuf_t _buf, 
+                AllocatedBufferPtr* allocatedBuffer, 
+                InBoundRpcMessage& msg, 
+                boost::system::error_code& error);
+        status_t processQueryResult(AllocatedBufferPtr allocatedBuffer, 
InBoundRpcMessage& msg);
+        status_t processQueryId(AllocatedBufferPtr allocatedBuffer, 
InBoundRpcMessage& msg );
         void handleReadTimeout(const boost::system::error_code & err);
         void handleRead(ByteBuf_t _buf, const boost::system::error_code & err, 
size_t bytes_transferred) ;
         status_t validateMessage(InBoundRpcMessage& msg, 
exec::shared::QueryResult& qr, std::string& valError);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5a7feb9d/contrib/native/client/src/clientlib/recordBatch.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/recordBatch.cpp 
b/contrib/native/client/src/clientlib/recordBatch.cpp
index 17073bd..4c55f04 100644
--- a/contrib/native/client/src/clientlib/recordBatch.cpp
+++ b/contrib/native/client/src/clientlib/recordBatch.cpp
@@ -312,7 +312,7 @@ RecordBatch::~RecordBatch(){
     }
     m_fieldDefs->clear();
     delete m_pQueryResult;
-    Utils::freeBuffer(m_allocatedBuffer);
+    delete m_allocatedBuffer;
 }
 
 ret_t RecordBatch::build(){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5a7feb9d/contrib/native/client/src/clientlib/utils.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/utils.cpp 
b/contrib/native/client/src/clientlib/utils.cpp
new file mode 100644
index 0000000..f1f03a1
--- /dev/null
+++ b/contrib/native/client/src/clientlib/utils.cpp
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdlib.h>
+#include "utils.hpp"
+#include "drill/common.hpp"
+
+namespace Drill{
+
+
+boost::mutex AllocatedBuffer::s_memCVMutex;
+boost::condition_variable AllocatedBuffer::s_memCV;
+size_t AllocatedBuffer::s_allocatedMem=0;
+bool AllocatedBuffer::s_isBufferLimitReached=false;
+
+// Allocates a zero-initialized buffer of len bytes and adds len to the shared
+// counter AllocatedBuffer::s_allocatedMem (guarded by s_memCVMutex).
+// Returns NULL when calloc fails; a failed request is NOT counted, so the
+// counter only reflects memory that is actually held. (AllocatedBuffer records
+// a size of 0 for a failed allocation and would never give the bytes back.)
+// Sets s_isBufferLimitReached once the counter reaches the configured buffer
+// limit minus MEM_CHUNK_SIZE.
+// NOTE(review): the subtraction below is unsigned; a configured limit smaller
+// than MEM_CHUNK_SIZE wraps around and effectively disables the limit.
+// Confirm whether such limits should be rejected when they are set.
+ByteBuf_t Utils::allocateBuffer(size_t len){
+    boost::lock_guard<boost::mutex> memLock(AllocatedBuffer::s_memCVMutex);
+    //http://stackoverflow.com/questions/2688466/why-mallocmemset-is-slower-than-calloc
+    ByteBuf_t b = (ByteBuf_t)calloc(len, sizeof(Byte_t));
+    if(b==NULL){
+        // Nothing was allocated, so there is nothing to account for.
+        return NULL;
+    }
+    AllocatedBuffer::s_allocatedMem+=len;
+    size_t safeSize= DrillClientConfig::getBufferLimit()-MEM_CHUNK_SIZE;
+    if(AllocatedBuffer::s_allocatedMem >= safeSize){
+        AllocatedBuffer::s_isBufferLimitReached=true;
+    }
+    return b;
+}
+
+// Releases a buffer obtained from allocateBuffer and subtracts len from the
+// shared counter. len must be the size that was passed to allocateBuffer.
+// A NULL buffer is a no-op: failed allocations are never counted, so there is
+// nothing to subtract and nothing to free.
+// When the counter drops below the configured limit minus MEM_CHUNK_SIZE the
+// limit flag is cleared and every thread waiting on s_memCV is woken.
+void Utils::freeBuffer(ByteBuf_t b, size_t len){
+    if(b==NULL){
+        return;
+    }
+    boost::lock_guard<boost::mutex> memLock(AllocatedBuffer::s_memCVMutex);
+    free(b);
+    // The counter is unsigned: clamp at zero instead of wrapping around if a
+    // caller passes a length larger than what is currently accounted for.
+    if(len > AllocatedBuffer::s_allocatedMem){
+        AllocatedBuffer::s_allocatedMem=0;
+    }else{
+        AllocatedBuffer::s_allocatedMem-=len;
+    }
+    size_t safeSize= DrillClientConfig::getBufferLimit()-MEM_CHUNK_SIZE;
+    if(AllocatedBuffer::s_allocatedMem < safeSize){
+        AllocatedBuffer::s_isBufferLimitReached=false;
+        // The flag and condition variable are static and shared by all
+        // waiters, so wake all of them; each one re-checks the flag in a loop.
+        AllocatedBuffer::s_memCV.notify_all();
+    }
+}
+
+
+// Acquires l bytes through Utils::allocateBuffer. If the allocation fails,
+// m_pBuffer is NULL and m_bufSize stays 0; otherwise m_bufSize is l.
+AllocatedBuffer::AllocatedBuffer(size_t l)
+    : m_pBuffer(Utils::allocateBuffer(l)), m_bufSize(0){
+    if(m_pBuffer!=NULL){
+        m_bufSize=l;
+    }
+}
+
+// Hands the buffer back through Utils::freeBuffer, passing the recorded size,
+// and leaves the members reset to NULL / 0.
+AllocatedBuffer::~AllocatedBuffer(){
+    ByteBuf_t released=m_pBuffer;
+    size_t releasedSize=m_bufSize;
+    m_pBuffer=NULL;
+    m_bufSize=0;
+    Utils::freeBuffer(released, releasedSize);
+}
+
+} // namespace 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5a7feb9d/contrib/native/client/src/clientlib/utils.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/utils.hpp 
b/contrib/native/client/src/clientlib/utils.hpp
index 9def9b4..0f26ad6 100644
--- a/contrib/native/client/src/clientlib/utils.hpp
+++ b/contrib/native/client/src/clientlib/utils.hpp
@@ -23,24 +23,37 @@
 #include <ostream>
 #include <fstream>
 #include <string>
-#include <stdlib.h>
+#include <boost/thread.hpp>
 
 #include "drill/common.hpp"
+#include "drill/drillClient.hpp"
 
 namespace Drill{
 
-class Utils{
+// Wrapper Class to keep track of allocated memory
+class AllocatedBuffer{
     public:
+    AllocatedBuffer(size_t l);
+    ~AllocatedBuffer();
 
-        //allocate memory for Record Batches
-        static ByteBuf_t allocateBuffer(size_t len){
-            
//http://stackoverflow.com/questions/2688466/why-mallocmemset-is-slower-than-calloc
-            ByteBuf_t b = (ByteBuf_t)calloc(len, sizeof(Byte_t)); return b;
-        }
-        static void freeBuffer(ByteBuf_t b){ free(b); }
+    ByteBuf_t m_pBuffer;
+    size_t    m_bufSize;
+    
+    // keep track of allocated memory. The client lib blocks
+    // if we have allocated up to a limit (defined in drillClientConfig).
+    static boost::mutex s_memCVMutex;
+    static boost::condition_variable s_memCV;
+    static size_t s_allocatedMem;
+    static bool s_isBufferLimitReached;
 
-}; // Utils
+};
 
+class Utils{
+    public:
+        //allocate memory for Record Batches
+        static ByteBuf_t allocateBuffer(size_t len);
+        static void freeBuffer(ByteBuf_t b, size_t len);
+}; // Utils
 
 } // namespace Drill
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5a7feb9d/contrib/native/client/src/include/drill/common.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/include/drill/common.hpp 
b/contrib/native/client/src/include/drill/common.hpp
index 2113ce5..59734dc 100644
--- a/contrib/native/client/src/include/drill/common.hpp
+++ b/contrib/native/client/src/include/drill/common.hpp
@@ -33,6 +33,9 @@
 #define MAX_CONNECT_STR 4096
 #define MAX_SOCK_RD_BUFSIZE  1024
 
+// Memory accounting constants. The values are parenthesized and carry no
+// trailing ';' so they expand correctly inside larger expressions
+// (e.g. "limit - MEM_CHUNK_SIZE" or "MAX_MEM_ALLOC_SIZE / 2").
+#define MEM_CHUNK_SIZE (64*1024) // 64K
+#define MAX_MEM_ALLOC_SIZE (256*1024*1024) // 256 MB
+
 #ifdef _DEBUG
 #define EXTRA_DEBUGGING
 #define CODER_DEBUGGING
@@ -48,6 +51,9 @@ typedef Byte_t * ByteBuf_t;
 class FieldMetadata;
 typedef boost::shared_ptr< std::vector<Drill::FieldMetadata*> > FieldDefPtr;
 
+class AllocatedBuffer;
+typedef AllocatedBuffer* AllocatedBufferPtr;
+
 typedef enum{
     QRY_SUCCESS=0,
     QRY_FAILURE=1,

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5a7feb9d/contrib/native/client/src/include/drill/recordBatch.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/include/drill/recordBatch.hpp 
b/contrib/native/client/src/include/drill/recordBatch.hpp
index e9298bf..9a3df2b 100644
--- a/contrib/native/client/src/include/drill/recordBatch.hpp
+++ b/contrib/native/client/src/include/drill/recordBatch.hpp
@@ -836,10 +836,10 @@ class ValueVectorFactory{
 class DECLSPEC_DRILL_CLIENT RecordBatch{
     public:
 
-        //m_allocatedBuffer is the memory block allocated to hold the incoming 
RPC message. Record BAtches operate on
-        //slices of the allcoated buffer. The first slice (the first Field 
Batch), begins at m_buffer. Data in the
+        //m_allocatedBuffer is the memory block allocated to hold the incoming 
RPC message. Record Batches operate on
+        //slices of the allocated buffer. The first slice (the first Field 
Batch), begins at m_buffer. Data in the
         //allocated buffer before m_buffer is mostly the RPC header, and the 
QueryResult object.
-        RecordBatch(exec::shared::QueryResult* pResult, ByteBuf_t r, ByteBuf_t 
b)
+        RecordBatch(exec::shared::QueryResult* pResult, AllocatedBufferPtr r, 
ByteBuf_t b)
                 :m_fieldDefs(new(std::vector<Drill::FieldMetadata*>)){
             m_pQueryResult=pResult;
             m_pRecordBatchDef=&pResult->def();
@@ -892,7 +892,7 @@ class DECLSPEC_DRILL_CLIENT RecordBatch{
     private:
         const exec::shared::QueryResult* m_pQueryResult;
         const exec::shared::RecordBatchDef* m_pRecordBatchDef;
-        ByteBuf_t m_allocatedBuffer;
+        AllocatedBufferPtr m_allocatedBuffer;
         ByteBuf_t m_buffer;
         //build the current schema out of the field metadata
         FieldDefPtr m_fieldDefs;

Reply via email to