http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/681cad66/core/sql/exp/ExpLOBaccess.cpp
----------------------------------------------------------------------
diff --git a/core/sql/exp/ExpLOBaccess.cpp b/core/sql/exp/ExpLOBaccess.cpp
index f16d48b..217f96a 100644
--- a/core/sql/exp/ExpLOBaccess.cpp
+++ b/core/sql/exp/ExpLOBaccess.cpp
@@ -23,7 +23,7 @@
 /* -*-C++-*-
  *****************************************************************************
  *
- * File:         ex_lob.C
+ * File:         ExpLOBaccess.cpp
  * Description:  class to store and retrieve LOB data.
  *               
  *               
@@ -47,25 +47,25 @@
 #include <sys/stat.h>
 #include <sys/time.h>
 
-#define SQ_USE_HDFS 1
 
-#ifdef SQ_USE_HDFS
+
+
 #include "hdfs.h"
 #include "jni.h"
-#endif 
+
 
 #include "ExpLOBstats.h"
 #include "ExpLOBaccess.h"
 #include "ExpLOBinterface.h"
-
+#include "ExpLOBexternal.h"
 #include "NAVersionedObject.h"
 #include "ComQueue.h"
 
 #include "NAMemory.h"
 #include <seabed/ms.h>
-#include <../../sqf/src/seabed/src/trans.h>
 #include <seabed/fserr.h>
 #include <curl/curl.h>
+#include <../../sqf/src/seabed/src/trans.h>
 extern int ms_transid_get(bool pv_supp,
                           bool pv_trace,
                           MS_Mon_Transid_Type *pp_transid,
@@ -79,15 +79,12 @@ ExLob::ExLob() :
     storage_(Lob_Invalid_Storage),
     dir_(string()),
     lobGlobalHeap_(NULL),
-
-    // fdDesc_(-1),
-    fdDesc_(NULL),
     fs_(NULL),
     fdData_(NULL),
     openFlags_(0)  
 {
     lobDataFile_[0] = '\0';
-    lobDescFile_[0] = '\0'; 
+    
 }
 
 ExLob::~ExLob()
@@ -97,10 +94,8 @@ ExLob::~ExLob()
       hdfsCloseFile(fs_, fdData_);
       fdData_ = NULL;
     }
-    if (fdDesc_) {
-      hdfsCloseFile(fs_, fdDesc_);
-      fdDesc_ = NULL;
-    }
+    
+    
     
     /*   
     Commenting this out. It is causing cores during hive access.
@@ -118,6 +113,7 @@ Ex_Lob_Error ExLob::initialize(char *lobFile, Ex_Lob_Mode 
mode,
                                char *dir, 
                               LobsStorage storage,
                                char *hdfsServer, Int64 hdfsPort,
+                               char *lobLocation,
                                int bufferSize , short replication ,
                                int blockSize, Int64 lobMaxSize, ExLobGlobals 
*lobGlobals)
 {
@@ -126,7 +122,7 @@ Ex_Lob_Error ExLob::initialize(char *lobFile, Ex_Lob_Mode 
mode,
   struct timespec startTime;
   struct timespec endTime;
   Int64 secs, nsecs, totalnsecs;
-
+ 
   if (dir) 
     {
       if (dir_.empty()) 
@@ -136,12 +132,12 @@ Ex_Lob_Error ExLob::initialize(char *lobFile, Ex_Lob_Mode 
mode,
 
    
       snprintf(lobDataFile_, MAX_LOB_FILE_NAME_LEN, "%s/%s", dir_.c_str(), 
lobFile);
-      snprintf(lobDescFile_, MAX_LOB_FILE_NAME_LEN, "%s/%s.desc", 
dir_.c_str(), lobFile);
+      
     } 
   else 
     { 
       snprintf(lobDataFile_, MAX_LOB_FILE_NAME_LEN, "%s", lobFile);
-      snprintf(lobDescFile_, MAX_LOB_FILE_NAME_LEN, "%s.desc", lobFile);
+      
     }
 
   if (storage_ != Lob_Invalid_Storage) 
@@ -156,7 +152,7 @@ Ex_Lob_Error ExLob::initialize(char *lobFile, Ex_Lob_Mode 
mode,
 
   hdfsServer_ = hdfsServer;
   hdfsPort_ = hdfsPort;
-
+  lobLocation_ = lobLocation;
   clock_gettime(CLOCK_MONOTONIC, &startTime);
 
   if (lobGlobals->getHdfsFs() == NULL)
@@ -200,107 +196,94 @@ Ex_Lob_Error ExLob::initialize(char *lobFile, 
Ex_Lob_Mode mode,
        }
       hdfsCloseFile(fs_, fdData_);
       fdData_ = NULL;
-      if (!lobGlobals->isHive())
-       {
-         //Create the desc header file that holds info about the 
-         //lob data file offsets etc.   
-         fdDesc_ = hdfsOpenFile(fs_, lobDescFile_, O_WRONLY, bufferSize, 
replication, blockSize);
-         if (!fdDesc_) 
-           {
-             return LOB_DESC_FILE_CREATE_ERROR;
-           }
-         //write empty header info into it. 
-         ExLobDescHeader header(lobMaxSize);
-      
-         Int64 numWritten = 0;
-         numWritten = hdfsWrite(fs_, fdDesc_, (void *)&header, 
sizeof(ExLobDescHeader));
-         if (numWritten <=0)
-           return LOB_DATA_WRITE_ERROR;
-      
-      
-         if (hdfsFlush(fs_, fdDesc_)) 
-           return LOB_DATA_FLUSH_ERROR;
-           
-         hdfsCloseFile(fs_, fdDesc_);
-         fdDesc_ = NULL;   
-       }
+     
     }
   lobGlobalHeap_ = lobGlobals->getHeap();    
   return LOB_OPER_OK;
     
 }
 
-Ex_Lob_Error ExLob::fetchCursor() 
+Ex_Lob_Error ExLob::fetchCursor(char *handleIn, Int32 handleLenIn, Int64 
&outOffset, Int64 &outSize,NABoolean &isEOD, Int64 transId) 
 {
-    Ex_Lob_Error err; 
-
-    request_.setType(Lob_Req_Fetch_Cursor);
+  Ex_Lob_Error err = LOB_OPER_OK;
+  Int64 dummyParam;
+  int cliErr=0;
+  Int64 offset = 0;
+  Int64 size = 0;
+  lobCursors_it it = lobCursors_.find(string(handleIn, handleLenIn));
 
-    err = request_.send();
+   if (it == lobCursors_.end())
+   {
+      return LOB_CURSOR_NOT_OPEN;                         
+   }
 
-    if (err != LOB_OPER_OK) {
+   void *cliInterface = it->second.cliInterface_;
+   
+   
+    cliErr = SQL_EXEC_LOBcliInterface(handleIn, handleLenIn, 
+                                    0, 0,
+                                     (char *)&dummyParam, (Lng32 *)&dummyParam,
+                                     LOB_CLI_SELECT_FETCH, LOB_CLI_ExecImmed,
+                                     &offset, &size,
+                                     &dummyParam, &dummyParam, 
+                                    &cliInterface,
+                                    transId);
+    if (err != LOB_OPER_OK) 
+      {
        return err;
-    }
+      }
+    if (cliErr == 100 )
+      {
+        isEOD= TRUE;
+        cliErr = SQL_EXEC_LOBcliInterface(handleIn, handleLenIn, 
+                                    NULL, NULL,
+                                     (char *)&dummyParam, (Lng32 *)&dummyParam,
+                                     LOB_CLI_SELECT_CLOSE, LOB_CLI_ExecImmed,
+                                     &dummyParam, &dummyParam,
+                                     &dummyParam, &dummyParam, 
+                                    &cliInterface,
+                                    transId);
+        
+      }
+    else
+      {
+        outOffset = offset;
+        outSize = size;
+      }
 
-    err = request_.getError();
+  
 
     return err;
 }
 
-Ex_Lob_Error ExLob::delDesc(Int64 descNum) 
-{
-    Ex_Lob_Error err; 
-
-    request_.setType(Lob_Req_Del_Desc);
-    request_.setDescNumIn(descNum);
 
-    err = request_.send();
-
-    if (err != LOB_OPER_OK) {
-       return err;
-    }
-
-    err = request_.getError();
-
-    return err;
-}
 
-Ex_Lob_Error ExLob::getDesc(ExLobDesc &desc) 
+Ex_Lob_Error ExLob::getDesc(ExLobDesc &desc,char * handleIn, Int32 
handleInLen, char *blackBox, Int32 *blackBoxLen, char *handleOut, Int32 
&handleOutLen, Int64 transId) 
 {
-    Ex_Lob_Error err; 
-
-    request_.setType(Lob_Req_Get_Desc);
-
-    err = request_.send();
-
-    if (err != LOB_OPER_OK) {
-       return err;
-    }
-
-    request_.getDescOut(desc);
-    err = request_.getError();
+    Ex_Lob_Error err = LOB_OPER_OK; 
+    NABoolean multipleChunks = FALSE;
+    Int32 clierr = 0;
+    Int64 size,offset,dummyParam = 0;
+   
 
+     clierr = SQL_EXEC_LOBcliInterface(handleIn, 
+                                       handleInLen, 
+                                       blackBox, blackBoxLen,
+                                       handleOut, &handleOutLen,
+                                       LOB_CLI_SELECT_UNIQUE, 
LOB_CLI_ExecImmed,
+                                       &offset, &size,
+                                       &dummyParam, &dummyParam, 
+                                       0,
+                                       transId);
+     
+     if (clierr < 0) 
+       return LOB_DESC_READ_ERROR;
+    desc.setOffset(offset);
+    desc.setSize(size);
+  
     return err;
 }
 
-Ex_Lob_Error ExLob::putDesc(ExLobDesc &desc, Int64 descNum)
-{
-    Ex_Lob_Error err; 
-
-    request_.setType(Lob_Req_Put_Desc);
-    request_.setDescNumIn(descNum);
-    request_.putDescIn(desc);
-
-    err = request_.send();
-
-    if (err != LOB_OPER_OK) {
-       return err;
-    }
-
-    err = request_.getError();
-
-    return err;
-}
 
 Ex_Lob_Error ExLob::writeData(Int64 offset, char *data, Int32 size, Int64 
&operLen)
 { 
@@ -642,10 +625,14 @@ Ex_Lob_Error ExLob::readExternalSourceFile(char *srcfile, 
char *&fileData, Int32
   return LOB_OPER_OK;
 }
 
-Ex_Lob_Error ExLob::writeDesc(Int64 &sourceLen, char *source, LobsSubOper 
subOper, Int64 &descNumOut, Int64 &operLen, Int64 lobMaxSize)
+Ex_Lob_Error ExLob::writeDesc(Int64 &sourceLen, char *source, LobsSubOper 
subOper, Int64 &descNumOut, Int64 &operLen, Int64 lobMaxSize,Int64 
lobMaxChunkMemSize,Int64 lobGCLimit, char * handleIn, Int32 handleInLen, char 
*blackBox, Int32 *blackBoxLen, char *handleOut, Int32 &handleOutLen, void 
*lobGlobals)
 {
-    Ex_Lob_Error err; 
+  Ex_Lob_Error err=LOB_OPER_OK; 
     Int64 dataOffset = 0;
+    Int64 outDescPartnKey = 0;
+    Int64 outDescSyskey = 0;
+    Int32 clierr = 0;
+
     // Calculate sourceLen for each subOper.
     if (subOper == Lob_File)
       {
@@ -656,25 +643,59 @@ Ex_Lob_Error ExLob::writeDesc(Int64 &sourceLen, char 
*source, LobsSubOper subOpe
     if (sourceLen <= 0 || sourceLen > lobMaxSize)
       {
        return LOB_MAX_LIMIT_ERROR; //exceeded the size of the max lob size
+        //TBD trigger compaction
       }
-    err = allocateDesc((unsigned int)sourceLen, descNumOut, dataOffset, 
lobMaxSize);
+    err = allocateDesc((unsigned int)sourceLen, descNumOut, dataOffset, 
lobMaxSize, lobMaxChunkMemSize,handleIn, handleInLen, lobGCLimit,lobGlobals);
 
     operLen = 0; 
     if (err != LOB_OPER_OK)
       return err;
-
-    //send a message to mxlobsrvr to insert into the descriptor tables
-    request_.setType(Lob_Req_Allocate_Desc);
-    request_.getDesc().setSize(sourceLen);
-    request_.setDescNumOut(descNumOut);
-    request_.setDataOffset(dataOffset);
-    err = request_.send();
-    if (err != LOB_OPER_OK) {
-      return err;
+    
+   clierr = SQL_EXEC_LOBcliInterface(handleIn, 
+                                     handleInLen, 
+                                    NULL, blackBoxLen,
+                                     handleOut, &handleOutLen,
+                                     LOB_CLI_INSERT, LOB_CLI_ExecImmed,
+                                     &dataOffset, &sourceLen,
+                                     &outDescPartnKey, &outDescSyskey, 
+                                    0,
+                                    0);
+    if (clierr < 0 ) {
+      return LOB_DESC_WRITE_ERROR;
     }
     return err;
 }
 
+
+Ex_Lob_Error ExLob::insertDesc(Int64 offset, Int64 size,  char *handleIn, 
Int32 handleInLen,  char *handleOut, Int32 &handleOutLen, char *blackBox, Int32 
blackBoxLen,void *lobGlobals) 
+{
+  
+   Lng32 clierr;
+   Int64 dummyParam;
+   Int64 outDescSyskey = 0;
+   Int64 outDescPartnKey = 0;
+   handleOutLen = 0;
+   Int32 chunkNum = 1;
+ 
+   NABoolean foundUnused = FALSE;
+   
+   clierr = SQL_EXEC_LOBcliInterface(handleIn, 
+                                     handleInLen, 
+                                    NULL, &chunkNum,
+                                     handleOut, &handleOutLen,
+                                     LOB_CLI_INSERT, LOB_CLI_ExecImmed,
+                                     &offset, &size,
+                                     &outDescPartnKey, &outDescSyskey, 
+                                    0,
+                                    0);
+ 
+   if (clierr < 0 ) {
+      return LOB_DESC_WRITE_ERROR;
+    }
+   return LOB_OPER_OK;
+}
+
+
 Ex_Lob_Error ExLob::writeLobData(char *source, Int64 sourceLen, LobsSubOper 
subOperation, Int64 tgtOffset,Int64 &operLen, Int64 lobMaxChunkMemSize)
 {
     Ex_Lob_Error err; 
@@ -692,8 +713,7 @@ Ex_Lob_Error ExLob::writeLobData(char *source, Int64 
sourceLen, LobsSubOper subO
            err = readSourceFile(source, inputAddr, allocMemSize, readOffset);
            if (err != LOB_OPER_OK)
              return err;     
-         } 
-   
+         }    
        else 
          { // in memory
           
@@ -723,10 +743,10 @@ Ex_Lob_Error ExLob::writeLobData(char *source, Int64 
sourceLen, LobsSubOper subO
     return err;
 }
 
-Ex_Lob_Error ExLob::readToMem(char *memAddr, Int64 size,  Int64 &operLen)
+Ex_Lob_Error ExLob::readToMem(char *memAddr, Int64 size,  Int64 &operLen,char 
* handleIn, Int32 handleInLen, char *blackBox, Int32 blackBoxLen, char * 
handleOut, Int32 &handleOutLen, Int64 transId)
 {
    Ex_Lob_Error err = LOB_OPER_OK; 
-
+   NABoolean multipleChunks = FALSE;
   
    int cliErr;
 
@@ -735,12 +755,19 @@ Ex_Lob_Error ExLob::readToMem(char *memAddr, Int64 size,  
Int64 &operLen)
    Int64 sizeToRead = 0;
    
    
-   err = getDesc(desc);
+   err = getDesc(desc,handleIn,handleInLen,blackBox, 
&blackBoxLen,handleOut,handleOutLen,transId);
+   if (err != LOB_OPER_OK)
+     {    
+       return err;
+     }
    sizeToRead = MINOF(size,desc.getSize());
    
-   if (getRequest()->getBlackBoxLen() == -1)
-     sizeToRead = size;
-   err = readDataToMem(memAddr, desc.getOffset(),sizeToRead, operLen);
+   if (blackBoxLen == -1)
+     {
+       sizeToRead = size;
+       multipleChunks = TRUE;
+     }
+   err = readDataToMem(memAddr, desc.getOffset(),sizeToRead, operLen, 
handleIn,handleInLen, multipleChunks,transId);
 
    return err;
 }
@@ -772,18 +799,20 @@ LobInputOutputFileType ExLob::fileType(char *ioFileName)
     else
       return LOCAL_FILE;
 }
-Ex_Lob_Error ExLob::readToFile(char *tgtFileName, Int64 tgtLength, Int64 
&operLen, Int64 lobMaxChunkMemLen, Int32 fileflags)
+Ex_Lob_Error ExLob::readToFile(char *tgtFileName, Int64 tgtLength, Int64 
&operLen, Int64 lobMaxChunkMemLen, Int32 fileflags,char *handleIn,Int32 
handleInLen, char *blackBox, Int32 blackBoxLen, char * handleOut, Int32 
&handleOutLen, Int64 transId)
 {
   Ex_Lob_Error err = LOB_OPER_OK; 
   Int64 srcOffset = 0;
   Int64 srcLength = 0;
   LobInputOutputFileType tgtType = fileType(tgtFileName);
   ExLobDesc desc;
-  err = getDesc(desc);
+  NABoolean multipleChunks = FALSE;
+  err = getDesc(desc,handleIn,handleInLen,blackBox, 
&blackBoxLen,handleOut,handleOutLen,transId);
   if (err != LOB_OPER_OK)
     return err;
-  if (getRequest()->getBlackBoxLen() == -1)  // mxlobsrvr returned -1 
indicating multiple chunks for this particular lob handle
+  if (blackBoxLen == -1)  // getDesc set blackBoxLen to -1, indicating multiple chunks 
for this particular lob handle
     {
+      multipleChunks = TRUE;
       //the data retrieval in chunks is handled in readDataToMem.
     }
   else if (tgtLength <=0 )
@@ -797,19 +826,19 @@ Ex_Lob_Error ExLob::readToFile(char *tgtFileName, Int64 
tgtLength, Int64 &operLe
     }
   if (tgtType == HDFS_FILE)
     {
-      err = readDataToHdfsFile(tgtFileName,  srcOffset , tgtLength,operLen, 
lobMaxChunkMemLen, fileflags);
+      err = readDataToHdfsFile(tgtFileName,  srcOffset , tgtLength,operLen, 
lobMaxChunkMemLen, fileflags,handleIn,handleInLen,multipleChunks,transId);
       if (err != LOB_OPER_OK)
        return err;
     }
   else if(tgtType == CURL_FILE)
     {
-      err = readDataToExternalFile(tgtFileName, srcOffset, tgtLength, operLen, 
lobMaxChunkMemLen, fileflags);
+      err = readDataToExternalFile(tgtFileName, srcOffset, tgtLength, operLen, 
lobMaxChunkMemLen, fileflags,handleIn, handleInLen,multipleChunks,transId);
       if (err != LOB_OPER_OK)
        return err;
     }
   else if (tgtType == LOCAL_FILE)
     { 
-      err = readDataToLocalFile(tgtFileName,srcOffset, tgtLength,operLen, 
lobMaxChunkMemLen, fileflags);
+      err = readDataToLocalFile(tgtFileName,srcOffset, tgtLength,operLen, 
lobMaxChunkMemLen, fileflags,handleIn,handleInLen,multipleChunks,transId);
       if (err != LOB_OPER_OK)
        return err;
     }
@@ -819,12 +848,15 @@ Ex_Lob_Error ExLob::readToFile(char *tgtFileName, Int64 
tgtLength, Int64 &operLe
   return LOB_OPER_OK;
 }
 
-Ex_Lob_Error ExLob::append(char *data, Int64 size, LobsSubOper so, Int64 
headDescNum, Int64 &operLen, Int64 lobMaxSize,Int64 lobMaxChunkMemSize)
+Ex_Lob_Error ExLob::append(char *data, Int64 size, LobsSubOper so, Int64 
headDescNum, Int64 &operLen, Int64 lobMaxSize,Int64 lobMaxChunkMemSize, Int64 
lobGCLimit, char *handleIn, Int32 handleInLen,  char * handleOut, Int32 
&handleOutLen,void *lobGlobals)
 {
     Ex_Lob_Error err = LOB_OPER_OK;
     Int64 dummyParam;
     Int64 dataOffset=0;
     Int64 sourceLen = size;
+    Int32 clierr = 0;
+    Int32 chunkNum = 0;
+    Int64 outDescPartnKey, outDescSyskey = 0;
     if (so == Lob_File)
       {
        err = statSourceFile(data, sourceLen); 
@@ -835,22 +867,21 @@ Ex_Lob_Error ExLob::append(char *data, Int64 size, 
LobsSubOper so, Int64 headDes
       {
        return LOB_MAX_LIMIT_ERROR; //exceeded the size of the max lob size
       }
-    err = allocateDesc((unsigned int)sourceLen, dummyParam, dataOffset, 
lobMaxSize);
+    err = allocateDesc((unsigned int)sourceLen, dummyParam, dataOffset, 
lobMaxSize,lobMaxChunkMemSize,handleIn, handleInLen,lobGCLimit,lobGlobals);
     if (err != LOB_OPER_OK)
       return err;
-    request_.setType(Lob_Req_Append);
-    request_.getDesc().setSize(sourceLen);
-    request_.setDataOffset(dataOffset);
-    request_.send();
-
-    err = request_.getError();
-
-    if (err != LOB_OPER_OK) {
-       return err;
-    }
-
-    int cliErr = request_.getCliError();
-    if (cliErr < 0 || cliErr == 100) { // some error or EOD.
+   
+    clierr = SQL_EXEC_LOBcliInterface(handleIn, handleInLen, 
+                                     0, &chunkNum,
+                                      handleOut, &handleOutLen,
+                                      LOB_CLI_INSERT_APPEND, LOB_CLI_ExecImmed,
+                                      &dataOffset, &sourceLen,
+                                      &outDescPartnKey, &outDescSyskey, 
+                                     0,
+                                     0);
+    
+    
+    if (clierr < 0 || clierr == 100) { // some error or EOD.
        return LOB_DESC_APPEND_ERROR;
     }
 
@@ -864,20 +895,20 @@ Ex_Lob_Error ExLob::append(char *data, Int64 size, 
LobsSubOper so, Int64 headDes
       return err;
     return LOB_OPER_OK;
 }
-Ex_Lob_Error ExLob::insertData(char *data, Int64 size, LobsSubOper so,Int64 
headDescNum, Int64 &operLen, Int64 lobMaxSize, Int64 lobMaxChunkMemSize)
+Ex_Lob_Error ExLob::insertData(char *data, Int64 size, LobsSubOper so,Int64 
headDescNum, Int64 &operLen, Int64 lobMaxSize, Int64 lobMaxChunkMemSize,char * 
handleIn, Int32 handleInLen, char *blackBox, Int32 blackBoxLen, char * 
handleOut, Int32 &handleOutLen, void *lobGlobals)
 {
-   Ex_Lob_Error err; 
+   Ex_Lob_Error err=LOB_OPER_OK; 
    ExLobDesc desc;
-
+   int clierr = 0;
    operLen = 0;
 
    // get offset and input size from desc (the one that was just               
            inserted into the descriptor handle table)
-   err = getDesc(desc);
-   if (err != LOB_OPER_OK)
-     return err;
+  
+   err = getDesc(desc,handleIn,handleInLen,blackBox, 
&blackBoxLen,handleOut,handleOutLen,0);
+   
 
-    int cliErr = request_.getCliError();
-    if (cliErr < 0 || cliErr == 100) { // some error or EOD.
+   
+    if (err !=LOB_OPER_OK) { // getDesc failed to read the descriptor.
        return LOB_DESC_READ_ERROR;
     }
     
@@ -898,12 +929,15 @@ Ex_Lob_Error ExLob::insertData(char *data, Int64 size, 
LobsSubOper so,Int64 head
       return err;
     return LOB_OPER_OK;
 }
-Ex_Lob_Error ExLob::update(char *data, Int64 size, LobsSubOper so,Int64 
headDescNum, Int64 &operLen, Int64 lobMaxSize, Int64 lobMaxChunkMemSize)
+Ex_Lob_Error ExLob::update(char *data, Int64 size, LobsSubOper so,Int64 
headDescNum, Int64 &operLen, Int64 lobMaxSize, Int64 lobMaxChunkMemSize, Int64 
lobGCLimit, char *handleIn, Int32 handleInLen,  char *handleOut, Int32 
&handleOutLen, void *lobGlobals)
 {
     Ex_Lob_Error err = LOB_OPER_OK;
     Int64 dummyParam;
     Int64 dataOffset = 0;
     Int64 sourceLen = size;
+    Int32 clierr = 0;
+    Int64 outDescPartnKey,outDescSyskey = 0;
+    Int32 chunkNum = 0;
     if (so == Lob_File)
       {
        err = statSourceFile(data, sourceLen); 
@@ -914,24 +948,21 @@ Ex_Lob_Error ExLob::update(char *data, Int64 size, 
LobsSubOper so,Int64 headDesc
       {
        return LOB_MAX_LIMIT_ERROR; //exceeded the size of the max lob size
       }
-    err = allocateDesc((unsigned int)sourceLen, dummyParam, dataOffset, 
lobMaxSize);
+    err = allocateDesc((unsigned int)sourceLen, dummyParam, dataOffset, 
lobMaxSize, lobMaxChunkMemSize, handleIn, handleInLen, lobGCLimit,lobGlobals);
     if (err != LOB_OPER_OK)
       return err;
-    // send a message to mxlobsrvr to do an update into descriptor tables
-    request_.setType(Lob_Req_Update);
-    request_.getDesc().setSize(sourceLen);
-    request_.setDataOffset(dataOffset);
-
-    request_.send();
-
-    err = request_.getError();
-
-    if (err != LOB_OPER_OK) {
-       return err;
-    }
-
-    int cliErr = request_.getCliError();
-    if (cliErr < 0 || cliErr == 100) { // some error or EOD.
+  
+    clierr = SQL_EXEC_LOBcliInterface(handleIn, 
+                                      handleInLen, 
+                                     0, &chunkNum,
+                                      handleOut, &handleOutLen,
+                                      LOB_CLI_UPDATE_UNIQUE, LOB_CLI_ExecImmed,
+                                      &dataOffset, &sourceLen,
+                                      &outDescPartnKey, &outDescSyskey, 
+                                     0,
+                                     0);
+    
+    if (clierr < 0 || clierr == 100) { // some error or EOD.
        return LOB_DESC_UPDATE_ERROR;
     }
     char *inputAddr = data;
@@ -946,18 +977,27 @@ Ex_Lob_Error ExLob::update(char *data, Int64 size, 
LobsSubOper so,Int64 headDesc
     return LOB_OPER_OK;
 }
 
-Ex_Lob_Error ExLob::delDesc()
+Ex_Lob_Error ExLob::delDesc(char *handleIn, Int32 handleInLen, Int64 transId)
 {
-    Ex_Lob_Error err;
-    Int64 dummyParam;
-
-    request_.setType(Lob_Req_Del_Desc);
+    Ex_Lob_Error err;   
+    Int64 offset=0;
+    Int64 dummyParam=0;  
+    Lng32 clierr=0;
 
-    request_.send();
-
-    err = request_.getError();
+    clierr = SQL_EXEC_LOBcliInterface(handleIn, handleInLen, 
+                                     0, 0,
+                                      (char *)&dummyParam, (Lng32 
*)&dummyParam,
+                                      LOB_CLI_DELETE, LOB_CLI_ExecImmed,
+                                      &dummyParam, &dummyParam,
+                                      &dummyParam, &dummyParam, 
+                                     0,
+                                     transId);
 
-    return err;
+    if (clierr < 0)
+      return LOB_DESC_FILE_DELETE_ERROR;
+    
+    return LOB_OPER_OK;
+ 
 }
 
 Ex_Lob_Error ExLob::purgeLob()
@@ -967,38 +1007,37 @@ Ex_Lob_Error ExLob::purgeLob()
        {
         return LOB_DATA_FILE_DELETE_ERROR;
        }
-     if (hdfsDelete(fs_, lobDescFile_, 0) != 0) 
-       {
-        return LOB_DESC_FILE_DELETE_ERROR;
-       }
-
-
+    
     return LOB_OPER_OK;
 }
 
-Ex_Lob_Error ExLob::openCursor(char *handleIn, Int64 handleInLen)
+Ex_Lob_Error ExLob::openCursor(char *handleIn, Int32 handleInLen,Int64 transId)
 {
     Ex_Lob_Error err;
     cursor_t cursor;
+    Int32 clierr;
+    Int64 dummyParam = 0;
+    void *cliInterface = NULL;
+   
+    clierr = SQL_EXEC_LOBcliInterface(handleIn, 
+                                      handleInLen,
+                                     0, 0,
+                                     (char *)&dummyParam, (Lng32 *)&dummyParam,
+                                     LOB_CLI_SELECT_CURSOR, LOB_CLI_ExecImmed,
+                                     &dummyParam, &dummyParam,
+                                     &dummyParam, &dummyParam, 
+                                    &cliInterface,
+                                    transId);
 
-    request_.setType(Lob_Req_Select_Cursor);
-
-    err = request_.send();
-
-    if (err != LOB_OPER_OK) {
-       return err;
+    if (clierr <0 ) {
+      return LOB_DESC_READ_ERROR;
     }
 
-    err = request_.getError();
-
-    if (err != LOB_OPER_OK) {
-       return err;
-    }
 
     cursor.bytesRead_ = -1;
     cursor.descOffset_ = -1;
     cursor.descSize_ = -1;
-    cursor.cliInterface_ = NULL; // used only in lob process
+    cursor.cliInterface_ = cliInterface; // used only in lob process
     cursor.eod_ = false;
     cursor.eor_ = false; 
     cursor.eol_ = false;
@@ -1101,7 +1140,7 @@ Ex_Lob_Error ExLob::openDataCursor(char *file, 
LobsCursorType type, Int64 range,
     return LOB_OPER_OK;
 }
 
-Ex_Lob_Error ExLob::readCursor(char *tgt, Int64 tgtSize, char *handleIn, Int64 
handleInLen, Int64 &operLen)
+Ex_Lob_Error ExLob::readCursor(char *tgt, Int64 tgtSize, char *handleIn, Int32 
handleInLen, Int64 &operLen,Int64 transId)
 {
     int dataOffset;
     Ex_Lob_Error result;
@@ -1127,7 +1166,7 @@ Ex_Lob_Error ExLob::readCursor(char *tgt, Int64 tgtSize, 
char *handleIn, Int64 h
        return LOB_OPER_OK;
     }
 
-    result = readCursorData(tgt, tgtSize, cursor, operLen); // increments 
cursor
+    result = readCursorData(tgt, tgtSize, cursor, operLen, 
handleIn,handleInLen,transId); // increments cursor
         
     if (result != LOB_OPER_OK)
       return result;
@@ -1137,221 +1176,38 @@ Ex_Lob_Error ExLob::readCursor(char *tgt, Int64 
tgtSize, char *handleIn, Int64 h
     return LOB_OPER_OK;
 }
 
-Ex_Lob_Error ExLob::readDataCursorSimple(char *file, char *tgt, Int64 tgtSize, 
-                                         Int64 &operLen, ExLobGlobals 
*lobGlobals)
-{
-    int dataOffset;
-    Ex_Lob_Error result = LOB_OPER_OK;
-    cursor_t *cursor;
-    ExLobCursor::bufferList_t::iterator c_it;
-    ExLobCursorBuffer *buf = NULL;
-    Int64 bytesToCopy = 0;
-    operLen = 0;
-    Int64 len;
-    char *target = tgt;
-    bool done = false;
-
-    struct timespec startTime;
-    struct timespec endTime;
 
-    lobCursorLock_.lock();
 
-    lobCursors_it it = lobCursors_.find(string(file, strlen(file)));
 
-    if (it == lobCursors_.end())
+Ex_Lob_Error ExLob::closeCursor(char *handleIn, Int32 handleInLen)
+{
+    lobCursors_it it = lobCursors_.find(string(handleIn, handleInLen));
+    if (it != lobCursors_.end())
     {
-       lobCursorLock_.unlock();
-       return LOB_CURSOR_NOT_OPEN;
+      lobCursors_.erase(it);
     }
-    else
-    {
-       cursor = &(it->second); 
-    } 
+    return LOB_OPER_OK;
+}
 
-    lobCursorLock_.unlock();
 
-    while ((operLen < tgtSize) && !done && !cursor->eol_)
-    {
-      lobGlobals->traceMessage("locking cursor",cursor,__LINE__);
-      cursor->lock_.lock();
+Ex_Lob_Error ExLob::doSanityChecks(char *dir, LobsStorage storage,
+                                   Int32 handleInLen, Int32 handleOutLen, 
+                                   Int32 blackBoxLen)
+{
 
-      // if no buffers to read and is eor or eod, we are done.
-      // else wait for prefetch thread to wake us up. 
-      if (cursor->prefetchBufList_.size() == 0) {
-        if (cursor->eor_ || cursor->eod_) {
-          done = true;
-        } else {
-          cursor->bufferMisses_++;
-         lobGlobals->traceMessage("wait on condition cursor",cursor,__LINE__);
-          cursor->lock_.wait();
-        }
-       lobGlobals->traceMessage("unlocking cursor",cursor,__LINE__);
-        cursor->lock_.unlock();
-        continue;
-      } 
+#ifdef SQ_USE_HDFS
+    if (!fs_)
+      return LOB_HDFS_CONNECT_ERROR;
+#else
+    if (fdData_ == -1)
+      return LOB_DATA_FILE_OPEN_ERROR;
+#endif
 
-      // a buffer is available
-      c_it = cursor->prefetchBufList_.begin();
-      buf = *c_it;
-      lobGlobals->traceMessage("unlocking cursor",cursor,__LINE__);
-      cursor->lock_.unlock();
+    if (dir_.compare(dir) != 0)
+      return LOB_DIR_NAME_ERROR;
 
-      bytesToCopy = min(buf->bytesRemaining_, tgtSize - operLen);
-      memcpy(target, buf->data_ + buf->bytesUsed_, bytesToCopy);
-      target += bytesToCopy;
-      if (bytesToCopy == buf->bytesRemaining_) { // buffer is now empty
-        buf->bytesRemaining_ = -1;
-        buf->bytesUsed_ = -1;
-        lobGlobals->postfetchBufListLock_.lock();
-        lobGlobals->postfetchBufList_.push_back(buf);
-        lobGlobals->postfetchBufListLock_.unlock();
-       lobGlobals->traceMessage("locking cursor",cursor,__LINE__);
-        cursor->lock_.lock();
-        c_it = cursor->prefetchBufList_.erase(c_it);
-       lobGlobals->traceMessage("signal condition cursor",cursor,__LINE__);
-        cursor->lock_.wakeOne(); // wake up prefetch thread if it was waiting 
for an empty buffer.
-       lobGlobals->traceMessage("unlocking cursor",cursor,__LINE__);
-        cursor->lock_.unlock();
-      } else {
-        buf->bytesUsed_ += bytesToCopy;
-        buf->bytesRemaining_ -= bytesToCopy;
-      }
-      stats_.bytesPrefetched += bytesToCopy;
-      operLen += bytesToCopy;
-    } 
-
-    // update stats
-    stats_.bytesRead += operLen;
-    stats_.bytesToRead += tgtSize;
-    stats_.numReadReqs++;
-
-    return LOB_OPER_OK;
-}
-
-void ExLobCursor::emptyPrefetchList(ExLobGlobals *lobGlobals)
-{
-    ExLobCursor::bufferList_t::iterator c_it;
-    ExLobCursorBuffer *buf = NULL;
-
-    c_it = prefetchBufList_.begin();
-    while (c_it != prefetchBufList_.end())
-    {
-      buf = *c_it;
-      lobGlobals->postfetchBufListLock_.lock();
-      lobGlobals->postfetchBufList_.push_back(buf);
-      lobGlobals->postfetchBufListLock_.unlock();
-      c_it = prefetchBufList_.erase(c_it);
-    }
-}
-
-// Seems like this is currently unused. 
-// closeDataCusrorSimple takes care of destroying the cursor.But addign code
-// similar to closeDataCursorSimple for correctness in case it is used in 
future
-Ex_Lob_Error ExLob::deleteCursor(char *cursorName, ExLobGlobals *lobGlobals)
-{
-    cursor_t *cursor = NULL;
-
-    lobCursorLock_.lock();
-
-    lobCursors_it it = lobCursors_.find(string(cursorName, 
strlen(cursorName)));
-    if (it != lobCursors_.end())
-    {
-      cursor = &(it->second);
-      lobGlobals->traceMessage("locking cursor",cursor,__LINE__);
-      cursor->lock_.lock();
-      cursor->emptyPrefetchList(lobGlobals);
-      lobGlobals->traceMessage("unlocking cursor",cursor,__LINE__);
-      cursor->lock_.unlock();
-      lobCursors_.erase(it);
-    }
-
-    lobCursorLock_.unlock();
-
-    return LOB_OPER_OK;
-}
-
-Ex_Lob_Error ExLob::closeCursor(char *handleIn, Int64 handleInLen)
-{
-    lobCursors_it it = lobCursors_.find(string(handleIn, handleInLen));
-    if (it != lobCursors_.end())
-    {
-      lobCursors_.erase(it);
-    }
-    return LOB_OPER_OK;
-}
-
-Ex_Lob_Error ExLob::closeDataCursorSimple(char *fileName, ExLobGlobals 
*lobGlobals)
-{
-    cursor_t *cursor = NULL;
-    Int64 secs = 0;
-    Int64 nsecs = 0;
-
-    lobCursorLock_.lock();
-
-    lobCursors_it it = lobCursors_.find(string(fileName, strlen(fileName)));
-    if (it != lobCursors_.end())
-    {
-      cursor = &(it->second);
-      lobGlobals->traceMessage("locking cursor",cursor,__LINE__);
-      cursor->lock_.lock();
-
-      clock_gettime(CLOCK_MONOTONIC, &cursor->closeTime_);
-      secs = cursor->closeTime_.tv_sec - cursor->openTime_.tv_sec;
-      nsecs = cursor->closeTime_.tv_nsec - cursor->openTime_.tv_nsec;
-
-      if (cursor->eod_ || cursor->eor_) { // prefetch thread already done,
-        cursor->emptyPrefetchList(lobGlobals);
-       lobGlobals->traceMessage("unlocking cursor",cursor,__LINE__);
-       cursor->lock_.unlock();
-        lobCursors_.erase(it);            // so erase it here. 
-        // no need to unlock as cursor object is gone.
-      } else {
-        cursor->eol_ = true;     // prefetch thread will do the eol rituals
-       lobGlobals->traceMessage("signal condition cursor",cursor,__LINE__);
-        cursor->lock_.wakeOne(); // wakeup prefetch thread
-       lobGlobals->traceMessage("unlocking cursor",cursor,__LINE__);
-        cursor->lock_.unlock();
-      }
-    }
-
-    lobCursorLock_.unlock();
-
-    if (nsecs < 0) {
-      secs--;
-      nsecs += NUM_NSECS_IN_SEC;
-    }
-    Int64 totalnsecs = (secs * NUM_NSECS_IN_SEC) + nsecs;
-    stats_.cursorElapsedTime += totalnsecs;
-
-    return LOB_OPER_OK;
-}
-
-Ex_Lob_Error ExLob::print()
-{
-    Ex_Lob_Error err;
-    request_.setType(Lob_Req_Print);
-    err = request_.send();
-    return err;
-}
-
-Ex_Lob_Error ExLob::doSanityChecks(char *dir, LobsStorage storage,
-                                   Int64 handleInLen, Int64 handleOutLen, 
-                                   Int64 blackBoxLen)
-{
-
-#ifdef SQ_USE_HDFS
-    if (!fs_)
-      return LOB_HDFS_CONNECT_ERROR;
-#else
-    if (fdData_ == -1)
-      return LOB_DATA_FILE_OPEN_ERROR;
-#endif
-
-    if (dir_.compare(dir) != 0)
-      return LOB_DIR_NAME_ERROR;
-
-    if (storage_ != storage)
-      return LOB_STORAGE_TYPE_ERROR;
+    if (storage_ != storage)
+      return LOB_STORAGE_TYPE_ERROR;
 
     if (handleInLen > MAX_HANDLE_IN_LEN) {
       return LOB_HANDLE_IN_LEN_ERROR;
@@ -1368,75 +1224,199 @@ Ex_Lob_Error ExLob::doSanityChecks(char *dir, 
LobsStorage storage,
     return LOB_OPER_OK;
 }
 
-
-
-Ex_Lob_Error ExLob::allocateDesc(ULng32 size, Int64 &descNum, Int64 
&dataOffset, Int64 lobMaxSize)
+// Computes the offset at which the next LOB chunk is to be written to the
+// LOB data file, i.e. the current size of that file as reported by HDFS.
+//
+//   size              - length of the chunk about to be written; rejected
+//                       with LOB_MAX_LIMIT_ERROR when larger than lobMaxSize
+//   descNum           - not modified by this function
+//   dataOffset        - (out) set to the data file's current size; stays 0
+//                       when the path information cannot be obtained
+//   lobGCLimit        - when the data file is larger than this, garbage
+//                       collection is requested via SQL_EXEC_LOB_GC_Interface
+//                       and the size is read again afterwards
+//   handleIn, handleInLen, lobMaxChunkMemLen, lobGlobals
+//                     - passed through to SQL_EXEC_LOB_GC_Interface
+//
+// Returns LOB_OPER_OK, LOB_MAX_LIMIT_ERROR or LOB_DATA_FILE_OPEN_ERROR.
+Ex_Lob_Error ExLob::allocateDesc(ULng32 size, Int64 &descNum, Int64 &dataOffset, Int64 lobMaxSize, Int64 lobMaxChunkMemLen, char *handleIn, Int32 handleInLen, Int64 lobGCLimit, void *lobGlobals)
 {
+  NABoolean GCDone = FALSE;
     Ex_Lob_Error err = LOB_OPER_OK;
     Lng32 retval = 0;
     Int64 numRead = 0;
     Int64 numWritten = 0;
-
-    // TBD need a way to lock access to this file.    
+    dataOffset = 0;
+    if (size > lobMaxSize)
+      return LOB_MAX_LIMIT_ERROR;
     
     Int32 openFlags = O_RDONLY ;   
-    fdDesc_ = hdfsOpenFile(fs_, lobDescFile_, O_RDONLY, 0, 0,0);
-      if (!fdDesc_) {
-       hdfsCloseFile(fs_,fdDesc_);
-       fdDesc_ = NULL;
-       return LOB_DESC_FILE_OPEN_ERROR;
-      }
-      ExLobDescHeader header(lobMaxSize);
-    numRead = hdfsPread(fs_,fdDesc_, 0, (void *)&header, 
sizeof(ExLobDescHeader) );
-    if (numRead <=0)
-      {
-       return LOB_DESC_HEADER_READ_ERROR;
-      }
-    if (header.getAvailSize() >= size) {
-      descNum = header.getFreeDesc(); 
-
-      dataOffset = header.getDataOffset();
-      header.incFreeDesc();
-      header.decAvailSize(size);
-      header.incDataOffset(size);
-
-      hdfsCloseFile(fs_,fdDesc_);
-      fdDesc_ = NULL;
-      openFlags = O_WRONLY;
-      fdDesc_ = hdfsOpenFile(fs_,lobDescFile_,openFlags,0,0,0);
-      if (!fdDesc_) {
-       
-       return LOB_DESC_FILE_OPEN_ERROR;
-      }
-      numWritten = hdfsWrite(fs_,fdDesc_, (void *)&header, 
sizeof(ExLobDescHeader)) ;
-      if (numWritten <= 0)
-       {
-         return LOB_DESC_HEADER_WRITE_ERROR;
+    // NOTE(review): the handle is left open in fdData_ on success, as before;
+    // confirm that no previously opened fdData_ handle is overwritten here.
+    fdData_ = hdfsOpenFile(fs_, lobDataFile_, O_RDONLY, 0, 0,0);
+      if (!fdData_) {
+       // The open failed, so there is no handle to close.
+       return LOB_DATA_FILE_OPEN_ERROR;
       }
+      hdfsFileInfo *fInfo = hdfsGetPathInfo(fs_, lobDataFile_);
+      if (fInfo)
+        {
+          dataOffset = fInfo->mSize;
+          // hdfsGetPathInfo returns a dynamically allocated structure.
+          hdfsFreeFileInfo(fInfo, 1);
+          fInfo = NULL;
+        }
+
+      if (dataOffset > lobGCLimit) // 5 GB default
+         {
+           GCDone = TRUE;
+           // NOTE(review): the result of the GC request is not checked here;
+           // the file size is simply re-read below.
+           Int32 rc = SQL_EXEC_LOB_GC_Interface(lobGlobals,handleIn,handleInLen,
+                                                hdfsServer_,hdfsPort_,
+                                                lobLocation_,
+                                                lobMaxChunkMemLen);
+         }
+      if (GCDone) // recalculate the new offset 
+        {
+          fInfo = hdfsGetPathInfo(fs_, lobDataFile_);
+          if (fInfo)
+            {
+              dataOffset = fInfo->mSize;
+              hdfsFreeFileInfo(fInfo, 1);
+              fInfo = NULL;
+            }
+        }
 
-      
+
+      //Find the last offset in the file
+      // dataOffset = hdfsTell(fs_,fdData_);  //commenting out.hdfsTell always returns 0 !!
+     
+      return LOB_OPER_OK;    
+}
+// Rewrites the LOB data file so that it contains only the chunks described by
+// dcArray, in array order.
+//
+// The chunks are copied into "<lobDataFile_>_tmp"; on success the current data
+// file is renamed to "<lobDataFile_>_save" and the temp file is renamed to
+// lobDataFile_.
+//
+//   dcArray    - in-memory descriptor entries; each supplies the source offset
+//                (getCurrentOffset) and length (getChunkLen) of one chunk
+//   numEntries - number of entries in dcArray
+//
+// Returns LOB_OPER_OK on success. On failure returns one of
+// LOB_DATA_FILE_OPEN_ERROR, LOB_SOURCE_DATA_ALLOC_ERROR, LOB_DATA_READ_ERROR,
+// LOB_TARGET_FILE_WRITE_ERROR, LOB_DATA_FLUSH_ERROR or
+// LOB_DATA_FILE_WRITE_ERROR. A partially written temp file is left in place on
+// failure, as before.
+Ex_Lob_Error ExLob::compactLobDataFile(ExLobInMemoryDescChunksEntry *dcArray,Int32 numEntries)
+{
+  Ex_Lob_Error rc = LOB_OPER_OK;
+  // 1GB limit for the intermediate buffer used for transferring data
+  const Int64 maxMemChunk = 1024*1024*1024;
+
+  // Connect before allocating anything so this early return has nothing to free.
+  hdfsFS fs = hdfsConnect(hdfsServer_,hdfsPort_);
+  if (fs == NULL)
+    return LOB_DATA_FILE_OPEN_ERROR;
+
+  char * saveLobDataFile = new(getLobGlobalHeap()) char[MAX_LOB_FILE_NAME_LEN+6];
+  str_sprintf(saveLobDataFile, "%s_save",lobDataFile_);
+  char * tmpLobDataFile = new(getLobGlobalHeap()) char[MAX_LOB_FILE_NAME_LEN+5];
+  str_sprintf(tmpLobDataFile, "%s_tmp",lobDataFile_);
+
+  hdfsFile  fdData = hdfsOpenFile(fs, lobDataFile_, O_RDONLY, 0, 0,0);
+  if (!fdData) 
+    {
+      NADELETEBASIC(saveLobDataFile,getLobGlobalHeap());
+      NADELETEBASIC(tmpLobDataFile,getLobGlobalHeap());
+      return LOB_DATA_FILE_OPEN_ERROR;
     }
-    else {
-      return LOB_DATA_FILE_FULL_ERROR;
+  
+  hdfsFile fdTemp = hdfsOpenFile(fs, tmpLobDataFile,O_WRONLY|O_CREAT,0,0,0);
+  if (!fdTemp) 
+    {
+      hdfsCloseFile(fs,fdData);
+      NADELETEBASIC(saveLobDataFile,getLobGlobalHeap());
+      NADELETEBASIC(tmpLobDataFile,getLobGlobalHeap());
+      return LOB_DATA_FILE_OPEN_ERROR;
     }
-    ExLobDesc desc(dataOffset, size, descNum);
 
-      hdfsCloseFile(fs_,fdDesc_);
-      fdDesc_=NULL;
-      openFlags = O_WRONLY| O_APPEND;
-      fdDesc_ = hdfsOpenFile(fs_,lobDescFile_,openFlags,0,0,0);
-      numWritten = hdfsWrite(fs_,fdDesc_, (void *)&desc, sizeof(ExLobDesc));
-      if (numWritten <= 0)
-       {
-         err = LOB_DESC_WRITE_ERROR;
-       }
-      hdfsCloseFile(fs_,fdDesc_);
-      fdDesc_=NULL;
-     
-      // TBD need a way to unlock this hdfs file.
-    return err;
+  // Copy every chunk, in pieces of at most maxMemChunk bytes.
+  for (Int32 i = 0; (i < numEntries) && (rc == LOB_OPER_OK); i++)
+    {
+      Int64 srcOffset = dcArray[i].getCurrentOffset();
+      Int64 remaining = dcArray[i].getChunkLen();
+      if (remaining <= 0)
+        continue; // nothing to copy for this entry
+
+      Int64 bufLen = (remaining > maxMemChunk) ? maxMemChunk : remaining;
+      char * tgt = (char *)(getLobGlobalHeap())->allocateMemory(bufLen);
+      if (tgt == NULL)
+        {
+          rc = LOB_SOURCE_DATA_ALLOC_ERROR;
+          break;
+        }
+
+      while ((remaining > 0) && (rc == LOB_OPER_OK))
+        {
+          Int64 pieceLen = (remaining > bufLen) ? bufLen : remaining;
+          // A short read is treated as an error, as in the original code.
+          Int64 bytesRead = hdfsPread(fs,fdData,srcOffset,tgt,pieceLen);
+          if (bytesRead != pieceLen)
+            rc = LOB_DATA_READ_ERROR;
+          else if (hdfsWrite(fs,fdTemp, tgt,pieceLen) != pieceLen)
+            rc = LOB_TARGET_FILE_WRITE_ERROR;
+          else
+            {
+              // Advance through the source chunk so each piece is read once.
+              srcOffset += pieceLen;
+              remaining -= pieceLen;
+            }
+        }
+      getLobGlobalHeap()->deallocateMemory(tgt);
+
+      if ((rc == LOB_OPER_OK) && hdfsFlush(fs, fdTemp))
+        rc = LOB_DATA_FLUSH_ERROR;
+    }
+
+  // Close both files on every path. A failed close of the temp file means its
+  // contents cannot be trusted, so it must not replace the data file.
+  if ((hdfsCloseFile(fs,fdTemp) != 0) && (rc == LOB_OPER_OK))
+    rc = LOB_DATA_FLUSH_ERROR;
+  hdfsCloseFile(fs,fdData);
+
+  //Now save the data file and rename the tempfile to the original datafile
+  if (rc == LOB_OPER_OK)
+    {
+      if (hdfsRename(fs,lobDataFile_,saveLobDataFile) == -1)
+        rc = LOB_DATA_FILE_WRITE_ERROR;
+      else if (hdfsRename(fs,tmpLobDataFile, lobDataFile_) == -1)
+        rc = LOB_DATA_FILE_WRITE_ERROR;
+    }
+
+  NADELETEBASIC(saveLobDataFile,getLobGlobalHeap());
+  NADELETEBASIC(tmpLobDataFile,getLobGlobalHeap());
+  return rc;
 }
 
+// Restores the LOB data file from its "<lobDataFile_>_save" backup: the
+// current data file is deleted and the backup is renamed to lobDataFile_.
+//
+// The current data file is only deleted when the backup actually exists.
+// Otherwise a restore attempted when no backup was ever created would destroy
+// the only copy of the data.
+//
+// Returns LOB_OPER_OK on success, LOB_DATA_FILE_OPEN_ERROR when the HDFS
+// connection cannot be obtained, and LOB_OPER_ERROR when there is no backup or
+// the rename fails.
+Ex_Lob_Error ExLob::restoreLobDataFile()
+{
+  Ex_Lob_Error rc = LOB_OPER_OK;
+
+  hdfsFS fs = hdfsConnect(hdfsServer_,hdfsPort_);
+  if (fs == NULL)
+    return LOB_DATA_FILE_OPEN_ERROR;
+
+  char * saveLobDataFile = new(getLobGlobalHeap()) char[MAX_LOB_FILE_NAME_LEN+6];
+  str_sprintf(saveLobDataFile, "%s_save",lobDataFile_);
+
+  if (hdfsExists(fs,saveLobDataFile) != 0)
+    {
+      // No backup to restore from; leave the current data file untouched.
+      rc = LOB_OPER_ERROR;
+    }
+  else
+    {
+      hdfsDelete(fs,lobDataFile_,FALSE);//ok to ignore error.
+      if (hdfsRename(fs,saveLobDataFile, lobDataFile_))
+        rc = LOB_OPER_ERROR;
+    }
+
+  NADELETEBASIC(saveLobDataFile,getLobGlobalHeap());
+  return rc;
+}
+
+// Deletes the "<lobDataFile_>_save" backup of the LOB data file.
+// Returns LOB_DATA_FILE_OPEN_ERROR when the HDFS connection cannot be
+// obtained; otherwise LOB_OPER_OK, whether or not the delete succeeded.
+Ex_Lob_Error ExLob::purgeBackupLobDataFile()
+{
+  hdfsFS hdfsHandle = hdfsConnect(hdfsServer_,hdfsPort_);
+  if (!hdfsHandle)
+    {
+      return LOB_DATA_FILE_OPEN_ERROR;
+    }
+
+  // Build the backup file name: data file name plus the "_save" suffix.
+  char *backupPath = new(getLobGlobalHeap()) char[MAX_LOB_FILE_NAME_LEN+6];
+  str_sprintf(backupPath, "%s_save",lobDataFile_);
+
+  // The result of the delete is deliberately ignored.
+  hdfsDelete(hdfsHandle,backupPath,FALSE);
+
+  NADELETEBASIC(backupPath,getLobGlobalHeap());
+  return LOB_OPER_OK;
+}
 ///////////////////////////////////////////////////////////////////////////////
 // ExLobDescHeader definitions
 ///////////////////////////////////////////////////////////////////////////////
@@ -1471,7 +1451,7 @@ ExLobDesc::~ExLobDesc()
 {
 }
 
-Ex_Lob_Error ExLob::readCursorData(char *tgt, Int64 tgtSize, cursor_t &cursor, 
Int64 &operLen)
+Ex_Lob_Error ExLob::readCursorData(char *tgt, Int64 tgtSize, cursor_t &cursor, 
Int64 &operLen, char *handleIn, Int32 handleLenIn, Int64 transId)
 {
    ExLobDesc desc;
    Ex_Lob_Error err;
@@ -1482,23 +1462,26 @@ Ex_Lob_Error ExLob::readCursorData(char *tgt, Int64 
tgtSize, cursor_t &cursor, I
    tOffset offset; 
    struct timespec startTime; 
    struct timespec endTime;
+   NABoolean isEOD=FALSE;
+   Int64 outOffset = 0;
+   Int64 outSize = 0;
 
    while ( (operLen < tgtSize) && !cursor.eod_ )
    {
     
       if (cursor.bytesRead_ == cursor.descSize_) // time to read next chunck
       {
-         err = fetchCursor();
+        err = fetchCursor(handleIn, handleLenIn,outOffset, 
outSize,isEOD,transId);
          if (err != LOB_OPER_OK) {
             return err;
          }
 
-         if (request_.getCliError() == 100) {
+         if (isEOD) {
             cursor.eod_ = true; // subsequent call will return 100 and close 
the cursor
             continue;
          } else {
-            cursor.descSize_ = request_.getDesc().getSize();
-            cursor.descOffset_ = request_.getDesc().getOffset();
+            cursor.descSize_ = outSize;
+            cursor.descOffset_ = outOffset;
             cursor.bytesRead_ = 0;
          }
       }
@@ -1538,7 +1521,7 @@ Ex_Lob_Error ExLob::readCursorData(char *tgt, Int64 
tgtSize, cursor_t &cursor, I
       if (bytesRead == -1) {
          return LOB_DATA_READ_ERROR;
       } else if (bytesRead == 0) {
-         cursor.eod_ = true; 
+         cursor.eod_ = true;         
          continue;
       }
 
@@ -1552,92 +1535,29 @@ Ex_Lob_Error ExLob::readCursorData(char *tgt, Int64 
tgtSize, cursor_t &cursor, I
    return LOB_OPER_OK;
 }
 
-Ex_Lob_Error ExLob::readCursorDataSimple(char *tgt, Int64 tgtSize, cursor_t 
&cursor, Int64 &operLen)
-{
-   ExLobDesc desc;
-   Ex_Lob_Error err;
-   Int64 bytesAvailable = 0;
-   Int64 bytesToCopy = 0;
-   Int64 bytesRead = 0;
-   operLen = 0;
-   tOffset offset; 
-   struct timespec startTime; 
-   struct timespec endTime;
-   bool done = false;
-
-   if (!fdData_) {
-      return LOB_CURSOR_NOT_OPEN_ERROR;
-   }
-
-   if (cursor.bytesRead_ == -1) {  // starting
-      cursor.bytesRead_ = 0;
-   }
-
-   clock_gettime(CLOCK_MONOTONIC, &startTime);
-   
-   while ( (operLen < tgtSize) && !done )
-   {
-      //offset = cursor.descOffset_ + cursor.bytesRead_;
-      bytesToCopy = tgtSize - operLen;
-      offset = cursor.descOffset_ + cursor.bytesRead_;
-
-      // gets chunks of 64KB. Uses readDirect internally.
-      // bytesRead = hdfsPread(fs_, fdData_, offset, tgt, bytesToCopy);
-      bytesRead = hdfsRead(fs_, fdData_, tgt, bytesToCopy);
-
-      stats_.numHdfsReqs++;
-
-      if (bytesRead == -1) {
-         return LOB_DATA_READ_ERROR;
-      } else if (bytesRead == 0) {
-         done = true; 
-      }
-
-      cursor.bytesRead_ += bytesRead;
-      operLen += bytesRead;
-      tgt += bytesRead;
-   }
-
-   clock_gettime(CLOCK_MONOTONIC, &endTime);
-
-   Int64 secs = endTime.tv_sec - startTime.tv_sec;
-   Int64 nsecs = endTime.tv_nsec - startTime.tv_nsec;
-   if (nsecs < 0) {
-    secs--; 
-    nsecs += NUM_NSECS_IN_SEC;
-   }
-
-   Int64 totalnsecs = (secs * NUM_NSECS_IN_SEC) + nsecs;
-   stats_.CumulativeReadTime += totalnsecs;
 
-   return LOB_OPER_OK;
-}
 
 Ex_Lob_Error ExLob::readDataToMem(char *memAddr,
-                                  Int64 offset, Int64 size, Int64 &operLen)
+                                  Int64 offset, Int64 size, Int64 &operLen,
+                                  char *handleIn, Int32 handleLenIn, 
+                                  NABoolean multipleChunks, Int64 transId)
 
 { 
   Ex_Lob_Error err = LOB_OPER_OK;
   operLen = 0;
   Int64 bytesRead = 0;
-  NABoolean multipleChunks = FALSE;
  
-  if (getRequest()->getBlackBoxLen() == -1) // mxlobsrvr returned -1 
indicating multiple chunks for this particular lob handle
+  if (multipleChunks) 
     {
-      multipleChunks = TRUE;
      
-      err = openCursor(getRequest()->getHandleIn(), 
-                      getRequest()->getHandleInLen());
+      err = openCursor(handleIn, 
+                      handleLenIn,transId);
       //now we can fetch the descriptors for each chunk
     }
-  else
-    if (err != LOB_OPER_OK)
-      return err;
    
-  int cliErr = request_.getCliError();
-  if (cliErr < 0 || cliErr == 100) {
-    return LOB_DESC_READ_ERROR;
-  }
+  if (err != LOB_OPER_OK)
+    return err;
+  
 
   if (fdData_)// we may have a stale handle. close and open to refresh 
     {
@@ -1665,22 +1585,19 @@ Ex_Lob_Error ExLob::readDataToMem(char *memAddr,
                                 memAddr, size)) == -1) {
          
        return LOB_DATA_READ_ERROR;
-      }
-
-      
+      }      
       operLen = bytesRead;
       return LOB_OPER_OK;
     }
   else
     {
       //handle reading the multiple chunks like a cursor
-      err = readCursor(memAddr,size, getRequest()->getHandleIn(),
-                      getRequest()->getHandleInLen(), operLen);
-        
-        
+      err = readCursor(memAddr,size, handleIn,
+                      handleLenIn, operLen, transId);
+                
       if (err==LOB_OPER_OK)
-       closeCursor(getRequest()->getHandleIn(), 
-                   getRequest()->getHandleInLen());
+       closeCursor(handleIn, 
+                   handleLenIn);
       else
        return err;
     }
@@ -1689,7 +1606,7 @@ Ex_Lob_Error ExLob::readDataToMem(char *memAddr,
 
  
 
-Ex_Lob_Error ExLob::readDataToLocalFile(char *fileName,  Int64 offset, Int64 
size, Int64 &writeOperLen, Int64 lobMaxChunkMemSize, Int32 fileflags)
+Ex_Lob_Error ExLob::readDataToLocalFile(char *fileName,  Int64 offset, Int64 
size, Int64 &writeOperLen, Int64 lobMaxChunkMemSize, Int32 fileflags,char 
*handleIn,Int32 handleInLen, NABoolean multipleChunks,Int64 transId)
 { 
     Ex_Lob_Error err;
     Int64 operLen = 0;
@@ -1699,10 +1616,10 @@ Ex_Lob_Error ExLob::readDataToLocalFile(char *fileName, 
 Int64 offset, Int64 siz
     Int64 tgtOffset = 0;
     char *lobData = 0;
     Int64 chunkSize = 0;
-
+    
     if (srcLen <=0)
        return LOB_SOURCE_DATA_ALLOC_ERROR;
-    // open the targte file for writing 
+    // open the target file for writing 
     int filePerms = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
     int openFlags = O_RDWR ; // O_DIRECT needs mem alignment
     if (((LobTgtFileFlags)fileflags == Lob_Append_Or_Error ) ||
@@ -1731,15 +1648,16 @@ Ex_Lob_Error ExLob::readDataToLocalFile(char *fileName, 
 Int64 offset, Int64 siz
              return LOB_TARGET_FILE_OPEN_ERROR; 
          }
       }
-    if ((srcLen < lobMaxChunkMemSize) && (getRequest()->getBlackBoxLen() != 
-1)) // simple single I/O case
+    if ((srcLen < lobMaxChunkMemSize) && (multipleChunks ==FALSE)) // simple 
single I/O case
       {
+       
        lobData = (char *) (getLobGlobalHeap())->allocateMemory(srcLen);
 
        if (lobData == NULL) 
          {
            return LOB_SOURCE_DATA_ALLOC_ERROR;
          }
-       err = readDataToMem(lobData, srcOffset,srcLen,operLen);
+       err = readDataToMem(lobData, srcOffset,srcLen,operLen, 
handleIn,handleInLen, multipleChunks,transId);
        if (err != LOB_OPER_OK)
          {
            getLobGlobalHeap()->deallocateMemory(lobData);
@@ -1756,8 +1674,8 @@ Ex_Lob_Error ExLob::readDataToLocalFile(char *fileName,  
Int64 offset, Int64 siz
       }
     else // multiple chunks to read
       {
-       err = openCursor(getRequest()->getHandleIn(), 
-                    getRequest()->getHandleInLen());
+       err = openCursor(handleIn, 
+                         handleInLen,transId);
        if (err != LOB_OPER_OK)
          return err;
        while ( srcLen > 0)
@@ -1771,8 +1689,8 @@ Ex_Lob_Error ExLob::readDataToLocalFile(char *fileName,  
Int64 offset, Int64 siz
                return LOB_SOURCE_DATA_ALLOC_ERROR;
              }
            //handle reading the multiple chunks like a cursor
-           err = readCursor(lobData,chunkSize, getRequest()->getHandleIn(),
-                            getRequest()->getHandleInLen(), operLen);
+           err = readCursor(lobData,chunkSize, handleIn,
+                            handleInLen, operLen, transId);
 
            if ((err != LOB_OPER_OK) || (operLen != chunkSize))
              {
@@ -1790,15 +1708,15 @@ Ex_Lob_Error ExLob::readDataToLocalFile(char *fileName, 
 Int64 offset, Int64 siz
            srcLen -= chunkSize;
            tgtOffset += chunkSize;     
          }
-       closeCursor(getRequest()->getHandleIn(), 
-                   getRequest()->getHandleInLen());
+       closeCursor(handleIn, 
+                   handleInLen);
       }
     close(fdDestFile);
     return LOB_OPER_OK;
 }
 
 
-Ex_Lob_Error ExLob::readDataToHdfsFile(char *tgtFileName,  Int64 offset, Int64 
size, Int64 &writeOperLen, Int64 lobMaxChunkMemLen, Int32 fileflags)
+Ex_Lob_Error ExLob::readDataToHdfsFile(char *tgtFileName,  Int64 offset, Int64 
size, Int64 &writeOperLen, Int64 lobMaxChunkMemLen, Int32 fileflags,char 
*handleIn, Int32 handleInLen, NABoolean multipleChunks,Int64 transId)
 { 
   Ex_Lob_Error err;
   Int64 operLen = 0;
@@ -1809,6 +1727,7 @@ Ex_Lob_Error ExLob::readDataToHdfsFile(char *tgtFileName, 
 Int64 offset, Int64 s
   char *lobData = 0;
   Int64 chunkSize = 0; 
   hdfsFile  fdTgtFile;
+  
   // open and write to the target file
   int openFlags = O_WRONLY;
   if ((LobTgtFileFlags)fileflags == Lob_Append_Or_Error )
@@ -1844,7 +1763,7 @@ Ex_Lob_Error ExLob::readDataToHdfsFile(char *tgtFileName, 
 Int64 offset, Int64 s
        }
     } 
 
-  if ((srcLen < lobMaxChunkMemLen) && (getRequest()->getBlackBoxLen() != -1)) 
// simple single I/O case
+  if ((srcLen < lobMaxChunkMemLen) && (multipleChunks ==FALSE)) // simple 
single I/O case
     {
       lobData = (char *) (getLobGlobalHeap())->allocateMemory(srcLen);
 
@@ -1852,7 +1771,7 @@ Ex_Lob_Error ExLob::readDataToHdfsFile(char *tgtFileName, 
 Int64 offset, Int64 s
        {
          return LOB_SOURCE_DATA_ALLOC_ERROR;
        }
-      err = readDataToMem(lobData, srcOffset,srcLen,operLen);
+      err = readDataToMem(lobData, 
srcOffset,srcLen,operLen,handleIn,handleInLen, multipleChunks,transId);
       if (err != LOB_OPER_OK)
        {
          getLobGlobalHeap()->deallocateMemory(lobData);
@@ -1874,8 +1793,9 @@ Ex_Lob_Error ExLob::readDataToHdfsFile(char *tgtFileName, 
 Int64 offset, Int64 s
     }
   else
     {// multiple chunks to read
-      err = openCursor(getRequest()->getHandleIn(), 
-                      getRequest()->getHandleInLen());
+      err = openCursor(handleIn, 
+                      handleInLen,
+                       transId);
       if (err != LOB_OPER_OK)
        return err;
       while ( srcLen > 0)
@@ -1888,13 +1808,13 @@ Ex_Lob_Error ExLob::readDataToHdfsFile(char 
*tgtFileName,  Int64 offset, Int64 s
              return LOB_SOURCE_DATA_ALLOC_ERROR;
            }
          //handle reading the multiple chunks like a cursor
-         err = readCursor(lobData,chunkSize, getRequest()->getHandleIn(),
-                          getRequest()->getHandleInLen(), operLen);
+         err = readCursor(lobData,chunkSize, handleIn,
+                          handleInLen, operLen, transId);
 
          if ((err != LOB_OPER_OK) || (operLen != chunkSize))
            {
              getLobGlobalHeap()->deallocateMemory(lobData);
-             return err;
+             return LOB_DATA_READ_ERROR;
            }
          writeOperLen += hdfsWrite(fs_,fdTgtFile,lobData, chunkSize);
          if (writeOperLen <= 0)
@@ -1911,8 +1831,8 @@ Ex_Lob_Error ExLob::readDataToHdfsFile(char *tgtFileName, 
 Int64 offset, Int64 s
          srcLen -= chunkSize;
 
        }
-      closeCursor(getRequest()->getHandleIn(), 
-                 getRequest()->getHandleInLen());          
+      closeCursor(handleIn, 
+                 handleInLen);     
     }
   hdfsCloseFile(fs_, fdTgtFile);
   fdTgtFile=NULL;
@@ -1925,7 +1845,7 @@ Ex_Lob_Error ExLob::readDataToHdfsFile(char *tgtFileName, 
 Int64 offset, Int64 s
 
 
 
-Ex_Lob_Error ExLob::readDataToExternalFile(char *tgtFileName,  Int64 offset, 
Int64 size, Int64 &operLen,Int64 lobMaxChunkMemLen,Int32 fileflags)
+Ex_Lob_Error ExLob::readDataToExternalFile(char *tgtFileName,  Int64 offset, 
Int64 size, Int64 &operLen,Int64 lobMaxChunkMemLen,Int32 fileflags,char 
*handleIn,Int32 handleInLen, NABoolean multipleChunks,Int64 transId)
 { 
   //TBD
   return LOB_OPER_OK;
@@ -1954,975 +1874,1210 @@ Ex_Lob_Error ExLob::initStats()
     stats_.init();
     return LOB_OPER_OK;
 }
-void ExLobGlobals::traceMessage(const char *logMessage, ExLobCursor *cursor,
-                                int line)
-{
-  if ( threadTraceFile_ && logMessage)
-  {
-    fprintf(threadTraceFile_, 
-    "Thread: 0x%lx Line:  %d %s 0x%lx\n" ,
-       (unsigned long)pthread_self(), line, logMessage, 
-       (unsigned long) cursor);
-    fflush(threadTraceFile_);
-  }
-    
-}
-Ex_Lob_Error ExLobGlobals::performRequest(ExLobHdfsRequest *request)
-{
+//Main driver of any LOB related operation 
+
+Ex_Lob_Error ExLobsOper (
+                        char        *lobName,          // lob name
+                        char        *handleIn,         // input handle (for 
cli calls)
+                        Int32       handleInLen,       // input handle len
+                        char        *hdfsServer,       // server where hdfs fs 
resides
+                        Int64       hdfsPort,          // port number to 
access hdfs server
+                        char        *handleOut,        // output handle (for 
cli calls)
+                        Int32       &handleOutLen,     // output handle len
+                        Int64       descNumIn,         // input desc Num (for 
flat files only)
+                        Int64       &descNumOut,       // output desc Num (for 
flat files only)
+                        Int64       &retOperLen,       // length of data 
involved in this operation
+                        Int64       requestTagIn,      // only for checking 
status
+                        Int64       &requestTagOut,    // returned with every 
request other than check status
+                        Ex_Lob_Error  &requestStatus,  // returned req status
+                        Int64       &cliError,         // err returned by cli 
call
+                        char        *dir,              // directory in the 
storage
+                        LobsStorage storage,           // storage type
+                        char        *source,           // source (memory addr, 
filename, foreign lob etc)
+                        Int64       sourceLen,         // source len (memory 
len, foreign desc offset etc)
+                        Int64 cursorBytes,
+                        char *cursorId,
+                        LobsOper    operation,         // LOB operation
+                        LobsSubOper subOperation,      // LOB sub operation
+                        Int64       waited,            // waited or nowaited
+                        void        *&globPtr,         // ptr to the Lob 
objects. 
+                        Int64       transId,
+                        void        *blackBox,         // black box to be sent 
to cli
+                        Int32       blackBoxLen,       // length of black box
+                        Int64       lobMaxSize,
+                        Int64       lobMaxChunkMemSize,
+                         Int64       lobGCLimit,
+                        int         bufferSize ,
+                        short       replication ,
+                        int         blockSize,
+                        Lng32       openType)
+{ 
   Ex_Lob_Error err = LOB_OPER_OK;
-  ExLob *lobPtr;
-  ExLobCursorBuffer *buf;
-  ExLobCursor *cursor;
-  Int64 size;
-  NABoolean seenEOR = false;
-  NABoolean seenEOD = false;
-  ExLobCursor::bufferList_t::iterator c_it;
-  Int64 totalBufSize;
+  ExLob *lobPtr = NULL;
+  char fn[MAX_LOB_FILE_NAME_LEN];
+  struct timespec startTime;
+  struct timespec endTime;
+  Int64 secs, nsecs, totalnsecs;
+  ExLobPreOpen *preOpenObj;
+  ExLobGlobals *lobGlobals = NULL;
+  transId = 0;
+  retOperLen = 0;
+  ExLobDesc desc;
+    
+  lobMap_t *lobMap = NULL;
+  lobMap_it it;
 
-  switch (request->reqType_) 
-  {
-    case Lob_Hdfs_Cursor_Prefetch :
-      lobPtr = request->lobPtr_;
-      cursor = request->cursor_;
-      traceMessage("locking cursor",cursor,__LINE__);
-      cursor->lock_.lock();
-      while (!cursor->eod_ && !cursor->eor_ && !cursor->eol_) 
-      {
-        postfetchBufListLock_.lock();
-        c_it = postfetchBufList_.begin();
-        if (c_it != postfetchBufList_.end()) {
-          buf = *c_it;
-          postfetchBufList_.erase(c_it);
-          postfetchBufListLock_.unlock();
-         traceMessage("unlocking cursor",cursor,__LINE__);
-          cursor->lock_.unlock();
-        } else { 
-          postfetchBufListLock_.unlock();
-          // there are no empty buffers. 
-          // if prefetch list already has the max, wait for one to free up.
-          totalBufSize =  cursor->prefetchBufList_.size() * 
cursor->bufMaxSize_;
-          if (totalBufSize > LOB_CURSOR_PREFETCH_BYTES_MAX) {
-           traceMessage("wait on condition cursor",cursor,__LINE__);
-            cursor->lock_.wait();
-            char buffer2[2048];
-            sprintf(buffer2, "cursor->eod_ %d cursor->eor_ %d "
-                             "cursor->eol_ %d", cursor->eod_,
-                              cursor->eor_, cursor->eol_);
-            traceMessage(buffer2, cursor, __LINE__);
-            continue;
-          }
-          // create a new buffer
-         traceMessage("unlocking cursor",cursor,__LINE__);
-          cursor->lock_.unlock();
-          buf = new (getHeap()) ExLobCursorBuffer();
-          buf->data_ = (char *) (getHeap())->allocateMemory( 
cursor->bufMaxSize_);
-          lobPtr->stats_.buffersUsed++;
-        }
-        size = min(cursor->bufMaxSize_, (cursor->maxBytes_ - 
cursor->bytesRead_ + (16 * 1024)));
-        if (buf->data_) {
-          lobPtr->readCursorDataSimple(buf->data_, size, *cursor, 
buf->bytesRemaining_);
-          buf->bytesUsed_ = 0;
-         traceMessage("locking cursor",cursor,__LINE__);
-          cursor->lock_.lock();
-          if (size < (cursor->bufMaxSize_)) {
-            cursor->eor_ = true;
-           seenEOR = true;
-          }
-          if (buf->bytesRemaining_) {
-            cursor->prefetchBufList_.push_back(buf);
-           traceMessage("signal condition cursor",cursor,__LINE__);
-            cursor->lock_.wakeOne();
-           traceMessage("unlocking cursor",cursor,__LINE__);
-            cursor->lock_.unlock();
-          } else {
-            cursor->eod_ = true;
-            seenEOD = true;
-           traceMessage("signal condition cursor",cursor,__LINE__);
-            cursor->lock_.wakeOne();
-           traceMessage("unlocking cursor",cursor,__LINE__);
-            cursor->lock_.unlock();
-            postfetchBufListLock_.lock();
-            postfetchBufList_.push_back(buf);
-            postfetchBufListLock_.unlock();
-          }
-        } else {
-          assert("data_ is null"); 
-        }
-       // Important! Break and do not access cursor object if we have reached
-       // end of data or range.
-       // The main thread could have destroyed the cursor 
-       // in ::closeDataCursorSimple
-       if (seenEOD || seenEOR)
-        {
-          char buffer2[2048];
-          sprintf(buffer2, "seenEOD %d seenEOR %d",
-                               seenEOD, seenEOR);
-          traceMessage(buffer2, cursor, __LINE__);
-          break;
-        }
-       traceMessage("locking cursor",cursor,__LINE__);
-       cursor->lock_.lock();
-      } // while
+  clock_gettime(CLOCK_MONOTONIC, &startTime);
 
-      if (!seenEOD && !seenEOR)
+  char *fileName = lobName;
+
+  if (globPtr == NULL)
+    {
+      if (operation == Lob_Init)
        {
-          traceMessage("locking cursor",cursor,__LINE__);
-         cursor->lock_.unlock();
-         if (cursor->eol_) { // never reaches here ??  
-           lobPtr->deleteCursor(cursor->name_, this);
-         }
+         globPtr = (void *) new ExLobGlobals();
+         if (globPtr == NULL) 
+           return LOB_INIT_ERROR;
+
+         lobGlobals = (ExLobGlobals *)globPtr;
+
+         err = lobGlobals->initialize(); 
+         return err;
        }
-      processPreOpens();
-      break;
+      else
+       {
+         return LOB_GLOB_PTR_ERROR;
+       }
+    }
+  else
+    {
+      lobGlobals = (ExLobGlobals *)globPtr;
 
-    default:
-      request->error_ = LOB_HDFS_REQUEST_UNKNOWN;
-  }
+      lobMap = lobGlobals->getLobMap();
 
-  return LOB_OPER_OK;
-}
+      it = lobMap->find(string(fileName));
 
-Ex_Lob_Error ExLobDesc::print()
-{
-    printf("%4d %4d  %4d %4d %4d %4d %8d\n",
-           dataSize_, dataState_, tail_, prev_, next_, nextFree_, dataOffset_);
-    return LOB_OPER_OK;
-}
+      if (it == lobMap->end())
+       {
+         //lobPtr = new (lobGlobals->getHeap())ExLob();
+         lobPtr = new ExLob();
+         if (lobPtr == NULL) 
+           return LOB_ALLOC_ERROR;
 
-///////////////////////////////////////////////////////////////////////////////
-// ExLobGlobals definitions
-///////////////////////////////////////////////////////////////////////////////
+         err = lobPtr->initialize(fileName, (operation == Lob_Create) ? 
EX_LOB_CREATE : EX_LOB_RW, dir, storage, hdfsServer, hdfsPort, dir,bufferSize, 
replication, blockSize,lobMaxSize,lobGlobals);
+         if (err != LOB_OPER_OK)
+           return err;
 
-ExLobGlobals::ExLobGlobals() :
-    lobMap_(NULL), 
-    fs_(NULL),
-    isCliInitialized_(FALSE),
-    isHive_(FALSE),
-    threadTraceFile_(NULL),
-    heap_(NULL)
-{
-  //initialize the log file
-  if (getenv("TRACE_HDFS_THREAD_ACTIONS"))
+         lobMap->insert(pair<string, ExLob*>(string(fileName), lobPtr));
+       }
+      else
+       {
+         lobPtr = it->second;
+        
+       }
+    }
+  /* 
+// **Note** This is code that needs to get called before sending a request to the
+// mxlobsrvr process. It is currently inactive.
+  MS_Mon_Transid_Type transIdBig;
+  MS_Mon_Transseq_Type transStartId;
+  if (!lobGlobals->isHive())
     {
-      char logFileName[50]= "";
-      sprintf(logFileName,"trace_threads.%d",getpid());
-      threadTraceFile_ = fopen(logFileName,"a");
+      // get current transaction
+   
+      int transIdErr = ms_transid_get(false, false, &transIdBig, 
&transStartId);
+      // set the pass thru request object values in the lob
+    
+      lobPtr->getRequest()->setValues(lobPtr->getDescFileName(),
+                                     descNumIn, handleInLen, handleIn, storage,
+                                     transId, transIdBig, transStartId,
+                                     (char *)blackBox, blackBoxLen);
     }
-}
+  */
+  switch(operation)
+    {
+    case Lob_Create:
+      break;
 
-ExLobGlobals::~ExLobGlobals()
-{
-    ExLobCursor::bufferList_t::iterator c_it;
-    ExLobCursorBuffer *buf = NULL;
+    case Lob_InsertDesc:
+      err = lobPtr->writeDesc(sourceLen, source, subOperation, descNumOut, 
retOperLen, lobMaxSize, 
lobMaxChunkMemSize,lobGCLimit,handleIn,handleInLen,(char *)blackBox, 
&blackBoxLen,handleOut,handleOutLen,lobGlobals);
+      break;
 
-    preOpenListLock_.lock();
-    preOpenList_.clear();
-    preOpenListLock_.unlock();
+    case Lob_InsertData:
+      err = lobPtr->insertData(source, sourceLen, subOperation, descNumIn, 
retOperLen, lobMaxSize,lobMaxChunkMemSize,handleIn,handleInLen,(char 
*)blackBox, blackBoxLen,handleOut,handleOutLen,lobGlobals);
+      break;
 
-    
-    if (lobMap_) 
-      delete lobMap_;
+    case Lob_InsertDataSimple:
+      err = lobPtr->writeDataSimple(source, sourceLen, subOperation, 
retOperLen,
+                                   bufferSize , replication , blockSize);
+      break;
 
-    for (int i=0; i<NUM_WORKER_THREADS; i++) {
-      enqueueShutdownRequest();
-    }
+    case Lob_Read:
+      if (subOperation == Lob_Memory)
+       err = 
lobPtr->readToMem(source,sourceLen,retOperLen,handleIn,handleInLen,(char 
*)blackBox, blackBoxLen,handleOut,handleOutLen,transId);
+      else if (subOperation == Lob_File)
+       err = lobPtr->readToFile(source, sourceLen, retOperLen, 
lobMaxChunkMemSize,  openType,handleIn,handleInLen,(char *)blackBox, 
blackBoxLen,handleOut,handleOutLen,transId);
+      else  
+       err = LOB_SUBOPER_ERROR;
+      break;
 
-    for (int i=0; i<NUM_WORKER_THREADS; i++) {
-      pthread_join(threadId_[i], NULL);
-    }
-    // Free the post fetch bugf list AFTER the worker threads have left to 
-    // avoid slow worker thread being stuck and master deallocating these 
-    // buffers and not consuming the buffers which could cause a  lock.
- 
-    postfetchBufListLock_.lock();
-    c_it = postfetchBufList_.begin();
-    while (c_it != postfetchBufList_.end()) {
-      buf = *c_it;
-      if (buf->data_) {
-        heap_->deallocateMemory( buf->data_);
-      }
-      c_it = postfetchBufList_.erase(c_it);
-    }
-    postfetchBufListLock_.unlock();
-    
-    //msg_mon_close_process(&serverPhandle);
-    if (threadTraceFile_)
-      fclose(threadTraceFile_);
-    threadTraceFile_ = NULL;
-}
+    case Lob_ReadDesc: // read desc only. Needed for pass thru.
+      err = lobPtr->getDesc(desc,handleIn,handleInLen,(char *)blackBox, 
&blackBoxLen,handleOut,handleOutLen,transId); 
+      retOperLen = 0;
+      break;
+    case Lob_OpenCursor:
+      err = lobPtr->openCursor(handleIn, handleInLen,transId);
+      break;
 
-Ex_Lob_Error ExLobGlobals::setServerPhandle()
-{
-    int nid;
-    
-    int err = msg_mon_get_my_info(&nid, NULL, NULL, NULL, NULL, NULL, NULL, 
NULL);
-    char server[12];
-    sprintf(server, "%s%d", "$ZLOBSRV", nid);
+    case Lob_OpenDataCursorSimple:  
+      if (openType == 1) { // preopen
+       sprintf(fn,"%s:%Lx:%s",lobPtr->getDataFileName(), (long long unsigned 
int)lobName, cursorId);
+       preOpenObj = new (lobGlobals->getHeap()) ExLobPreOpen(lobPtr, fn, 
descNumIn, sourceLen, cursorBytes, waited);
+       lobGlobals->addToPreOpenList(preOpenObj);
+      } else if (openType == 2) { // must open
+       sprintf(fn,"%s:%Lx:%s",lobPtr->getDataFileName(), (long long unsigned 
int)lobName, cursorId);
+       fileName = fn;
+       err = lobPtr->openDataCursor(fileName, Lob_Cursor_Simple, descNumIn, 
sourceLen, cursorBytes, waited, lobGlobals);
+      } else
+       err = LOB_SUBOPER_ERROR;
+      break;
 
-    int oid;
-    err = msg_mon_open_process(server, &serverPhandle, &oid);
+    case Lob_ReadCursor:
+      if ((subOperation == Lob_Memory) || (subOperation == Lob_Buffer))
+       err = lobPtr->readCursor(source, sourceLen, handleIn, handleInLen, 
retOperLen,transId);
+      else if (subOperation == Lob_File)
+       err = lobPtr->readCursor(source, -1, handleIn, handleInLen, 
retOperLen,transId);
+      else  
+       err = LOB_SUBOPER_ERROR;
+      break;
 
-    if (err != XZFIL_ERR_OK)
-      return LOB_SERVER_OPEN_ERROR;
+    case Lob_ReadDataCursorSimple:
+      sprintf(fn,"%s:%Lx:%s",lobPtr->getDataFileName(), (long long unsigned 
int)lobName, cursorId);
+      fileName = fn;       
+      err = lobPtr->readDataCursorSimple(fileName, source, sourceLen, 
retOperLen, lobGlobals);
+      break;
 
-    return LOB_OPER_OK;
-}
+    case Lob_CloseFile:
+      if (lobPtr->hasNoOpenCursors()) {
+       lobGlobals->traceMessage("Lob_CloseFile",NULL,__LINE__);
+       err = lobPtr->closeFile();
+       it = lobMap->find(string(lobName));
+       lobMap->erase(it);
+       delete lobPtr;
+       lobPtr = NULL;
+      }  
+      break;
 
-Ex_Lob_Error ExLobGlobals::resetServerPhandle()
-{
-   Ex_Lob_Error err;
+    case Lob_CloseCursor:
+      err = lobPtr->closeCursor(handleIn, handleInLen);
+      break;
 
-   msg_mon_close_process(&serverPhandle);
+    case Lob_CloseDataCursorSimple:
+      sprintf(fn,"%s:%Lx:%s",lobPtr->getDataFileName(), (long long unsigned 
int)lobName, cursorId);
+      fileName = fn;
+      err = lobPtr->closeDataCursorSimple(fileName, lobGlobals);
+      break;
 
-   err = setServerPhandle();
+    case Lob_Append:
+      if ((subOperation == Lob_Memory) ||(subOperation == Lob_Buffer))
+       err = lobPtr->append(source, sourceLen, subOperation, descNumIn, 
retOperLen,lobMaxSize, 
lobMaxChunkMemSize,lobGCLimit,handleIn,handleInLen,handleOut,handleOutLen,lobGlobals);
+      else if (subOperation == Lob_File)
+       err = lobPtr->append(source, -1, subOperation, descNumIn, 
retOperLen,lobMaxSize, 
lobMaxChunkMemSize,lobGCLimit,handleIn,handleInLen,handleOut,handleOutLen,lobGlobals);
+      else  
+       err = LOB_SUBOPER_ERROR;
+      break;
 
-   return err;
-}
+    case Lob_Update:
+      if ((subOperation == Lob_Memory)||(subOperation == Lob_Buffer))
+       err = lobPtr->update(source, sourceLen, subOperation, descNumIn, 
retOperLen, lobMaxSize, 
lobMaxChunkMemSize,lobGCLimit,handleIn,handleInLen,handleOut,handleOutLen,lobGlobals);
+      else if (subOperation == Lob_File)
+       err = lobPtr->update(source, -1, subOperation,descNumIn, 
retOperLen,lobMaxSize, 
lobMaxChunkMemSize,lobGCLimit,handleIn,handleInLen,handleOut,handleOutLen,lobGlobals);
 
+      else
+       err = LOB_SUBOPER_ERROR;
+      break;
 
-// called once per process
-Ex_Lob_Error ExLobGlobals::initialize()
-{
-    Ex_Lob_Error err = LOB_OPER_OK;
+    case Lob_Delete:
+      err = lobPtr->delDesc(handleIn, handleInLen,transId);
+      break;
 
-    lobMap_ = (lobMap_t *) new (getHeap())lobMap_t; // Leaving this allocated 
from system heap. Since this class contains hdfsFS unable to derive from LOB 
heap
-    if (lobMap_ == NULL)
-      return LOB_INIT_ERROR;
+    case Lob_Drop:
+      err = lobPtr->purgeLob();
+      it = lobMap->find(string(lobName));
+      lobMap->erase(it);
+      delete lobPtr;
+      lobPtr = NULL;
+      break;
 
-    
-    err = setServerPhandle();
-    
+    case Lob_Purge:
+      err = lobPtr->purgeLob();
+      it = lobMap->find(string(lobName));
+      lobMap->erase(it);
+      delete lobPtr;
+      lobPtr = NULL;
+      break;
 
-    // start the worker threads
-    startWorkerThreads();
 
-    return err;
-}
+    case Lob_Stats:
+      err = lobPtr->readStats(source);
+      lobPtr->initStats(); // because file may remain open across cursors
+      break;
 
-static void *workerThreadMain(void *arg)
-{
-   // parameter passed to the thread is an instance of the ExLobHdfs object
-   ExLobGlobals *glob = (ExLobGlobals *)arg;
+    case Lob_Empty_Directory:
+      lobPtr->initialize(fileName, EX_LOB_RW,
+                        dir, storage, hdfsServer, hdfsPort, dir,bufferSize, 
replication, blockSize);
+      err = lobPtr->emptyDirectory();
+      break;
 
-   glob->doWorkInThread();
+    case Lob_Cleanup:
+      delete lobGlobals;
+      break;
+    case Lob_PerformGC:
+      err = lobPtr->compactLobDataFile((ExLobInMemoryDescChunksEntry 
*)source,sourceLen);
+      break;
+    case Lob_RestoreLobDataFile:
+      err = lobPtr->restoreLobDataFile();
+      break;
+    case Lob_PurgeBackupLobDataFile:
+      err = lobPtr->purgeBackupLobDataFile();
+      break;
+    default:
+      err = LOB_OPER_ERROR;
+      break;
+    }
+  /*
+//**Note ** This code is needed to reinstate the master transaction after 
+// returning from the mxlobsrvr process. This is inactive code for now
+if (!lobGlobals->isHive() )
+    {
+      if (lobPtr)
+       // set the pass thru request object values from the lob
+       lobPtr->getRequest()->getValues(descNumOut, handleOutLen, handleOut, 
+                                       requestStatus, cliError,
+                                       (char *)blackBox, blackBoxLen);    // 
reinstate the transaction
+      if (TRANSID_IS_VALID(transIdBig)) {
+       ms_transid_reinstate(transIdBig, transStartId);
+      }
+    }
 
-   return NULL;
+  */
+  clock_gettime(CLOCK_MONOTONIC, &endTime);
+
+  secs = endTime.tv_sec - startTime.tv_sec;
+  nsecs = endTime.tv_nsec - startTime.tv_nsec;
+  if (nsecs < 0) {
+    secs--;
+    nsecs += NUM_NSECS_IN_SEC;
+  }
+  totalnsecs = (secs * NUM_NSECS_IN_SEC) + nsecs;
+  if (lobPtr && lobPtr->getStats())
+    lobPtr->getStats()->hdfsAccessLayerTime += totalnsecs; 
+       
+  return err;
 }
 
-Ex_Lob_Error ExLobGlobals::startWorkerThreads()
-{
-   int rc;
-   for (int i=0; i<NUM_WORKER_THREADS; i++) {
-     rc = pthread_create(&threadId_[i], NULL, workerThreadMain, this);
-     if (rc != 0)
-      return LOB_HDFS_THREAD_CREATE_ERROR;
-   }
-   
-   return LOB_OPER_OK;
+void cleanupLOBDataDescFiles(const char *lobHdfsServer,int lobHdfsPort,const 
char *lobHdfsLoc)
+{ // Deletes every entry that hdfsListDirectory() reports under lobHdfsLoc on the given HDFS server/port.
+  int numExistingFiles=0; // filled in by hdfsListDirectory() below
+  hdfsFS fs;
+  fs = hdfsConnect(lobHdfsServer, lobHdfsPort);
+  if (fs == NULL)
+    return; // connect failed: nothing is deleted and, being a void function, nothing is reported
+  // Get the list of all data and desc files in the lob storage location
+  hdfsFileInfo *fileInfos = hdfsListDirectory(fs, lobHdfsLoc, 
&numExistingFiles); // NOTE(review): fs is never disconnected on any path in this function - confirm this is intentional
+  if (fileInfos == NULL)
+    return ; // no listing was returned, so there is nothing to delete or free
+  //Delete each one in a loop
+  for (int i = 0; i < numExistingFiles; i++)      
+    hdfsDelete(fs, fileInfos[i].mName, 0); // return value is ignored, so a failed delete goes unnoticed; last argument 0 is presumably "non-recursive" - confirm against the libhdfs version in use
+    
+  // *Note* : delete the memory allocated by libhdfs for the file info array  
+  if (fileInfos) // always true here because of the early return above
+    {
+      hdfsFreeFileInfo(fileInfos, numExistingFiles);
+    }
 }
 
-///////////////////////////////////////////////////////////////////////////////
-// ExLobRequest definitions
-///////////////////////////////////////////////////////////////////////////////
 
-ExLobRequest::ExLobRequest() :
-    reqNum_(0),
-    descNumIn_(-1),
-    descNumOut_(-1),
-    handleInLen_(-1),
-    handleOutLen_(-1),
-    dataOffset_(-1),
-    type_(Lob_Req_Invalid),
-    storage_(Lob_Invalid_Storage),
-    operLen_(-1),
-    error_(LOB_INVALID_ERROR_VAL),
-    cliError_(-1),
-    status_(LOB_INVALID_ERROR_VAL),
-    transId_(0)
-{
-   TRANSID_SET_NULL(transIdBig_);
-}
+// The following methods are used for hive access
+/* 
+The main thread issues an open for a range of 128 MB and wakes up a
+worker thread. It does not wait; it then calls pre-open on the next range. This
+is done in method ::readDataCursorSimple.
 
-void ExLobRequest::setValues(char *descFileName, Int64 descNumIn, Int64 
handleInLen, 
-                             char *handleIn, LobsStorage storage, Int64 
transId,
-                             SB_Transid_Type transIdBig,
-                             SB_Transseq_Type transStartId,
-                             char *blackBox, Int64 blackBoxLen)
-{
-  
-    descNumIn_ = descNumIn;
-    handleInLen_ = handleInLen;
-    storage_ = storage;
-    strcpy(descFileName_, descFileName);
-    if (handleIn != NULL && handleInLen > 0) {
-       memcpy(handleIn_, handleIn, handleInLen);
-    }
-    cliError_ = -1;
-    error_ = LOB_INVALID_ERROR_VAL;
-    status_ = LOB_INVALID_ERROR_VAL;
+The worker threads do their work in ::doWorkInThread, ::performRequest and
+::readCursorDataSimple (note that the last one is a different method from
+::readDataCursorSimple, which the main thread calls above).
 
-    transId_ = transId;
-    transIdBig_ = transIdBig;
-    transStartId_ = transStartId;
-    blackBoxLen_ = blackBoxLen;
-    if (blackBox != NULL && blackBoxLen > 0) {
-       memcpy(blackBox_, blackBox, blackBoxLen);
-    }
-    
-}
+The main thread then issues a read. Since a worker thread had already begun
+fetching 16KB buffers after the open described above, the main thread most
+likely will not need to wait and the data will be ready. It keeps consuming the
+buffers, recycling them back into postfetchBufList_.
+When done, the main thread closes the cursor (::closeDataCursorSimple). This is
+determined by whether we have reached the end of range or the end of data for
+that file.
+The worker threads, on the other hand, read 16KB of data at a time and buffer it
+in a prefetchBufList_. A worker continues doing this until the end of range is
+reached or the buffer limit (128MB) has been reached.
+*/
 
-void ExLobRequest::getValues(Int64 &descNumOut, Int64 &handleOutLen, 
-                             char *handleOut, Ex_Lob_Error &requestStatus, 
-                             Int64 &cliError,
-                             char *blackBox, Int64 &blackBoxLen)
+Ex_Lob_Error ExLob::readDataCursorSimple(char *file, char *tgt, Int64 tgtSize, 
+                                         Int64 &operLen, ExLobGlobals 
*lobGlobals)
 { // Copies up to tgtSize bytes from the named cursor's prefetchBufList_ into tgt; operLen returns the byte count. Returns LOB_CURSOR_NOT_OPEN if no cursor is registered under 'file', else LOB_OPER_OK.
-  
-    descNumOut = descNumOut_;
-    handleOutLen = handleOutLen_;
-    requestStatus = error_;
-    cliError = cliError_;
-    if (handleOut != NULL && handleOutLen_ > 0) {
-       memcpy(handleOut, handleOut_, handleOutLen_);
-    }
-    blackBoxLen = blackBoxLen_;
-    if (blackBox != NULL && blackBoxLen_ > 0) {
-       memcpy(blackBox, blackBox_, blackBoxLen_);
-    }
-    // #endif
-}
+    int dataOffset; // declared but never referenced in this function
+    Ex_Lob_Error result = LOB_OPER_OK; // never referenced again; the return statements below use literals
+    cursor_t *cursor;
+    ExLobCursor::bufferList_t::iterator c_it;
+    ExLobCursorBuffer *buf = NULL;
+    Int64 bytesToCopy = 0;
+    operLen = 0; // out-parameter: running count of bytes copied into tgt
+    Int64 len; // declared but never referenced in this function
+    char *target = tgt; // write position inside tgt, advanced after each copy
+    bool done = false; // set once the buffer list is empty and eor_/eod_ is flagged
 
-ExLobRequest::~ExLobRequest()
-{
-}
+    struct timespec startTime; // declared but never referenced in this function
+    struct timespec endTime; // declared but never referenced in this function
 
-Ex_Lob_Error ExLobRequest::send()
-{
- 
+    lobCursorLock_.lock(); // held only for the lobCursors_ lookup
 
-    int msgid; 
-    int oid;
-    MS_Result_Type result;
-    short req_ctrl[BUFSIZ];
-    short rep_ctrl[BUFSIZ];
-    char *req_data = (char *)this;
-    ExLobRequest rep_data;
-    short req_data_len = sizeof(ExLobRequest);
-    short rep_data_max = sizeof(ExLobRequest);
-    int err=0;
-    int inx=0;
-    int retries = 3;
+    lobCursors_it it = lobCursors_.find(string(file, strlen(file)));
 
-    incrReqNum();
+    if (it == lobCursors_.end())
+    {
+       lobCursorLock_.unlock();
+       return LOB_CURSOR_NOT_OPEN; // operLen is left at 0 on this path
+    }
+    else
+    {
+       cursor = &(it->second); // pointer into the map entry
+    } 
 
-    status_ = LOB_OPER_REQ_IN_PROGRESS;
+    lobCursorLock_.unlock(); // NOTE(review): 'cursor' keeps being used after the map lock is released - confirm nothing can erase the entry concurrently
 
-    do 
+    while ((operLen < tgtSize) && !done && !cursor->eol_) // NOTE(review): eol_ is read here without cursor->lock_ held
     {
-       err = BMSG_LINK_(&serverPhandle,
-                        &msgid, 
-                        req_ctrl,
-                        (ushort) (inx &1),
-                        rep_ctrl,
-                        1,
-                        req_data,
-                        req_data_len,
-                        (char *)&rep_data, 
-                        rep_data_max,
-                        0,0,0,0); 
-       retries--;
-
-       err = BMSG_BREAK_(msgid, (short *) &result, &serverPhandle);
+      lobGlobals->traceMessage("locking cursor",cursor,__LINE__);
+      cursor->lock_.lock();
 
-       if (err == XZFIL_ERR_PATHDOWN) {
-        //lobGlobals->resetServerPhandle();
-       }
+      // if no buffers to read and is eor or eod, we are done.
+      // else wait for prefetch thread to wake us up. 
+      if (cursor->prefetchBufList_.size() == 0) {
+        if (cursor->eor_ || cursor->eod_) {
+          done = true;
+        } else {
+          cursor->bufferMisses_++; // counts each time the reader found no prefetched buffer and had to wait
+         lobGlobals->traceMessage("wait on condition cursor",cursor,__LINE__);
+          cursor->lock_.wait(); // woken by wakeOne() on this lock; the list is re-checked on the next iteration
+        }
+       lobGlobals->traceMessage("unlocking cursor",cursor,__LINE__);
+        cursor->lock_.unlock();
+        continue; // re-evaluate the loop condition (done / eol_ / list contents)
+      } 
 
-    } while ( (err == XZFIL_ERR_PATHDOWN) && (retries > 0) ); // 201 if 
lobserver got restared
+      // a buffer is available
+      c_it = cursor->prefetchBufList_.begin();
+      buf = *c_it; // front buffer stays in the list while it is being copied from
+      lobGlobals->traceMessage("unlocking cursor",cursor,__LINE__);
+      cursor->lock_.unlock(); // the copy below runs without cursor->lock_ held
 
-    status_ = LOB_OPER_REQ_DONE;
+      bytesToCopy = min(buf->bytesRemaining_, tgtSize - operLen); // whichever is smaller: rest of this buffer or rest of the caller's request
+      memcpy(target, buf->data_ + buf->bytesUsed_, bytesToCopy);
+      target += bytesToCopy;
+      if (bytesToCopy == buf->bytesRemaining_) { // buffer is now empty
+        buf->bytesRemaining_ = -1; // reset both counters before the buffer is recycled
+        buf->bytesUsed_ = -1;
+        lobGlobals->postfetchBufListLock_.lock();
+        lobGlobals->postfetchBufList_.push_back(buf); // hand the drained buffer back to the shared list of reusable buffers
+        lobGlobals->postfetchBufListLock_.unlock();
+       lobGlobals->traceMessage("locking cursor",cursor,__LINE__);
+        cursor->lock_.lock();
+        c_it = cursor->prefetchBufList_.erase(c_it); // NOTE(review): c_it was obtained before the lock was dropped above - confirm it cannot have been invalidated meanwhile
+       lobGlobals->traceMessage("signal condition cursor",cursor,__LINE__);
+        cursor->lock_.wakeOne(); // wake up prefetch thread if it was waiting 
for an empty buffer.
+       lobGlobals->traceMessage("unlocking cursor",cursor,__LINE__);
+        cursor->lock_.unlock();
+      } else {
+        buf->bytesUsed_ += bytesToCopy; // partial consumption: remember where the next read resumes
+        buf->bytesRemaining_ -= bytesToCopy;
+      }
+      stats_.bytesPrefetched += bytesToCopy;
+      operLen += bytesToCopy;
+    } 
 
-    if (err != XZFIL_ERR_OK)
-      return LOB_SEND_MSG_ERROR;
+    // update stats
+    stats_.bytesRead += operLen; // bytes actually delivered
+    stats_.bytesToRead += tgtSize; // bytes the caller asked for
+    stats_.numReadReqs++;
 
-    memcpy(this, &rep_data, rep_data_max);
-  
     return LOB_OPER_OK;
 }
 
-void ExLobRequest::getDescOut(ExLobDesc &desc) 
-{ 
-   memcpy(&desc, &desc_, sizeof(ExLobDesc)); 
-}
-
-void ExLobRequest::putDescIn(ExLobDesc &desc) 
-{ 
-   memcpy(&desc_, &desc, sizeof(ExLobDesc)); 
-}
-
-///////////////////////////////////////////////////////////////////////////////
-// ExLobHdfs definitions
-///////////////////////////////////////////////////////////////////////////////
-#ifdef SQ_USE_HDFS
-
-ExLobLock::ExLobLock()
-    : bellRang_(false),
-      waiters_(0)
+Ex_Lob_Error ExLob::closeDataCursorSimple(char *fileName, ExLobGlobals 
*lobGlobals)
 { // Closes the cursor registered under fileName (if any) and adds its open-to-close time to stats_.cursorElapsedTime. Always returns LOB_OPER_OK, including when no such cursor exists.
-   pthread_mutexattr_t mutexAttr;
-   pthread_mutexattr_init( &mutexAttr );
-   pthread_mutex_init( &mutex_, &mutexAttr );
-   pthread_cond_init( &workBell_, NULL );
-}
+    cursor_t *cursor = NULL;
+    Int64 secs = 0; // both stay 0 when the cursor is not found, so 0 is added to the stats below
+    Int64 nsecs = 0;
 
-ExLobLock::~ExLobLock()
-{
-   pthread_mutex_unlock( &mutex_ );
-   pthread_mutex_destroy(&mutex_);
-   pthread_cond_destroy(&workBell_);
-}
+    lobCursorLock_.lock(); // held across the lookup and the possible erase below
 
-void ExLobLock::lock()
-{
-   pthread_mutex_lock( &mutex_ );
-}
+    lobCursors_it it = lobCursors_.find(string(fileName, strlen(fileName)));
+    if (it != lobCursors_.end())
+    {
+      cursor = &(it->second);
+      lobGlobals->traceMessage("locking cursor",cursor,__LINE__);
+      cursor->lock_.lock();
 
-void ExLobLock::unlock()
-{
-   pthread_mutex_unlock( &mutex_ );
-}
+      clock_gettime(CLOCK_MONOTONIC, &cursor->closeTime_);
+      secs = cursor->closeTime_.tv_sec - cursor->openTime_.tv_sec; // elapsed time since openTime_, split into seconds and nanoseconds
+      nsecs = cursor->closeTime_.tv_nsec - cursor->openTime_.tv_nsec; // may be negative; normalized after the locks are released
 
-void ExLobLock::wakeOne()
-{
-   pthread_cond_signal(&workBell_);
-}
+      if (cursor->eod_ || cursor->eor_) { // prefetch thread already done,
+        cursor->emptyPrefetchList(lobGlobals);
+       lobGlobals->traceMessage("unlocking cursor",cursor,__LINE__);
+       cursor->lock_.unlock();
+        lobCursors_.erase(it);            // so erase it here. 
+        // cursor->lock_ was released just above, before the erase removes the cursor object.
+      } else {
+        cursor->eol_ = true;     // prefetch thread will do the eol rituals
+       lobGlobals->traceMessage("signal condition cursor",cursor,__LINE__);
+        cursor->lock_.wakeOne(); // wakeup prefetch thread
+       lobGlobals->traceMessage("unlocking cursor",cursor,__LINE__);
+        cursor->lock_.unlock();
+      }
+    }
 
-void ExLobLock::wakeAll()
-{
-   pthread_cond_broadcast(&workBell_);
-}
+    lobCursorLock_.unlock();
 
-void ExLobLock::wait()
-{
-    waiters_++;
-    pthread_cond_wait(&workBell_, &mutex_);
-    waiters_--;
-}
+    if (nsecs < 0) { // borrow one second when the nanosecond difference went negative
+      secs--;
+      nsecs += NUM_NSECS_IN_SEC;
+    }
+    Int64 totalnsecs = (secs * NUM_NSECS_IN_SEC) + nsecs;
+    stats_.cursorElapsedTime += totalnsecs; // accumulated in nanoseconds
 
-ExLobHdfsRequest::ExLobHdfsRequest(LobsHdfsRequestType reqType, hdfsFS fs, 
-                                   hdfsFile file, char *buffer, int size) :
-   reqType_(reqType),
-   fs_(fs),
-   file_(file),
-   buffer_(buffer),
-   size_(size)
-{
-  lobPtr_ = 0;
-  error_ = LOB_OPER_OK;
+    return LOB_OPER_OK;
 }
 
-ExLobHdfsRequest::ExLobHdfsRequest(LobsHdfsRequestType reqType, ExLobCursor 
*cursor) :
-   reqType_(reqType),
-   cursor_(cursor)
-{
-  buffer_=0;
-  lobPtr_=0;
-  fs_=0;
-  file_=0;
-  size_=0;
-  error_=LOB_OPER_OK;
-}
 
-ExLobHdfsRequest::ExLobHdfsRequest(LobsHdfsRequestType reqType, ExLob *lobPtr, 
ExLobCursor *cursor) :
-   reqType_(reqType),
-   lobPtr_(lobPtr),
-   cursor_(cursor)
+Ex_Lob_Error ExLobGlobals::performRequest(ExLobHdfsRequest *request)
 { // Handles one ExLobHdfsRequest. Only Lob_Hdfs_Cursor_Prefetch is implemented: it fills buffers for request->cursor_ until eod_/eor_/eol_. Any other type sets request->error_. Always returns LOB_OPER_OK.
-  buffer_=0;
-  fs_=0;
-  file_=0;
-  size_=0;
-  error_=LOB_OPER_OK;
-}
+  Ex_Lob_Error err = LOB_OPER_OK; // never updated or returned in this function
+  ExLob *lobPtr;
+  ExLobCursorBuffer *buf;
+  ExLobCursor *cursor;
+  Int64 size;
+  NABoolean seenEOR = false; // local copies of the end flags, so the cursor need not be touched after it may have been destroyed
+  NABoolean seenEOD = false;
+  ExLobCursor::bufferList_t::iterator c_it;
+  Int64 totalBufSize;
 
-ExLobHdfsRequest::ExLobHdfsRequest(LobsHdfsRequestType reqType) :
-   reqType_(reqType)
-{
+  switch (request->reqType_) 
+  {
+    case Lob_Hdfs_Cursor_Prefetch :
+      lobPtr = request->lobPtr_;
+      cursor = request->cursor_;
+      traceMessage("locking cursor",cursor,__LINE__);
+      cursor->lock_.lock();
+      while (!cursor->eod_ && !cursor->eor_ && !cursor->eol_) 
+      {
+        postfetchBufListLock_.lock();
+        c_it = postfetchBufList_.begin();
+        if (c_it != postfetchBufList_.end()) { // a recycled buffer is available: reuse it
+          buf = *c_it;
+          postfetchBufList_.erase(c_it);
+          postfetchBufListLock_.unlock();
+         traceMessage("unlocking cursor",cursor,__LINE__);
+          cursor->lock_.unlock();
+        } else { 
+          postfetchBufListLock_.unlock();
+          // there are no empty buffers. 
+          // if prefetch list already has the max, wait for one to free up.
+          totalBufSize =  cursor->prefetchBufList_.size() * 
cursor->bufMaxSize_;
+          if (totalBufSize > LOB_CURSOR_PREFETCH_BYTES_MAX) {
+           traceMessage("wait on condition cursor",cursor,__LINE__);
+            cursor->lock_.wait(); // woken by wakeOne() on this lock (the reader signals after draining a buffer, the close path after setting eol_)
+            char buffer2[2048];
+            sprintf(buffer2, "cursor->eod_ %d cursor->eor_ %d "
+                             "cursor->eol_ %d", cursor->eod_,
+                              cursor->eor_, cursor->eol_);
+            traceMessage(buffer2, cursor, __LINE__);
+            continue; // re-test the loop condition; cursor->lock_ is not released on this path
+          }
+          // create a new buffer
+         traceMessage("unlocking cursor",cursor,__LINE__);
+          cursor->lock_.unlock();
+          buf = new (getHeap()) ExLobCursorBuffer();
+          buf->data_ = (char *) (getHeap())->allocateMemory( 
cursor->bufMaxSize_);
+          lobPtr->stats_.buffersUsed++;
+        }
+        size = min(cursor->bufMaxSize_, (cursor->maxBytes_ - 
cursor->bytesRead_ + (16 * 1024))); // a full buffer, or (maxBytes_ - bytesRead_ + 16KB) when that is smaller; computed without cursor->lock_ held
+        if (buf->data_) {
+          lobPtr->readCursorDataSimple(buf->data_, size, *cursor, 
buf->bytesRemaining_); // bytes read are returned in bytesRemaining_; the Ex_Lob_Error result is ignored
+          buf->bytesUsed_ = 0;
+         traceMessage("locking cursor",cursor,__LINE__);
+          cursor->lock_.lock();
+          if (size < (cursor->bufMaxSize_)) { // a short request is treated as the last read of the range
+            cursor->eor_ = true;
+           seenEOR = true;
+          }
+          if (buf->bytesRemaining_) { // data was read: publish the buffer to the reader
+            cursor->prefetchBufList_.push_back(buf);
+           traceMessage("signal condition cursor",cursor,__LINE__);
+            cursor->lock_.wakeOne();
+           traceMessage("unlocking cursor",cursor,__LINE__);
+            cursor->lock_.unlock();
+          } else { // nothing was read: flag end of data and recycle the unused buffer
+            cursor->eod_ = true;
+            seenEOD = true;
+           traceMessage("signal condition cursor",cursor,__LINE__);
+            cursor->lock_.wakeOne();
+           traceMessage("unlocking cursor",cursor,__LINE__);
+            cursor->lock_.unlock();
+            postfetchBufListLock_.lock();
+            postfetchBufList_.push_back(buf);
+            postfetchBufListLock_.unlock();
+          }
+        } else {
+          assert("data_ is null"); // NOTE(review): a string literal is a non-null pointer, so this assert can never fire; the failed allocation is silently ignored
+        }
+       // Important! Break and do not access cursor object if we have reached
+       // end of data or range.
+       // The main thread could have destroyed the cursor 
+       // in ::closeDataCursorSimple
+       if (seenEOD || seenEOR)
+        {
+          char buffer2[2048];
+          sprintf(buffer2, "seenEOD %d seenEOR %d",
+                               seenEOD, seenEOR);
+          traceMessage(buffer2, cursor, __LINE__); // only the pointer value is passed here - presumably not dereferenced by traceMessage; confirm
+          break;
+        }
+       traceMessage("locking cursor",cursor,__LINE__);
+       cursor->lock_.lock(); // re-take the lock so the loop condition is tested with it held
+      } // while
 
-  buffer_=0;
-  cursor_=0;
-  lobPtr_=0;
-  fs_=0;
-  file_=0;
-  size_=0;
-  error_=LOB_OPER_OK;
-}
+      if (!seenEOD && !seenEOR) // the loop ended through its condition, so cursor->lock_ is still held
+       {
+          traceMessage("locking cursor",cursor,__LINE__); // NOTE(review): trace text says "locking" but the next call unlocks
+         cursor->lock_.unlock();
+         if (cursor->eol_) { // never reaches here ??  
+           lobPtr->deleteCursor(cursor->name_, this);
+         }
+       }
+      processPreOpens();
+      break;
 
-ExLobHdfsRequest::~ExLobHdfsRequest()
-{
+    default:
+      request->error_ = LOB_HDFS_REQUEST_UNKNOWN; // the unknown type is reported through the request object, not through the return value
+  }
+
+  return LOB_OPER_OK;
 }
 
-Ex_Lob_Error ExLobGlobals::enqueueRequest(ExLobHdfsRequest *request)
+
+
+Ex_Lob_Error ExLob::readCursorDataSimple(char *tgt, Int64 tgtSize, cursor_t 
&cursor, Int64 &operLen)
 {
-   char buffer2[2048];
-   sprintf(buffer2, "enqueue request %d", request->reqType_);
-   traceMessage(buffer2, NULL, __LINE__);
-   reqQueueLock_.lock();
-   reqQueue_.push_back(request);
-   reqQueueLock_.wakeOne();
-   reqQueueLock_.unlock();
+   ExLobDesc desc;
+   Ex_Lob_Error err;
+   Int64 bytesAvailable = 0;
+   Int64 bytesToCopy = 0;
+   Int64 bytesRead = 0;
+   operLen = 0;
+   tOffset offset; 
+   struct timespec startTime; 
+   struct timespec endTime;
+   bool done = false;
 
-   return LOB_OPER_OK;
-}
+   if (!fdData_) {
+      return LOB_CURSOR_NOT_OPEN_ERROR;
+   }
 
-Ex_Lob_Error ExLobGlobals::enqueuePrefetchRequest(ExLob *lobPtr, ExLobCursor 
*cursor)
-{// Leaving this allocated from system heap. Since this class contains hdfsFS 
unable to derive from LOB heap
-  ExLobHdfsRequest *request = new  ExLobHdfsRequest(Lob_Hdfs_Cursor_Prefetch, 
lobPtr, cursor);
-   
-   if (!request) {
-     // return error
+   if (cursor.bytesRead_ == -1) {  // starting
+      cursor.bytesRead_ = 0;
    }
 
-   enqueueRequest(request);
+   clock_gettime(CLOCK_MONOTONIC, &startTime);
+   
+   whil

<TRUNCATED>

Reply via email to