http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/681cad66/core/sql/exp/ExpLOBaccess.cpp ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpLOBaccess.cpp b/core/sql/exp/ExpLOBaccess.cpp index f16d48b..217f96a 100644 --- a/core/sql/exp/ExpLOBaccess.cpp +++ b/core/sql/exp/ExpLOBaccess.cpp @@ -23,7 +23,7 @@ /* -*-C++-*- ***************************************************************************** * - * File: ex_lob.C + * File: EXLOBaccess.cpp * Description: class to store and retrieve LOB data. * * @@ -47,25 +47,25 @@ #include <sys/stat.h> #include <sys/time.h> -#define SQ_USE_HDFS 1 -#ifdef SQ_USE_HDFS + + #include "hdfs.h" #include "jni.h" -#endif + #include "ExpLOBstats.h" #include "ExpLOBaccess.h" #include "ExpLOBinterface.h" - +#include "ExpLOBexternal.h" #include "NAVersionedObject.h" #include "ComQueue.h" #include "NAMemory.h" #include <seabed/ms.h> -#include <../../sqf/src/seabed/src/trans.h> #include <seabed/fserr.h> #include <curl/curl.h> +#include <../../sqf/src/seabed/src/trans.h> extern int ms_transid_get(bool pv_supp, bool pv_trace, MS_Mon_Transid_Type *pp_transid, @@ -79,15 +79,12 @@ ExLob::ExLob() : storage_(Lob_Invalid_Storage), dir_(string()), lobGlobalHeap_(NULL), - - // fdDesc_(-1), - fdDesc_(NULL), fs_(NULL), fdData_(NULL), openFlags_(0) { lobDataFile_[0] = '\0'; - lobDescFile_[0] = '\0'; + } ExLob::~ExLob() @@ -97,10 +94,8 @@ ExLob::~ExLob() hdfsCloseFile(fs_, fdData_); fdData_ = NULL; } - if (fdDesc_) { - hdfsCloseFile(fs_, fdDesc_); - fdDesc_ = NULL; - } + + /* Commenting this out. It is causing cores during hive access. 
@@ -118,6 +113,7 @@ Ex_Lob_Error ExLob::initialize(char *lobFile, Ex_Lob_Mode mode, char *dir, LobsStorage storage, char *hdfsServer, Int64 hdfsPort, + char *lobLocation, int bufferSize , short replication , int blockSize, Int64 lobMaxSize, ExLobGlobals *lobGlobals) { @@ -126,7 +122,7 @@ Ex_Lob_Error ExLob::initialize(char *lobFile, Ex_Lob_Mode mode, struct timespec startTime; struct timespec endTime; Int64 secs, nsecs, totalnsecs; - + if (dir) { if (dir_.empty()) @@ -136,12 +132,12 @@ Ex_Lob_Error ExLob::initialize(char *lobFile, Ex_Lob_Mode mode, snprintf(lobDataFile_, MAX_LOB_FILE_NAME_LEN, "%s/%s", dir_.c_str(), lobFile); - snprintf(lobDescFile_, MAX_LOB_FILE_NAME_LEN, "%s/%s.desc", dir_.c_str(), lobFile); + } else { snprintf(lobDataFile_, MAX_LOB_FILE_NAME_LEN, "%s", lobFile); - snprintf(lobDescFile_, MAX_LOB_FILE_NAME_LEN, "%s.desc", lobFile); + } if (storage_ != Lob_Invalid_Storage) @@ -156,7 +152,7 @@ Ex_Lob_Error ExLob::initialize(char *lobFile, Ex_Lob_Mode mode, hdfsServer_ = hdfsServer; hdfsPort_ = hdfsPort; - + lobLocation_ = lobLocation; clock_gettime(CLOCK_MONOTONIC, &startTime); if (lobGlobals->getHdfsFs() == NULL) @@ -200,107 +196,94 @@ Ex_Lob_Error ExLob::initialize(char *lobFile, Ex_Lob_Mode mode, } hdfsCloseFile(fs_, fdData_); fdData_ = NULL; - if (!lobGlobals->isHive()) - { - //Create the desc header file that holds info about the - //lob data file offsets etc. - fdDesc_ = hdfsOpenFile(fs_, lobDescFile_, O_WRONLY, bufferSize, replication, blockSize); - if (!fdDesc_) - { - return LOB_DESC_FILE_CREATE_ERROR; - } - //write empty header info into it. 
- ExLobDescHeader header(lobMaxSize); - - Int64 numWritten = 0; - numWritten = hdfsWrite(fs_, fdDesc_, (void *)&header, sizeof(ExLobDescHeader)); - if (numWritten <=0) - return LOB_DATA_WRITE_ERROR; - - - if (hdfsFlush(fs_, fdDesc_)) - return LOB_DATA_FLUSH_ERROR; - - hdfsCloseFile(fs_, fdDesc_); - fdDesc_ = NULL; - } + } lobGlobalHeap_ = lobGlobals->getHeap(); return LOB_OPER_OK; } -Ex_Lob_Error ExLob::fetchCursor() +Ex_Lob_Error ExLob::fetchCursor(char *handleIn, Int32 handleLenIn, Int64 &outOffset, Int64 &outSize,NABoolean &isEOD, Int64 transId) { - Ex_Lob_Error err; - - request_.setType(Lob_Req_Fetch_Cursor); + Ex_Lob_Error err = LOB_OPER_OK; + Int64 dummyParam; + int cliErr=0; + Int64 offset = 0; + Int64 size = 0; + lobCursors_it it = lobCursors_.find(string(handleIn, handleLenIn)); - err = request_.send(); + if (it == lobCursors_.end()) + { + return LOB_CURSOR_NOT_OPEN; + } - if (err != LOB_OPER_OK) { + void *cliInterface = it->second.cliInterface_; + + + cliErr = SQL_EXEC_LOBcliInterface(handleIn, handleLenIn, + 0, 0, + (char *)&dummyParam, (Lng32 *)&dummyParam, + LOB_CLI_SELECT_FETCH, LOB_CLI_ExecImmed, + &offset, &size, + &dummyParam, &dummyParam, + &cliInterface, + transId); + if (err != LOB_OPER_OK) + { return err; - } + } + if (cliErr == 100 ) + { + isEOD= TRUE; + cliErr = SQL_EXEC_LOBcliInterface(handleIn, handleLenIn, + NULL, NULL, + (char *)&dummyParam, (Lng32 *)&dummyParam, + LOB_CLI_SELECT_CLOSE, LOB_CLI_ExecImmed, + &dummyParam, &dummyParam, + &dummyParam, &dummyParam, + &cliInterface, + transId); + + } + else + { + outOffset = offset; + outSize = size; + } - err = request_.getError(); + return err; } -Ex_Lob_Error ExLob::delDesc(Int64 descNum) -{ - Ex_Lob_Error err; - - request_.setType(Lob_Req_Del_Desc); - request_.setDescNumIn(descNum); - err = request_.send(); - - if (err != LOB_OPER_OK) { - return err; - } - - err = request_.getError(); - - return err; -} -Ex_Lob_Error ExLob::getDesc(ExLobDesc &desc) +Ex_Lob_Error ExLob::getDesc(ExLobDesc 
&desc,char * handleIn, Int32 handleInLen, char *blackBox, Int32 *blackBoxLen, char *handleOut, Int32 &handleOutLen, Int64 transId) { - Ex_Lob_Error err; - - request_.setType(Lob_Req_Get_Desc); - - err = request_.send(); - - if (err != LOB_OPER_OK) { - return err; - } - - request_.getDescOut(desc); - err = request_.getError(); + Ex_Lob_Error err = LOB_OPER_OK; + NABoolean multipleChunks = FALSE; + Int32 clierr = 0; + Int64 size,offset,dummyParam = 0; + + clierr = SQL_EXEC_LOBcliInterface(handleIn, + handleInLen, + blackBox, blackBoxLen, + handleOut, &handleOutLen, + LOB_CLI_SELECT_UNIQUE, LOB_CLI_ExecImmed, + &offset, &size, + &dummyParam, &dummyParam, + 0, + transId); + + if (clierr < 0) + return LOB_DESC_READ_ERROR; + desc.setOffset(offset); + desc.setSize(size); + return err; } -Ex_Lob_Error ExLob::putDesc(ExLobDesc &desc, Int64 descNum) -{ - Ex_Lob_Error err; - - request_.setType(Lob_Req_Put_Desc); - request_.setDescNumIn(descNum); - request_.putDescIn(desc); - - err = request_.send(); - - if (err != LOB_OPER_OK) { - return err; - } - - err = request_.getError(); - - return err; -} Ex_Lob_Error ExLob::writeData(Int64 offset, char *data, Int32 size, Int64 &operLen) { @@ -642,10 +625,14 @@ Ex_Lob_Error ExLob::readExternalSourceFile(char *srcfile, char *&fileData, Int32 return LOB_OPER_OK; } -Ex_Lob_Error ExLob::writeDesc(Int64 &sourceLen, char *source, LobsSubOper subOper, Int64 &descNumOut, Int64 &operLen, Int64 lobMaxSize) +Ex_Lob_Error ExLob::writeDesc(Int64 &sourceLen, char *source, LobsSubOper subOper, Int64 &descNumOut, Int64 &operLen, Int64 lobMaxSize,Int64 lobMaxChunkMemSize,Int64 lobGCLimit, char * handleIn, Int32 handleInLen, char *blackBox, Int32 *blackBoxLen, char *handleOut, Int32 &handleOutLen, void *lobGlobals) { - Ex_Lob_Error err; + Ex_Lob_Error err=LOB_OPER_OK; Int64 dataOffset = 0; + Int64 outDescPartnKey = 0; + Int64 outDescSyskey = 0; + Int32 clierr = 0; + // Calculate sourceLen for each subOper. 
if (subOper == Lob_File) { @@ -656,25 +643,59 @@ Ex_Lob_Error ExLob::writeDesc(Int64 &sourceLen, char *source, LobsSubOper subOpe if (sourceLen <= 0 || sourceLen > lobMaxSize) { return LOB_MAX_LIMIT_ERROR; //exceeded the size of the max lob size + //TBD trigger compaction } - err = allocateDesc((unsigned int)sourceLen, descNumOut, dataOffset, lobMaxSize); + err = allocateDesc((unsigned int)sourceLen, descNumOut, dataOffset, lobMaxSize, lobMaxChunkMemSize,handleIn, handleInLen, lobGCLimit,lobGlobals); operLen = 0; if (err != LOB_OPER_OK) return err; - - //send a message to mxlobsrvr to insert into the descriptor tables - request_.setType(Lob_Req_Allocate_Desc); - request_.getDesc().setSize(sourceLen); - request_.setDescNumOut(descNumOut); - request_.setDataOffset(dataOffset); - err = request_.send(); - if (err != LOB_OPER_OK) { - return err; + + clierr = SQL_EXEC_LOBcliInterface(handleIn, + handleInLen, + NULL, blackBoxLen, + handleOut, &handleOutLen, + LOB_CLI_INSERT, LOB_CLI_ExecImmed, + &dataOffset, &sourceLen, + &outDescPartnKey, &outDescSyskey, + 0, + 0); + if (clierr < 0 ) { + return LOB_DESC_WRITE_ERROR; } return err; } + +Ex_Lob_Error ExLob::insertDesc(Int64 offset, Int64 size, char *handleIn, Int32 handleInLen, char *handleOut, Int32 &handleOutLen, char *blackBox, Int32 blackBoxLen,void *lobGlobals) +{ + + Lng32 clierr; + Int64 dummyParam; + Int64 outDescSyskey = 0; + Int64 outDescPartnKey = 0; + handleOutLen = 0; + Int32 chunkNum = 1; + + NABoolean foundUnused = FALSE; + + clierr = SQL_EXEC_LOBcliInterface(handleIn, + handleInLen, + NULL, &chunkNum, + handleOut, &handleOutLen, + LOB_CLI_INSERT, LOB_CLI_ExecImmed, + &offset, &size, + &outDescPartnKey, &outDescSyskey, + 0, + 0); + + if (clierr < 0 ) { + return LOB_DESC_WRITE_ERROR; + } + return LOB_OPER_OK; +} + + Ex_Lob_Error ExLob::writeLobData(char *source, Int64 sourceLen, LobsSubOper subOperation, Int64 tgtOffset,Int64 &operLen, Int64 lobMaxChunkMemSize) { Ex_Lob_Error err; @@ -692,8 +713,7 @@ 
Ex_Lob_Error ExLob::writeLobData(char *source, Int64 sourceLen, LobsSubOper subO err = readSourceFile(source, inputAddr, allocMemSize, readOffset); if (err != LOB_OPER_OK) return err; - } - + } else { // in memory @@ -723,10 +743,10 @@ Ex_Lob_Error ExLob::writeLobData(char *source, Int64 sourceLen, LobsSubOper subO return err; } -Ex_Lob_Error ExLob::readToMem(char *memAddr, Int64 size, Int64 &operLen) +Ex_Lob_Error ExLob::readToMem(char *memAddr, Int64 size, Int64 &operLen,char * handleIn, Int32 handleInLen, char *blackBox, Int32 blackBoxLen, char * handleOut, Int32 &handleOutLen, Int64 transId) { Ex_Lob_Error err = LOB_OPER_OK; - + NABoolean multipleChunks = FALSE; int cliErr; @@ -735,12 +755,19 @@ Ex_Lob_Error ExLob::readToMem(char *memAddr, Int64 size, Int64 &operLen) Int64 sizeToRead = 0; - err = getDesc(desc); + err = getDesc(desc,handleIn,handleInLen,blackBox, &blackBoxLen,handleOut,handleOutLen,transId); + if (err != LOB_OPER_OK) + { + return err; + } sizeToRead = MINOF(size,desc.getSize()); - if (getRequest()->getBlackBoxLen() == -1) - sizeToRead = size; - err = readDataToMem(memAddr, desc.getOffset(),sizeToRead, operLen); + if (blackBoxLen == -1) + { + sizeToRead = size; + multipleChunks = TRUE; + } + err = readDataToMem(memAddr, desc.getOffset(),sizeToRead, operLen, handleIn,handleInLen, multipleChunks,transId); return err; } @@ -772,18 +799,20 @@ LobInputOutputFileType ExLob::fileType(char *ioFileName) else return LOCAL_FILE; } -Ex_Lob_Error ExLob::readToFile(char *tgtFileName, Int64 tgtLength, Int64 &operLen, Int64 lobMaxChunkMemLen, Int32 fileflags) +Ex_Lob_Error ExLob::readToFile(char *tgtFileName, Int64 tgtLength, Int64 &operLen, Int64 lobMaxChunkMemLen, Int32 fileflags,char *handleIn,Int32 handleInLen, char *blackBox, Int32 blackBoxLen, char * handleOut, Int32 &handleOutLen, Int64 transId) { Ex_Lob_Error err = LOB_OPER_OK; Int64 srcOffset = 0; Int64 srcLength = 0; LobInputOutputFileType tgtType = fileType(tgtFileName); ExLobDesc desc; - err = 
getDesc(desc); + NABoolean multipleChunks = FALSE; + err = getDesc(desc,handleIn,handleInLen,blackBox, &blackBoxLen,handleOut,handleOutLen,transId); if (err != LOB_OPER_OK) return err; - if (getRequest()->getBlackBoxLen() == -1) // mxlobsrvr returned -1 indicating multiple chunks for this particular lob handle + if (blackBoxLen == -1) // mxlobsrvr returned -1 indicating multiple chunks for this particular lob handle { + multipleChunks = TRUE; //the data retrieval in chunks is handled in readDataToMem. } else if (tgtLength <=0 ) @@ -797,19 +826,19 @@ Ex_Lob_Error ExLob::readToFile(char *tgtFileName, Int64 tgtLength, Int64 &operLe } if (tgtType == HDFS_FILE) { - err = readDataToHdfsFile(tgtFileName, srcOffset , tgtLength,operLen, lobMaxChunkMemLen, fileflags); + err = readDataToHdfsFile(tgtFileName, srcOffset , tgtLength,operLen, lobMaxChunkMemLen, fileflags,handleIn,handleInLen,multipleChunks,transId); if (err != LOB_OPER_OK) return err; } else if(tgtType == CURL_FILE) { - err = readDataToExternalFile(tgtFileName, srcOffset, tgtLength, operLen, lobMaxChunkMemLen, fileflags); + err = readDataToExternalFile(tgtFileName, srcOffset, tgtLength, operLen, lobMaxChunkMemLen, fileflags,handleIn, handleInLen,multipleChunks,transId); if (err != LOB_OPER_OK) return err; } else if (tgtType == LOCAL_FILE) { - err = readDataToLocalFile(tgtFileName,srcOffset, tgtLength,operLen, lobMaxChunkMemLen, fileflags); + err = readDataToLocalFile(tgtFileName,srcOffset, tgtLength,operLen, lobMaxChunkMemLen, fileflags,handleIn,handleInLen,multipleChunks,transId); if (err != LOB_OPER_OK) return err; } @@ -819,12 +848,15 @@ Ex_Lob_Error ExLob::readToFile(char *tgtFileName, Int64 tgtLength, Int64 &operLe return LOB_OPER_OK; } -Ex_Lob_Error ExLob::append(char *data, Int64 size, LobsSubOper so, Int64 headDescNum, Int64 &operLen, Int64 lobMaxSize,Int64 lobMaxChunkMemSize) +Ex_Lob_Error ExLob::append(char *data, Int64 size, LobsSubOper so, Int64 headDescNum, Int64 &operLen, Int64 lobMaxSize,Int64 
lobMaxChunkMemSize, Int64 lobGCLimit, char *handleIn, Int32 handleInLen, char * handleOut, Int32 &handleOutLen,void *lobGlobals) { Ex_Lob_Error err = LOB_OPER_OK; Int64 dummyParam; Int64 dataOffset=0; Int64 sourceLen = size; + Int32 clierr = 0; + Int32 chunkNum = 0; + Int64 outDescPartnKey, outDescSyskey = 0; if (so == Lob_File) { err = statSourceFile(data, sourceLen); @@ -835,22 +867,21 @@ Ex_Lob_Error ExLob::append(char *data, Int64 size, LobsSubOper so, Int64 headDes { return LOB_MAX_LIMIT_ERROR; //exceeded the size of the max lob size } - err = allocateDesc((unsigned int)sourceLen, dummyParam, dataOffset, lobMaxSize); + err = allocateDesc((unsigned int)sourceLen, dummyParam, dataOffset, lobMaxSize,lobMaxChunkMemSize,handleIn, handleInLen,lobGCLimit,lobGlobals); if (err != LOB_OPER_OK) return err; - request_.setType(Lob_Req_Append); - request_.getDesc().setSize(sourceLen); - request_.setDataOffset(dataOffset); - request_.send(); - - err = request_.getError(); - - if (err != LOB_OPER_OK) { - return err; - } - - int cliErr = request_.getCliError(); - if (cliErr < 0 || cliErr == 100) { // some error or EOD. + + clierr = SQL_EXEC_LOBcliInterface(handleIn, handleInLen, + 0, &chunkNum, + handleOut, &handleOutLen, + LOB_CLI_INSERT_APPEND, LOB_CLI_ExecImmed, + &dataOffset, &sourceLen, + &outDescPartnKey, &outDescSyskey, + 0, + 0); + + + if (clierr < 0 || clierr == 100) { // some error or EOD. 
return LOB_DESC_APPEND_ERROR; } @@ -864,20 +895,20 @@ Ex_Lob_Error ExLob::append(char *data, Int64 size, LobsSubOper so, Int64 headDes return err; return LOB_OPER_OK; } -Ex_Lob_Error ExLob::insertData(char *data, Int64 size, LobsSubOper so,Int64 headDescNum, Int64 &operLen, Int64 lobMaxSize, Int64 lobMaxChunkMemSize) +Ex_Lob_Error ExLob::insertData(char *data, Int64 size, LobsSubOper so,Int64 headDescNum, Int64 &operLen, Int64 lobMaxSize, Int64 lobMaxChunkMemSize,char * handleIn, Int32 handleInLen, char *blackBox, Int32 blackBoxLen, char * handleOut, Int32 &handleOutLen, void *lobGlobals) { - Ex_Lob_Error err; + Ex_Lob_Error err=LOB_OPER_OK; ExLobDesc desc; - + int clierr = 0; operLen = 0; // get offset and input size from desc (the one that was just inserted into the descriptor handle table) - err = getDesc(desc); - if (err != LOB_OPER_OK) - return err; + + err = getDesc(desc,handleIn,handleInLen,blackBox, &blackBoxLen,handleOut,handleOutLen,0); + - int cliErr = request_.getCliError(); - if (cliErr < 0 || cliErr == 100) { // some error or EOD. + + if (err !=LOB_OPER_OK) { // some error or EOD. 
return LOB_DESC_READ_ERROR; } @@ -898,12 +929,15 @@ Ex_Lob_Error ExLob::insertData(char *data, Int64 size, LobsSubOper so,Int64 head return err; return LOB_OPER_OK; } -Ex_Lob_Error ExLob::update(char *data, Int64 size, LobsSubOper so,Int64 headDescNum, Int64 &operLen, Int64 lobMaxSize, Int64 lobMaxChunkMemSize) +Ex_Lob_Error ExLob::update(char *data, Int64 size, LobsSubOper so,Int64 headDescNum, Int64 &operLen, Int64 lobMaxSize, Int64 lobMaxChunkMemSize, Int64 lobGCLimit, char *handleIn, Int32 handleInLen, char *handleOut, Int32 &handleOutLen, void *lobGlobals) { Ex_Lob_Error err = LOB_OPER_OK; Int64 dummyParam; Int64 dataOffset = 0; Int64 sourceLen = size; + Int32 clierr = 0; + Int64 outDescPartnKey,outDescSyskey = 0; + Int32 chunkNum = 0; if (so == Lob_File) { err = statSourceFile(data, sourceLen); @@ -914,24 +948,21 @@ Ex_Lob_Error ExLob::update(char *data, Int64 size, LobsSubOper so,Int64 headDesc { return LOB_MAX_LIMIT_ERROR; //exceeded the size of the max lob size } - err = allocateDesc((unsigned int)sourceLen, dummyParam, dataOffset, lobMaxSize); + err = allocateDesc((unsigned int)sourceLen, dummyParam, dataOffset, lobMaxSize, lobMaxChunkMemSize, handleIn, handleInLen, lobGCLimit,lobGlobals); if (err != LOB_OPER_OK) return err; - // send a message to mxlobsrvr to do an update into descriptor tables - request_.setType(Lob_Req_Update); - request_.getDesc().setSize(sourceLen); - request_.setDataOffset(dataOffset); - - request_.send(); - - err = request_.getError(); - - if (err != LOB_OPER_OK) { - return err; - } - - int cliErr = request_.getCliError(); - if (cliErr < 0 || cliErr == 100) { // some error or EOD. + + clierr = SQL_EXEC_LOBcliInterface(handleIn, + handleInLen, + 0, &chunkNum, + handleOut, &handleOutLen, + LOB_CLI_UPDATE_UNIQUE, LOB_CLI_ExecImmed, + &dataOffset, &sourceLen, + &outDescPartnKey, &outDescSyskey, + 0, + 0); + + if (clierr < 0 || clierr == 100) { // some error or EOD. 
return LOB_DESC_UPDATE_ERROR; } char *inputAddr = data; @@ -946,18 +977,27 @@ Ex_Lob_Error ExLob::update(char *data, Int64 size, LobsSubOper so,Int64 headDesc return LOB_OPER_OK; } -Ex_Lob_Error ExLob::delDesc() +Ex_Lob_Error ExLob::delDesc(char *handleIn, Int32 handleInLen, Int64 transId) { - Ex_Lob_Error err; - Int64 dummyParam; - - request_.setType(Lob_Req_Del_Desc); + Ex_Lob_Error err; + Int64 offset=0; + Int64 dummyParam=0; + Lng32 clierr=0; - request_.send(); - - err = request_.getError(); + clierr = SQL_EXEC_LOBcliInterface(handleIn, handleInLen, + 0, 0, + (char *)&dummyParam, (Lng32 *)&dummyParam, + LOB_CLI_DELETE, LOB_CLI_ExecImmed, + &dummyParam, &dummyParam, + &dummyParam, &dummyParam, + 0, + transId); - return err; + if (clierr < 0) + return LOB_DESC_FILE_DELETE_ERROR; + + return LOB_OPER_OK; + } Ex_Lob_Error ExLob::purgeLob() @@ -967,38 +1007,37 @@ Ex_Lob_Error ExLob::purgeLob() { return LOB_DATA_FILE_DELETE_ERROR; } - if (hdfsDelete(fs_, lobDescFile_, 0) != 0) - { - return LOB_DESC_FILE_DELETE_ERROR; - } - - + return LOB_OPER_OK; } -Ex_Lob_Error ExLob::openCursor(char *handleIn, Int64 handleInLen) +Ex_Lob_Error ExLob::openCursor(char *handleIn, Int32 handleInLen,Int64 transId) { Ex_Lob_Error err; cursor_t cursor; + Int32 clierr; + Int64 dummyParam = 0; + void *cliInterface = NULL; + + clierr = SQL_EXEC_LOBcliInterface(handleIn, + handleInLen, + 0, 0, + (char *)&dummyParam, (Lng32 *)&dummyParam, + LOB_CLI_SELECT_CURSOR, LOB_CLI_ExecImmed, + &dummyParam, &dummyParam, + &dummyParam, &dummyParam, + &cliInterface, + transId); - request_.setType(Lob_Req_Select_Cursor); - - err = request_.send(); - - if (err != LOB_OPER_OK) { - return err; + if (clierr <0 ) { + return LOB_DESC_READ_ERROR; } - err = request_.getError(); - - if (err != LOB_OPER_OK) { - return err; - } cursor.bytesRead_ = -1; cursor.descOffset_ = -1; cursor.descSize_ = -1; - cursor.cliInterface_ = NULL; // used only in lob process + cursor.cliInterface_ = cliInterface; // used only in lob 
process cursor.eod_ = false; cursor.eor_ = false; cursor.eol_ = false; @@ -1101,7 +1140,7 @@ Ex_Lob_Error ExLob::openDataCursor(char *file, LobsCursorType type, Int64 range, return LOB_OPER_OK; } -Ex_Lob_Error ExLob::readCursor(char *tgt, Int64 tgtSize, char *handleIn, Int64 handleInLen, Int64 &operLen) +Ex_Lob_Error ExLob::readCursor(char *tgt, Int64 tgtSize, char *handleIn, Int32 handleInLen, Int64 &operLen,Int64 transId) { int dataOffset; Ex_Lob_Error result; @@ -1127,7 +1166,7 @@ Ex_Lob_Error ExLob::readCursor(char *tgt, Int64 tgtSize, char *handleIn, Int64 h return LOB_OPER_OK; } - result = readCursorData(tgt, tgtSize, cursor, operLen); // increments cursor + result = readCursorData(tgt, tgtSize, cursor, operLen, handleIn,handleInLen,transId); // increments cursor if (result != LOB_OPER_OK) return result; @@ -1137,221 +1176,38 @@ Ex_Lob_Error ExLob::readCursor(char *tgt, Int64 tgtSize, char *handleIn, Int64 h return LOB_OPER_OK; } -Ex_Lob_Error ExLob::readDataCursorSimple(char *file, char *tgt, Int64 tgtSize, - Int64 &operLen, ExLobGlobals *lobGlobals) -{ - int dataOffset; - Ex_Lob_Error result = LOB_OPER_OK; - cursor_t *cursor; - ExLobCursor::bufferList_t::iterator c_it; - ExLobCursorBuffer *buf = NULL; - Int64 bytesToCopy = 0; - operLen = 0; - Int64 len; - char *target = tgt; - bool done = false; - - struct timespec startTime; - struct timespec endTime; - lobCursorLock_.lock(); - lobCursors_it it = lobCursors_.find(string(file, strlen(file))); - if (it == lobCursors_.end()) +Ex_Lob_Error ExLob::closeCursor(char *handleIn, Int32 handleInLen) +{ + lobCursors_it it = lobCursors_.find(string(handleIn, handleInLen)); + if (it != lobCursors_.end()) { - lobCursorLock_.unlock(); - return LOB_CURSOR_NOT_OPEN; + lobCursors_.erase(it); } - else - { - cursor = &(it->second); - } + return LOB_OPER_OK; +} - lobCursorLock_.unlock(); - while ((operLen < tgtSize) && !done && !cursor->eol_) - { - lobGlobals->traceMessage("locking cursor",cursor,__LINE__); - 
cursor->lock_.lock(); +Ex_Lob_Error ExLob::doSanityChecks(char *dir, LobsStorage storage, + Int32 handleInLen, Int32 handleOutLen, + Int32 blackBoxLen) +{ - // if no buffers to read and is eor or eod, we are done. - // else wait for prefetch thread to wake us up. - if (cursor->prefetchBufList_.size() == 0) { - if (cursor->eor_ || cursor->eod_) { - done = true; - } else { - cursor->bufferMisses_++; - lobGlobals->traceMessage("wait on condition cursor",cursor,__LINE__); - cursor->lock_.wait(); - } - lobGlobals->traceMessage("unlocking cursor",cursor,__LINE__); - cursor->lock_.unlock(); - continue; - } +#ifdef SQ_USE_HDFS + if (!fs_) + return LOB_HDFS_CONNECT_ERROR; +#else + if (fdData_ == -1) + return LOB_DATA_FILE_OPEN_ERROR; +#endif - // a buffer is available - c_it = cursor->prefetchBufList_.begin(); - buf = *c_it; - lobGlobals->traceMessage("unlocking cursor",cursor,__LINE__); - cursor->lock_.unlock(); + if (dir_.compare(dir) != 0) + return LOB_DIR_NAME_ERROR; - bytesToCopy = min(buf->bytesRemaining_, tgtSize - operLen); - memcpy(target, buf->data_ + buf->bytesUsed_, bytesToCopy); - target += bytesToCopy; - if (bytesToCopy == buf->bytesRemaining_) { // buffer is now empty - buf->bytesRemaining_ = -1; - buf->bytesUsed_ = -1; - lobGlobals->postfetchBufListLock_.lock(); - lobGlobals->postfetchBufList_.push_back(buf); - lobGlobals->postfetchBufListLock_.unlock(); - lobGlobals->traceMessage("locking cursor",cursor,__LINE__); - cursor->lock_.lock(); - c_it = cursor->prefetchBufList_.erase(c_it); - lobGlobals->traceMessage("signal condition cursor",cursor,__LINE__); - cursor->lock_.wakeOne(); // wake up prefetch thread if it was waiting for an empty buffer. 
- lobGlobals->traceMessage("unlocking cursor",cursor,__LINE__); - cursor->lock_.unlock(); - } else { - buf->bytesUsed_ += bytesToCopy; - buf->bytesRemaining_ -= bytesToCopy; - } - stats_.bytesPrefetched += bytesToCopy; - operLen += bytesToCopy; - } - - // update stats - stats_.bytesRead += operLen; - stats_.bytesToRead += tgtSize; - stats_.numReadReqs++; - - return LOB_OPER_OK; -} - -void ExLobCursor::emptyPrefetchList(ExLobGlobals *lobGlobals) -{ - ExLobCursor::bufferList_t::iterator c_it; - ExLobCursorBuffer *buf = NULL; - - c_it = prefetchBufList_.begin(); - while (c_it != prefetchBufList_.end()) - { - buf = *c_it; - lobGlobals->postfetchBufListLock_.lock(); - lobGlobals->postfetchBufList_.push_back(buf); - lobGlobals->postfetchBufListLock_.unlock(); - c_it = prefetchBufList_.erase(c_it); - } -} - -// Seems like this is currently unused. -// closeDataCusrorSimple takes care of destroying the cursor.But addign code -// similar to closeDataCursorSimple for correctness in case it is used in future -Ex_Lob_Error ExLob::deleteCursor(char *cursorName, ExLobGlobals *lobGlobals) -{ - cursor_t *cursor = NULL; - - lobCursorLock_.lock(); - - lobCursors_it it = lobCursors_.find(string(cursorName, strlen(cursorName))); - if (it != lobCursors_.end()) - { - cursor = &(it->second); - lobGlobals->traceMessage("locking cursor",cursor,__LINE__); - cursor->lock_.lock(); - cursor->emptyPrefetchList(lobGlobals); - lobGlobals->traceMessage("unlocking cursor",cursor,__LINE__); - cursor->lock_.unlock(); - lobCursors_.erase(it); - } - - lobCursorLock_.unlock(); - - return LOB_OPER_OK; -} - -Ex_Lob_Error ExLob::closeCursor(char *handleIn, Int64 handleInLen) -{ - lobCursors_it it = lobCursors_.find(string(handleIn, handleInLen)); - if (it != lobCursors_.end()) - { - lobCursors_.erase(it); - } - return LOB_OPER_OK; -} - -Ex_Lob_Error ExLob::closeDataCursorSimple(char *fileName, ExLobGlobals *lobGlobals) -{ - cursor_t *cursor = NULL; - Int64 secs = 0; - Int64 nsecs = 0; - - 
lobCursorLock_.lock(); - - lobCursors_it it = lobCursors_.find(string(fileName, strlen(fileName))); - if (it != lobCursors_.end()) - { - cursor = &(it->second); - lobGlobals->traceMessage("locking cursor",cursor,__LINE__); - cursor->lock_.lock(); - - clock_gettime(CLOCK_MONOTONIC, &cursor->closeTime_); - secs = cursor->closeTime_.tv_sec - cursor->openTime_.tv_sec; - nsecs = cursor->closeTime_.tv_nsec - cursor->openTime_.tv_nsec; - - if (cursor->eod_ || cursor->eor_) { // prefetch thread already done, - cursor->emptyPrefetchList(lobGlobals); - lobGlobals->traceMessage("unlocking cursor",cursor,__LINE__); - cursor->lock_.unlock(); - lobCursors_.erase(it); // so erase it here. - // no need to unlock as cursor object is gone. - } else { - cursor->eol_ = true; // prefetch thread will do the eol rituals - lobGlobals->traceMessage("signal condition cursor",cursor,__LINE__); - cursor->lock_.wakeOne(); // wakeup prefetch thread - lobGlobals->traceMessage("unlocking cursor",cursor,__LINE__); - cursor->lock_.unlock(); - } - } - - lobCursorLock_.unlock(); - - if (nsecs < 0) { - secs--; - nsecs += NUM_NSECS_IN_SEC; - } - Int64 totalnsecs = (secs * NUM_NSECS_IN_SEC) + nsecs; - stats_.cursorElapsedTime += totalnsecs; - - return LOB_OPER_OK; -} - -Ex_Lob_Error ExLob::print() -{ - Ex_Lob_Error err; - request_.setType(Lob_Req_Print); - err = request_.send(); - return err; -} - -Ex_Lob_Error ExLob::doSanityChecks(char *dir, LobsStorage storage, - Int64 handleInLen, Int64 handleOutLen, - Int64 blackBoxLen) -{ - -#ifdef SQ_USE_HDFS - if (!fs_) - return LOB_HDFS_CONNECT_ERROR; -#else - if (fdData_ == -1) - return LOB_DATA_FILE_OPEN_ERROR; -#endif - - if (dir_.compare(dir) != 0) - return LOB_DIR_NAME_ERROR; - - if (storage_ != storage) - return LOB_STORAGE_TYPE_ERROR; + if (storage_ != storage) + return LOB_STORAGE_TYPE_ERROR; if (handleInLen > MAX_HANDLE_IN_LEN) { return LOB_HANDLE_IN_LEN_ERROR; @@ -1368,75 +1224,199 @@ Ex_Lob_Error ExLob::doSanityChecks(char *dir, LobsStorage storage, 
return LOB_OPER_OK; } - - -Ex_Lob_Error ExLob::allocateDesc(ULng32 size, Int64 &descNum, Int64 &dataOffset, Int64 lobMaxSize) +Ex_Lob_Error ExLob::allocateDesc(ULng32 size, Int64 &descNum, Int64 &dataOffset, Int64 lobMaxSize, Int64 lobMaxChunkMemLen, char *handleIn, Int32 handleInLen, Int64 lobGCLimit, void *lobGlobals) { + NABoolean GCDone = FALSE; Ex_Lob_Error err = LOB_OPER_OK; Lng32 retval = 0; Int64 numRead = 0; Int64 numWritten = 0; - - // TBD need a way to lock access to this file. + dataOffset = 0; + Int64 dummyParam = 0; + if (size > lobMaxSize) + return LOB_MAX_LIMIT_ERROR; Int32 openFlags = O_RDONLY ; - fdDesc_ = hdfsOpenFile(fs_, lobDescFile_, O_RDONLY, 0, 0,0); - if (!fdDesc_) { - hdfsCloseFile(fs_,fdDesc_); - fdDesc_ = NULL; - return LOB_DESC_FILE_OPEN_ERROR; - } - ExLobDescHeader header(lobMaxSize); - numRead = hdfsPread(fs_,fdDesc_, 0, (void *)&header, sizeof(ExLobDescHeader) ); - if (numRead <=0) - { - return LOB_DESC_HEADER_READ_ERROR; - } - if (header.getAvailSize() >= size) { - descNum = header.getFreeDesc(); - - dataOffset = header.getDataOffset(); - header.incFreeDesc(); - header.decAvailSize(size); - header.incDataOffset(size); - - hdfsCloseFile(fs_,fdDesc_); - fdDesc_ = NULL; - openFlags = O_WRONLY; - fdDesc_ = hdfsOpenFile(fs_,lobDescFile_,openFlags,0,0,0); - if (!fdDesc_) { - - return LOB_DESC_FILE_OPEN_ERROR; - } - numWritten = hdfsWrite(fs_,fdDesc_, (void *)&header, sizeof(ExLobDescHeader)) ; - if (numWritten <= 0) - { - return LOB_DESC_HEADER_WRITE_ERROR; + fdData_ = hdfsOpenFile(fs_, lobDataFile_, O_RDONLY, 0, 0,0); + if (!fdData_) { + hdfsCloseFile(fs_,fdData_); + fdData_ = NULL; + return LOB_DATA_FILE_OPEN_ERROR; } + hdfsFileInfo *fInfo = hdfsGetPathInfo(fs_, lobDataFile_); + if (fInfo) + dataOffset = fInfo->mSize; + + if (dataOffset > lobGCLimit) // 5 GB default + { + GCDone = TRUE; + /* Int32 rc = SQL_EXEC_LOBcliInterface(handleIn, handleInLen, + 0,0, + (char *)dummyParam, (Lng32 *)&dummyParam, + LOB_CLI_PERFORM_LOB_GC, 
LOB_CLI_ExecImmed, + &dummyParam, &dummyParam, + &dummyParam, &dummyParam, + NULL, + 0); //don't pass transid */ + Int32 rc = SQL_EXEC_LOB_GC_Interface(lobGlobals,handleIn,handleInLen, + hdfsServer_,hdfsPort_, + lobLocation_, + lobMaxChunkMemLen); + + } + if (GCDone) // recalculate the new offset + fInfo = hdfsGetPathInfo(fs_, lobDataFile_); + if (fInfo) + dataOffset = fInfo->mSize; - + + //Find the last offset in the file + // dataOffset = hdfsTell(fs_,fdData_); //commenting out.hdfsTell always returns 0 !! + + return LOB_OPER_OK; +} +Ex_Lob_Error ExLob::compactLobDataFile(ExLobInMemoryDescChunksEntry *dcArray,Int32 numEntries) +{ + Ex_Lob_Error rc = LOB_OPER_OK; + Int64 maxMemChunk = 1024*1024*1024; //1GB limit for intermediate buffer for transfering data + char * saveLobDataFile = new(getLobGlobalHeap()) char[MAX_LOB_FILE_NAME_LEN+6]; + str_sprintf(saveLobDataFile, "%s_save",lobDataFile_); + char * tmpLobDataFile = new(getLobGlobalHeap()) char[MAX_LOB_FILE_NAME_LEN+5]; + str_sprintf(tmpLobDataFile, "%s_tmp",lobDataFile_); + + hdfsFS fs = hdfsConnect(hdfsServer_,hdfsPort_); + if (fs == NULL) + return LOB_DATA_FILE_OPEN_ERROR; + + + hdfsFile fdData = hdfsOpenFile(fs, lobDataFile_, O_RDONLY, 0, 0,0); + if (!fdData) + { + hdfsCloseFile(fs,fdData); + fdData = NULL; + return LOB_DATA_FILE_OPEN_ERROR; } - else { - return LOB_DATA_FILE_FULL_ERROR; + + hdfsFile fdTemp = hdfsOpenFile(fs, tmpLobDataFile,O_WRONLY|O_CREAT,0,0,0); + if (!fdTemp) + { + hdfsCloseFile(fs,fdTemp); + fdTemp = NULL; + return LOB_DATA_FILE_OPEN_ERROR; } - ExLobDesc desc(dataOffset, size, descNum); - hdfsCloseFile(fs_,fdDesc_); - fdDesc_=NULL; - openFlags = O_WRONLY| O_APPEND; - fdDesc_ = hdfsOpenFile(fs_,lobDescFile_,openFlags,0,0,0); - numWritten = hdfsWrite(fs_,fdDesc_, (void *)&desc, sizeof(ExLobDesc)); - if (numWritten <= 0) - { - err = LOB_DESC_WRITE_ERROR; - } - hdfsCloseFile(fs_,fdDesc_); - fdDesc_=NULL; - - // TBD need a way to unlock this hdfs file. 
- return err; + Int32 i = 0; + Int64 bytesRead = 0; + Int64 bytesWritten = 0; + Int64 size = 0; + Int64 chunkLen = 0; + char * tgt = NULL; + while (i < numEntries) + { + chunkLen = dcArray[i].getChunkLen(); + if (chunkLen > maxMemChunk) + { + tgt = (char *)(getLobGlobalHeap())->allocateMemory(maxMemChunk); + while (chunkLen > maxMemChunk) + { + bytesRead = hdfsPread(fs,fdData,dcArray[i].getCurrentOffset(),tgt,maxMemChunk); + if (bytesRead != maxMemChunk) + { + getLobGlobalHeap()->deallocateMemory(tgt); + return LOB_DATA_READ_ERROR; + } + bytesWritten = hdfsWrite(fs,fdTemp, tgt,maxMemChunk); + if (bytesWritten != size) + { + getLobGlobalHeap()->deallocateMemory(tgt); + return LOB_TARGET_FILE_WRITE_ERROR; + } + chunkLen -= maxMemChunk; + } + + } + else + { + tgt = (char *)(getLobGlobalHeap())->allocateMemory(chunkLen); + bytesRead = hdfsPread(fs,fdData,dcArray[i].getCurrentOffset(),tgt,chunkLen); + if (bytesRead != chunkLen) + { + getLobGlobalHeap()->deallocateMemory(tgt); + return LOB_DATA_READ_ERROR; + } + bytesWritten = hdfsWrite(fs,fdTemp, tgt,chunkLen); + if (bytesWritten != chunkLen) + { + getLobGlobalHeap()->deallocateMemory(tgt); + return LOB_TARGET_FILE_WRITE_ERROR; + } + } + if (hdfsFlush(fs, fdTemp)) { + return LOB_DATA_FLUSH_ERROR; + } + getLobGlobalHeap()->deallocateMemory(tgt); + i++; + } + hdfsCloseFile(fs,fdTemp); + hdfsCloseFile(fs,fdData); + + //Now save the data file and rename the tempfile to the original datafile + + Int32 rc2 = hdfsRename(fs,lobDataFile_,saveLobDataFile); + if (rc2 == -1) + { + NADELETEBASIC(saveLobDataFile,getLobGlobalHeap()); + NADELETEBASIC(tmpLobDataFile,getLobGlobalHeap()); + return LOB_DATA_FILE_WRITE_ERROR; + } + rc2 = hdfsRename(fs,tmpLobDataFile, lobDataFile_); + if (rc2 == -1) + { + NADELETEBASIC(saveLobDataFile,getLobGlobalHeap()); + NADELETEBASIC(tmpLobDataFile,getLobGlobalHeap()); + return LOB_DATA_FILE_WRITE_ERROR; + } + NADELETEBASIC(saveLobDataFile,getLobGlobalHeap()); + 
NADELETEBASIC(tmpLobDataFile,getLobGlobalHeap()); + return LOB_OPER_OK; } +Ex_Lob_Error ExLob::restoreLobDataFile() +{ + Ex_Lob_Error rc = LOB_OPER_OK; + + + hdfsFS fs = hdfsConnect(hdfsServer_,hdfsPort_); + if (fs == NULL) + return LOB_DATA_FILE_OPEN_ERROR; + char * saveLobDataFile = new(getLobGlobalHeap()) char[MAX_LOB_FILE_NAME_LEN+6]; + str_sprintf(saveLobDataFile, "%s_save",lobDataFile_); + Int32 rc2 = hdfsDelete(fs,lobDataFile_,FALSE);//ok to ignore error. + rc2 = hdfsRename(fs,saveLobDataFile, lobDataFile_); + if (rc2) + { + NADELETEBASIC(saveLobDataFile,getLobGlobalHeap()); + return LOB_OPER_ERROR; + } + NADELETEBASIC(saveLobDataFile,getLobGlobalHeap()); + return rc; + +} + +Ex_Lob_Error ExLob::purgeBackupLobDataFile() +{ + Ex_Lob_Error rc = LOB_OPER_OK; + + hdfsFS fs = hdfsConnect(hdfsServer_,hdfsPort_); + if (fs == NULL) + return LOB_DATA_FILE_OPEN_ERROR; + char * saveLobDataFile = new(getLobGlobalHeap()) char[MAX_LOB_FILE_NAME_LEN+6]; + str_sprintf(saveLobDataFile, "%s_save",lobDataFile_); + Int32 rc2 = hdfsDelete(fs,saveLobDataFile,FALSE);//ok to ignore error. 
+ + NADELETEBASIC(saveLobDataFile,getLobGlobalHeap()); + return rc; +} /////////////////////////////////////////////////////////////////////////////// // ExLobDescHeader definitions /////////////////////////////////////////////////////////////////////////////// @@ -1471,7 +1451,7 @@ ExLobDesc::~ExLobDesc() { } -Ex_Lob_Error ExLob::readCursorData(char *tgt, Int64 tgtSize, cursor_t &cursor, Int64 &operLen) +Ex_Lob_Error ExLob::readCursorData(char *tgt, Int64 tgtSize, cursor_t &cursor, Int64 &operLen, char *handleIn, Int32 handleLenIn, Int64 transId) { ExLobDesc desc; Ex_Lob_Error err; @@ -1482,23 +1462,26 @@ Ex_Lob_Error ExLob::readCursorData(char *tgt, Int64 tgtSize, cursor_t &cursor, I tOffset offset; struct timespec startTime; struct timespec endTime; + NABoolean isEOD=FALSE; + Int64 outOffset = 0; + Int64 outSize = 0; while ( (operLen < tgtSize) && !cursor.eod_ ) { if (cursor.bytesRead_ == cursor.descSize_) // time to read next chunck { - err = fetchCursor(); + err = fetchCursor(handleIn, handleLenIn,outOffset, outSize,isEOD,transId); if (err != LOB_OPER_OK) { return err; } - if (request_.getCliError() == 100) { + if (isEOD) { cursor.eod_ = true; // subsequent call will return 100 and close the cursor continue; } else { - cursor.descSize_ = request_.getDesc().getSize(); - cursor.descOffset_ = request_.getDesc().getOffset(); + cursor.descSize_ = outSize; + cursor.descOffset_ = outOffset; cursor.bytesRead_ = 0; } } @@ -1538,7 +1521,7 @@ Ex_Lob_Error ExLob::readCursorData(char *tgt, Int64 tgtSize, cursor_t &cursor, I if (bytesRead == -1) { return LOB_DATA_READ_ERROR; } else if (bytesRead == 0) { - cursor.eod_ = true; + cursor.eod_ = true; continue; } @@ -1552,92 +1535,29 @@ Ex_Lob_Error ExLob::readCursorData(char *tgt, Int64 tgtSize, cursor_t &cursor, I return LOB_OPER_OK; } -Ex_Lob_Error ExLob::readCursorDataSimple(char *tgt, Int64 tgtSize, cursor_t &cursor, Int64 &operLen) -{ - ExLobDesc desc; - Ex_Lob_Error err; - Int64 bytesAvailable = 0; - Int64 bytesToCopy = 
0; - Int64 bytesRead = 0; - operLen = 0; - tOffset offset; - struct timespec startTime; - struct timespec endTime; - bool done = false; - - if (!fdData_) { - return LOB_CURSOR_NOT_OPEN_ERROR; - } - - if (cursor.bytesRead_ == -1) { // starting - cursor.bytesRead_ = 0; - } - - clock_gettime(CLOCK_MONOTONIC, &startTime); - - while ( (operLen < tgtSize) && !done ) - { - //offset = cursor.descOffset_ + cursor.bytesRead_; - bytesToCopy = tgtSize - operLen; - offset = cursor.descOffset_ + cursor.bytesRead_; - - // gets chunks of 64KB. Uses readDirect internally. - // bytesRead = hdfsPread(fs_, fdData_, offset, tgt, bytesToCopy); - bytesRead = hdfsRead(fs_, fdData_, tgt, bytesToCopy); - - stats_.numHdfsReqs++; - - if (bytesRead == -1) { - return LOB_DATA_READ_ERROR; - } else if (bytesRead == 0) { - done = true; - } - - cursor.bytesRead_ += bytesRead; - operLen += bytesRead; - tgt += bytesRead; - } - - clock_gettime(CLOCK_MONOTONIC, &endTime); - - Int64 secs = endTime.tv_sec - startTime.tv_sec; - Int64 nsecs = endTime.tv_nsec - startTime.tv_nsec; - if (nsecs < 0) { - secs--; - nsecs += NUM_NSECS_IN_SEC; - } - - Int64 totalnsecs = (secs * NUM_NSECS_IN_SEC) + nsecs; - stats_.CumulativeReadTime += totalnsecs; - return LOB_OPER_OK; -} Ex_Lob_Error ExLob::readDataToMem(char *memAddr, - Int64 offset, Int64 size, Int64 &operLen) + Int64 offset, Int64 size, Int64 &operLen, + char *handleIn, Int32 handleLenIn, + NABoolean multipleChunks, Int64 transId) { Ex_Lob_Error err = LOB_OPER_OK; operLen = 0; Int64 bytesRead = 0; - NABoolean multipleChunks = FALSE; - if (getRequest()->getBlackBoxLen() == -1) // mxlobsrvr returned -1 indicating multiple chunks for this particular lob handle + if (multipleChunks) { - multipleChunks = TRUE; - err = openCursor(getRequest()->getHandleIn(), - getRequest()->getHandleInLen()); + err = openCursor(handleIn, + handleLenIn,transId); //now we can fetch the descriptors for each chunk } - else - if (err != LOB_OPER_OK) - return err; - int cliErr = 
request_.getCliError(); - if (cliErr < 0 || cliErr == 100) { - return LOB_DESC_READ_ERROR; - } + if (err != LOB_OPER_OK) + return err; + if (fdData_)// we may have a stale handle. close and open to refresh { @@ -1665,22 +1585,19 @@ Ex_Lob_Error ExLob::readDataToMem(char *memAddr, memAddr, size)) == -1) { return LOB_DATA_READ_ERROR; - } - - + } operLen = bytesRead; return LOB_OPER_OK; } else { //handle reading the multiple chunks like a cursor - err = readCursor(memAddr,size, getRequest()->getHandleIn(), - getRequest()->getHandleInLen(), operLen); - - + err = readCursor(memAddr,size, handleIn, + handleLenIn, operLen, transId); + if (err==LOB_OPER_OK) - closeCursor(getRequest()->getHandleIn(), - getRequest()->getHandleInLen()); + closeCursor(handleIn, + handleLenIn); else return err; } @@ -1689,7 +1606,7 @@ Ex_Lob_Error ExLob::readDataToMem(char *memAddr, -Ex_Lob_Error ExLob::readDataToLocalFile(char *fileName, Int64 offset, Int64 size, Int64 &writeOperLen, Int64 lobMaxChunkMemSize, Int32 fileflags) +Ex_Lob_Error ExLob::readDataToLocalFile(char *fileName, Int64 offset, Int64 size, Int64 &writeOperLen, Int64 lobMaxChunkMemSize, Int32 fileflags,char *handleIn,Int32 handleInLen, NABoolean multipleChunks,Int64 transId) { Ex_Lob_Error err; Int64 operLen = 0; @@ -1699,10 +1616,10 @@ Ex_Lob_Error ExLob::readDataToLocalFile(char *fileName, Int64 offset, Int64 siz Int64 tgtOffset = 0; char *lobData = 0; Int64 chunkSize = 0; - + if (srcLen <=0) return LOB_SOURCE_DATA_ALLOC_ERROR; - // open the targte file for writing + // open the target file for writing int filePerms = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH; int openFlags = O_RDWR ; // O_DIRECT needs mem alignment if (((LobTgtFileFlags)fileflags == Lob_Append_Or_Error ) || @@ -1731,15 +1648,16 @@ Ex_Lob_Error ExLob::readDataToLocalFile(char *fileName, Int64 offset, Int64 siz return LOB_TARGET_FILE_OPEN_ERROR; } } - if ((srcLen < lobMaxChunkMemSize) && (getRequest()->getBlackBoxLen() != -1)) // simple single 
I/O case + if ((srcLen < lobMaxChunkMemSize) && (multipleChunks ==FALSE)) // simple single I/O case { + lobData = (char *) (getLobGlobalHeap())->allocateMemory(srcLen); if (lobData == NULL) { return LOB_SOURCE_DATA_ALLOC_ERROR; } - err = readDataToMem(lobData, srcOffset,srcLen,operLen); + err = readDataToMem(lobData, srcOffset,srcLen,operLen, handleIn,handleInLen, multipleChunks,transId); if (err != LOB_OPER_OK) { getLobGlobalHeap()->deallocateMemory(lobData); @@ -1756,8 +1674,8 @@ Ex_Lob_Error ExLob::readDataToLocalFile(char *fileName, Int64 offset, Int64 siz } else // multiple chunks to read { - err = openCursor(getRequest()->getHandleIn(), - getRequest()->getHandleInLen()); + err = openCursor(handleIn, + handleInLen,transId); if (err != LOB_OPER_OK) return err; while ( srcLen > 0) @@ -1771,8 +1689,8 @@ Ex_Lob_Error ExLob::readDataToLocalFile(char *fileName, Int64 offset, Int64 siz return LOB_SOURCE_DATA_ALLOC_ERROR; } //handle reading the multiple chunks like a cursor - err = readCursor(lobData,chunkSize, getRequest()->getHandleIn(), - getRequest()->getHandleInLen(), operLen); + err = readCursor(lobData,chunkSize, handleIn, + handleInLen, operLen, transId); if ((err != LOB_OPER_OK) || (operLen != chunkSize)) { @@ -1790,15 +1708,15 @@ Ex_Lob_Error ExLob::readDataToLocalFile(char *fileName, Int64 offset, Int64 siz srcLen -= chunkSize; tgtOffset += chunkSize; } - closeCursor(getRequest()->getHandleIn(), - getRequest()->getHandleInLen()); + closeCursor(handleIn, + handleInLen); } close(fdDestFile); return LOB_OPER_OK; } -Ex_Lob_Error ExLob::readDataToHdfsFile(char *tgtFileName, Int64 offset, Int64 size, Int64 &writeOperLen, Int64 lobMaxChunkMemLen, Int32 fileflags) +Ex_Lob_Error ExLob::readDataToHdfsFile(char *tgtFileName, Int64 offset, Int64 size, Int64 &writeOperLen, Int64 lobMaxChunkMemLen, Int32 fileflags,char *handleIn, Int32 handleInLen, NABoolean multipleChunks,Int64 transId) { Ex_Lob_Error err; Int64 operLen = 0; @@ -1809,6 +1727,7 @@ Ex_Lob_Error 
ExLob::readDataToHdfsFile(char *tgtFileName, Int64 offset, Int64 s char *lobData = 0; Int64 chunkSize = 0; hdfsFile fdTgtFile; + // open and write to the target file int openFlags = O_WRONLY; if ((LobTgtFileFlags)fileflags == Lob_Append_Or_Error ) @@ -1844,7 +1763,7 @@ Ex_Lob_Error ExLob::readDataToHdfsFile(char *tgtFileName, Int64 offset, Int64 s } } - if ((srcLen < lobMaxChunkMemLen) && (getRequest()->getBlackBoxLen() != -1)) // simple single I/O case + if ((srcLen < lobMaxChunkMemLen) && (multipleChunks ==FALSE)) // simple single I/O case { lobData = (char *) (getLobGlobalHeap())->allocateMemory(srcLen); @@ -1852,7 +1771,7 @@ Ex_Lob_Error ExLob::readDataToHdfsFile(char *tgtFileName, Int64 offset, Int64 s { return LOB_SOURCE_DATA_ALLOC_ERROR; } - err = readDataToMem(lobData, srcOffset,srcLen,operLen); + err = readDataToMem(lobData, srcOffset,srcLen,operLen,handleIn,handleInLen, multipleChunks,transId); if (err != LOB_OPER_OK) { getLobGlobalHeap()->deallocateMemory(lobData); @@ -1874,8 +1793,9 @@ Ex_Lob_Error ExLob::readDataToHdfsFile(char *tgtFileName, Int64 offset, Int64 s } else {// multiple chunks to read - err = openCursor(getRequest()->getHandleIn(), - getRequest()->getHandleInLen()); + err = openCursor(handleIn, + handleInLen, + transId); if (err != LOB_OPER_OK) return err; while ( srcLen > 0) @@ -1888,13 +1808,13 @@ Ex_Lob_Error ExLob::readDataToHdfsFile(char *tgtFileName, Int64 offset, Int64 s return LOB_SOURCE_DATA_ALLOC_ERROR; } //handle reading the multiple chunks like a cursor - err = readCursor(lobData,chunkSize, getRequest()->getHandleIn(), - getRequest()->getHandleInLen(), operLen); + err = readCursor(lobData,chunkSize, handleIn, + handleInLen, operLen, transId); if ((err != LOB_OPER_OK) || (operLen != chunkSize)) { getLobGlobalHeap()->deallocateMemory(lobData); - return err; + return LOB_DATA_READ_ERROR; } writeOperLen += hdfsWrite(fs_,fdTgtFile,lobData, chunkSize); if (writeOperLen <= 0) @@ -1911,8 +1831,8 @@ Ex_Lob_Error 
ExLob::readDataToHdfsFile(char *tgtFileName, Int64 offset, Int64 s srcLen -= chunkSize; } - closeCursor(getRequest()->getHandleIn(), - getRequest()->getHandleInLen()); + closeCursor(handleIn, + handleInLen); } hdfsCloseFile(fs_, fdTgtFile); fdTgtFile=NULL; @@ -1925,7 +1845,7 @@ Ex_Lob_Error ExLob::readDataToHdfsFile(char *tgtFileName, Int64 offset, Int64 s -Ex_Lob_Error ExLob::readDataToExternalFile(char *tgtFileName, Int64 offset, Int64 size, Int64 &operLen,Int64 lobMaxChunkMemLen,Int32 fileflags) +Ex_Lob_Error ExLob::readDataToExternalFile(char *tgtFileName, Int64 offset, Int64 size, Int64 &operLen,Int64 lobMaxChunkMemLen,Int32 fileflags,char *handleIn,Int32 handleInLen, NABoolean multipleChunks,Int64 transId) { //TBD return LOB_OPER_OK; @@ -1954,975 +1874,1210 @@ Ex_Lob_Error ExLob::initStats() stats_.init(); return LOB_OPER_OK; } -void ExLobGlobals::traceMessage(const char *logMessage, ExLobCursor *cursor, - int line) -{ - if ( threadTraceFile_ && logMessage) - { - fprintf(threadTraceFile_, - "Thread: 0x%lx Line: %d %s 0x%lx\n" , - (unsigned long)pthread_self(), line, logMessage, - (unsigned long) cursor); - fflush(threadTraceFile_); - } - -} -Ex_Lob_Error ExLobGlobals::performRequest(ExLobHdfsRequest *request) -{ +//Main driver of any LOB related operation + +Ex_Lob_Error ExLobsOper ( + char *lobName, // lob name + char *handleIn, // input handle (for cli calls) + Int32 handleInLen, // input handle len + char *hdfsServer, // server where hdfs fs resides + Int64 hdfsPort, // port number to access hdfs server + char *handleOut, // output handle (for cli calls) + Int32 &handleOutLen, // output handle len + Int64 descNumIn, // input desc Num (for flat files only) + Int64 &descNumOut, // output desc Num (for flat files only) + Int64 &retOperLen, // length of data involved in this operation + Int64 requestTagIn, // only for checking status + Int64 &requestTagOut, // returned with every request other than check status + Ex_Lob_Error &requestStatus, // returned req 
status + Int64 &cliError, // err returned by cli call + char *dir, // directory in the storage + LobsStorage storage, // storage type + char *source, // source (memory addr, filename, foreign lob etc) + Int64 sourceLen, // source len (memory len, foreign desc offset etc) + Int64 cursorBytes, + char *cursorId, + LobsOper operation, // LOB operation + LobsSubOper subOperation, // LOB sub operation + Int64 waited, // waited or nowaited + void *&globPtr, // ptr to the Lob objects. + Int64 transId, + void *blackBox, // black box to be sent to cli + Int32 blackBoxLen, // length of black box + Int64 lobMaxSize, + Int64 lobMaxChunkMemSize, + Int64 lobGCLimit, + int bufferSize , + short replication , + int blockSize, + Lng32 openType) +{ Ex_Lob_Error err = LOB_OPER_OK; - ExLob *lobPtr; - ExLobCursorBuffer *buf; - ExLobCursor *cursor; - Int64 size; - NABoolean seenEOR = false; - NABoolean seenEOD = false; - ExLobCursor::bufferList_t::iterator c_it; - Int64 totalBufSize; + ExLob *lobPtr = NULL; + char fn[MAX_LOB_FILE_NAME_LEN]; + struct timespec startTime; + struct timespec endTime; + Int64 secs, nsecs, totalnsecs; + ExLobPreOpen *preOpenObj; + ExLobGlobals *lobGlobals = NULL; + transId = 0; + retOperLen = 0; + ExLobDesc desc; + + lobMap_t *lobMap = NULL; + lobMap_it it; - switch (request->reqType_) - { - case Lob_Hdfs_Cursor_Prefetch : - lobPtr = request->lobPtr_; - cursor = request->cursor_; - traceMessage("locking cursor",cursor,__LINE__); - cursor->lock_.lock(); - while (!cursor->eod_ && !cursor->eor_ && !cursor->eol_) - { - postfetchBufListLock_.lock(); - c_it = postfetchBufList_.begin(); - if (c_it != postfetchBufList_.end()) { - buf = *c_it; - postfetchBufList_.erase(c_it); - postfetchBufListLock_.unlock(); - traceMessage("unlocking cursor",cursor,__LINE__); - cursor->lock_.unlock(); - } else { - postfetchBufListLock_.unlock(); - // there are no empty buffers. - // if prefetch list already has the max, wait for one to free up. 
- totalBufSize = cursor->prefetchBufList_.size() * cursor->bufMaxSize_; - if (totalBufSize > LOB_CURSOR_PREFETCH_BYTES_MAX) { - traceMessage("wait on condition cursor",cursor,__LINE__); - cursor->lock_.wait(); - char buffer2[2048]; - sprintf(buffer2, "cursor->eod_ %d cursor->eor_ %d " - "cursor->eol_ %d", cursor->eod_, - cursor->eor_, cursor->eol_); - traceMessage(buffer2, cursor, __LINE__); - continue; - } - // create a new buffer - traceMessage("unlocking cursor",cursor,__LINE__); - cursor->lock_.unlock(); - buf = new (getHeap()) ExLobCursorBuffer(); - buf->data_ = (char *) (getHeap())->allocateMemory( cursor->bufMaxSize_); - lobPtr->stats_.buffersUsed++; - } - size = min(cursor->bufMaxSize_, (cursor->maxBytes_ - cursor->bytesRead_ + (16 * 1024))); - if (buf->data_) { - lobPtr->readCursorDataSimple(buf->data_, size, *cursor, buf->bytesRemaining_); - buf->bytesUsed_ = 0; - traceMessage("locking cursor",cursor,__LINE__); - cursor->lock_.lock(); - if (size < (cursor->bufMaxSize_)) { - cursor->eor_ = true; - seenEOR = true; - } - if (buf->bytesRemaining_) { - cursor->prefetchBufList_.push_back(buf); - traceMessage("signal condition cursor",cursor,__LINE__); - cursor->lock_.wakeOne(); - traceMessage("unlocking cursor",cursor,__LINE__); - cursor->lock_.unlock(); - } else { - cursor->eod_ = true; - seenEOD = true; - traceMessage("signal condition cursor",cursor,__LINE__); - cursor->lock_.wakeOne(); - traceMessage("unlocking cursor",cursor,__LINE__); - cursor->lock_.unlock(); - postfetchBufListLock_.lock(); - postfetchBufList_.push_back(buf); - postfetchBufListLock_.unlock(); - } - } else { - assert("data_ is null"); - } - // Important! Break and do not access cursor object if we have reached - // end of data or range. 
- // The main thread could have destroyed the cursor - // in ::closeDataCursorSimple - if (seenEOD || seenEOR) - { - char buffer2[2048]; - sprintf(buffer2, "seenEOD %d seenEOR %d", - seenEOD, seenEOR); - traceMessage(buffer2, cursor, __LINE__); - break; - } - traceMessage("locking cursor",cursor,__LINE__); - cursor->lock_.lock(); - } // while + clock_gettime(CLOCK_MONOTONIC, &startTime); - if (!seenEOD && !seenEOR) + char *fileName = lobName; + + if (globPtr == NULL) + { + if (operation == Lob_Init) { - traceMessage("locking cursor",cursor,__LINE__); - cursor->lock_.unlock(); - if (cursor->eol_) { // never reaches here ?? - lobPtr->deleteCursor(cursor->name_, this); - } + globPtr = (void *) new ExLobGlobals(); + if (globPtr == NULL) + return LOB_INIT_ERROR; + + lobGlobals = (ExLobGlobals *)globPtr; + + err = lobGlobals->initialize(); + return err; } - processPreOpens(); - break; + else + { + return LOB_GLOB_PTR_ERROR; + } + } + else + { + lobGlobals = (ExLobGlobals *)globPtr; - default: - request->error_ = LOB_HDFS_REQUEST_UNKNOWN; - } + lobMap = lobGlobals->getLobMap(); - return LOB_OPER_OK; -} + it = lobMap->find(string(fileName)); -Ex_Lob_Error ExLobDesc::print() -{ - printf("%4d %4d %4d %4d %4d %4d %8d\n", - dataSize_, dataState_, tail_, prev_, next_, nextFree_, dataOffset_); - return LOB_OPER_OK; -} + if (it == lobMap->end()) + { + //lobPtr = new (lobGlobals->getHeap())ExLob(); + lobPtr = new ExLob(); + if (lobPtr == NULL) + return LOB_ALLOC_ERROR; -/////////////////////////////////////////////////////////////////////////////// -// ExLobGlobals definitions -/////////////////////////////////////////////////////////////////////////////// + err = lobPtr->initialize(fileName, (operation == Lob_Create) ? 
EX_LOB_CREATE : EX_LOB_RW, dir, storage, hdfsServer, hdfsPort, dir,bufferSize, replication, blockSize,lobMaxSize,lobGlobals); + if (err != LOB_OPER_OK) + return err; -ExLobGlobals::ExLobGlobals() : - lobMap_(NULL), - fs_(NULL), - isCliInitialized_(FALSE), - isHive_(FALSE), - threadTraceFile_(NULL), - heap_(NULL) -{ - //initialize the log file - if (getenv("TRACE_HDFS_THREAD_ACTIONS")) + lobMap->insert(pair<string, ExLob*>(string(fileName), lobPtr)); + } + else + { + lobPtr = it->second; + + } + } + /* +// **Note** This is code that needs to get called before sending a request to the +//mxlobsrvr process. It's inactive code currently + MS_Mon_Transid_Type transIdBig; + MS_Mon_Transseq_Type transStartId; + if (!lobGlobals->isHive()) { - char logFileName[50]= ""; - sprintf(logFileName,"trace_threads.%d",getpid()); - threadTraceFile_ = fopen(logFileName,"a"); + // get current transaction + + int transIdErr = ms_transid_get(false, false, &transIdBig, &transStartId); + // set the pass thru request object values in the lob + + lobPtr->getRequest()->setValues(lobPtr->getDescFileName(), + descNumIn, handleInLen, handleIn, storage, + transId, transIdBig, transStartId, + (char *)blackBox, blackBoxLen); } -} + */ + switch(operation) + { + case Lob_Create: + break; -ExLobGlobals::~ExLobGlobals() -{ - ExLobCursor::bufferList_t::iterator c_it; - ExLobCursorBuffer *buf = NULL; + case Lob_InsertDesc: + err = lobPtr->writeDesc(sourceLen, source, subOperation, descNumOut, retOperLen, lobMaxSize, lobMaxChunkMemSize,lobGCLimit,handleIn,handleInLen,(char *)blackBox, &blackBoxLen,handleOut,handleOutLen,lobGlobals); + break; - preOpenListLock_.lock(); - preOpenList_.clear(); - preOpenListLock_.unlock(); + case Lob_InsertData: + err = lobPtr->insertData(source, sourceLen, subOperation, descNumIn, retOperLen, lobMaxSize,lobMaxChunkMemSize,handleIn,handleInLen,(char *)blackBox, blackBoxLen,handleOut,handleOutLen,lobGlobals); + break; - - if (lobMap_) - delete lobMap_; + case 
Lob_InsertDataSimple: + err = lobPtr->writeDataSimple(source, sourceLen, subOperation, retOperLen, + bufferSize , replication , blockSize); + break; - for (int i=0; i<NUM_WORKER_THREADS; i++) { - enqueueShutdownRequest(); - } + case Lob_Read: + if (subOperation == Lob_Memory) + err = lobPtr->readToMem(source,sourceLen,retOperLen,handleIn,handleInLen,(char *)blackBox, blackBoxLen,handleOut,handleOutLen,transId); + else if (subOperation == Lob_File) + err = lobPtr->readToFile(source, sourceLen, retOperLen, lobMaxChunkMemSize, openType,handleIn,handleInLen,(char *)blackBox, blackBoxLen,handleOut,handleOutLen,transId); + else + err = LOB_SUBOPER_ERROR; + break; - for (int i=0; i<NUM_WORKER_THREADS; i++) { - pthread_join(threadId_[i], NULL); - } - // Free the post fetch bugf list AFTER the worker threads have left to - // avoid slow worker thread being stuck and master deallocating these - // buffers and not consuming the buffers which could cause a lock. - - postfetchBufListLock_.lock(); - c_it = postfetchBufList_.begin(); - while (c_it != postfetchBufList_.end()) { - buf = *c_it; - if (buf->data_) { - heap_->deallocateMemory( buf->data_); - } - c_it = postfetchBufList_.erase(c_it); - } - postfetchBufListLock_.unlock(); - - //msg_mon_close_process(&serverPhandle); - if (threadTraceFile_) - fclose(threadTraceFile_); - threadTraceFile_ = NULL; -} + case Lob_ReadDesc: // read desc only. Needed for pass thru. 
+ err = lobPtr->getDesc(desc,handleIn,handleInLen,(char *)blackBox, &blackBoxLen,handleOut,handleOutLen,transId); + retOperLen = 0; + break; + case Lob_OpenCursor: + err = lobPtr->openCursor(handleIn, handleInLen,transId); + break; -Ex_Lob_Error ExLobGlobals::setServerPhandle() -{ - int nid; - - int err = msg_mon_get_my_info(&nid, NULL, NULL, NULL, NULL, NULL, NULL, NULL); - char server[12]; - sprintf(server, "%s%d", "$ZLOBSRV", nid); + case Lob_OpenDataCursorSimple: + if (openType == 1) { // preopen + sprintf(fn,"%s:%Lx:%s",lobPtr->getDataFileName(), (long long unsigned int)lobName, cursorId); + preOpenObj = new (lobGlobals->getHeap()) ExLobPreOpen(lobPtr, fn, descNumIn, sourceLen, cursorBytes, waited); + lobGlobals->addToPreOpenList(preOpenObj); + } else if (openType == 2) { // must open + sprintf(fn,"%s:%Lx:%s",lobPtr->getDataFileName(), (long long unsigned int)lobName, cursorId); + fileName = fn; + err = lobPtr->openDataCursor(fileName, Lob_Cursor_Simple, descNumIn, sourceLen, cursorBytes, waited, lobGlobals); + } else + err = LOB_SUBOPER_ERROR; + break; - int oid; - err = msg_mon_open_process(server, &serverPhandle, &oid); + case Lob_ReadCursor: + if ((subOperation == Lob_Memory) || (subOperation == Lob_Buffer)) + err = lobPtr->readCursor(source, sourceLen, handleIn, handleInLen, retOperLen,transId); + else if (subOperation == Lob_File) + err = lobPtr->readCursor(source, -1, handleIn, handleInLen, retOperLen,transId); + else + err = LOB_SUBOPER_ERROR; + break; - if (err != XZFIL_ERR_OK) - return LOB_SERVER_OPEN_ERROR; + case Lob_ReadDataCursorSimple: + sprintf(fn,"%s:%Lx:%s",lobPtr->getDataFileName(), (long long unsigned int)lobName, cursorId); + fileName = fn; + err = lobPtr->readDataCursorSimple(fileName, source, sourceLen, retOperLen, lobGlobals); + break; - return LOB_OPER_OK; -} + case Lob_CloseFile: + if (lobPtr->hasNoOpenCursors()) { + lobGlobals->traceMessage("Lob_CloseFile",NULL,__LINE__); + err = lobPtr->closeFile(); + it = 
lobMap->find(string(lobName)); + lobMap->erase(it); + delete lobPtr; + lobPtr = NULL; + } + break; -Ex_Lob_Error ExLobGlobals::resetServerPhandle() -{ - Ex_Lob_Error err; + case Lob_CloseCursor: + err = lobPtr->closeCursor(handleIn, handleInLen); + break; - msg_mon_close_process(&serverPhandle); + case Lob_CloseDataCursorSimple: + sprintf(fn,"%s:%Lx:%s",lobPtr->getDataFileName(), (long long unsigned int)lobName, cursorId); + fileName = fn; + err = lobPtr->closeDataCursorSimple(fileName, lobGlobals); + break; - err = setServerPhandle(); + case Lob_Append: + if ((subOperation == Lob_Memory) ||(subOperation == Lob_Buffer)) + err = lobPtr->append(source, sourceLen, subOperation, descNumIn, retOperLen,lobMaxSize, lobMaxChunkMemSize,lobGCLimit,handleIn,handleInLen,handleOut,handleOutLen,lobGlobals); + else if (subOperation == Lob_File) + err = lobPtr->append(source, -1, subOperation, descNumIn, retOperLen,lobMaxSize, lobMaxChunkMemSize,lobGCLimit,handleIn,handleInLen,handleOut,handleOutLen,lobGlobals); + else + err = LOB_SUBOPER_ERROR; + break; - return err; -} + case Lob_Update: + if ((subOperation == Lob_Memory)||(subOperation == Lob_Buffer)) + err = lobPtr->update(source, sourceLen, subOperation, descNumIn, retOperLen, lobMaxSize, lobMaxChunkMemSize,lobGCLimit,handleIn,handleInLen,handleOut,handleOutLen,lobGlobals); + else if (subOperation == Lob_File) + err = lobPtr->update(source, -1, subOperation,descNumIn, retOperLen,lobMaxSize, lobMaxChunkMemSize,lobGCLimit,handleIn,handleInLen,handleOut,handleOutLen,lobGlobals); + else + err = LOB_SUBOPER_ERROR; + break; -// called once per process -Ex_Lob_Error ExLobGlobals::initialize() -{ - Ex_Lob_Error err = LOB_OPER_OK; + case Lob_Delete: + err = lobPtr->delDesc(handleIn, handleInLen,transId); + break; - lobMap_ = (lobMap_t *) new (getHeap())lobMap_t; // Leaving this allocated from system heap. 
Since this class contains hdfsFS unable to derive from LOB heap - if (lobMap_ == NULL) - return LOB_INIT_ERROR; + case Lob_Drop: + err = lobPtr->purgeLob(); + it = lobMap->find(string(lobName)); + lobMap->erase(it); + delete lobPtr; + lobPtr = NULL; + break; - - err = setServerPhandle(); - + case Lob_Purge: + err = lobPtr->purgeLob(); + it = lobMap->find(string(lobName)); + lobMap->erase(it); + delete lobPtr; + lobPtr = NULL; + break; - // start the worker threads - startWorkerThreads(); - return err; -} + case Lob_Stats: + err = lobPtr->readStats(source); + lobPtr->initStats(); // because file may remain open across cursors + break; -static void *workerThreadMain(void *arg) -{ - // parameter passed to the thread is an instance of the ExLobHdfs object - ExLobGlobals *glob = (ExLobGlobals *)arg; + case Lob_Empty_Directory: + lobPtr->initialize(fileName, EX_LOB_RW, + dir, storage, hdfsServer, hdfsPort, dir,bufferSize, replication, blockSize); + err = lobPtr->emptyDirectory(); + break; - glob->doWorkInThread(); + case Lob_Cleanup: + delete lobGlobals; + break; + case Lob_PerformGC: + err = lobPtr->compactLobDataFile((ExLobInMemoryDescChunksEntry *)source,sourceLen); + break; + case Lob_RestoreLobDataFile: + err = lobPtr->restoreLobDataFile(); + break; + case Lob_PurgeBackupLobDataFile: + err = lobPtr->purgeBackupLobDataFile(); + break; + default: + err = LOB_OPER_ERROR; + break; + } + /* +//**Note ** This code is needed to reinstate the master transaction after +// returning from the mxlobsrvr process. 
This is inactive code for now +if (!lobGlobals->isHive() ) + { + if (lobPtr) + // set the pass thru request object values from the lob + lobPtr->getRequest()->getValues(descNumOut, handleOutLen, handleOut, + requestStatus, cliError, + (char *)blackBox, blackBoxLen); // reinstate the transaction + if (TRANSID_IS_VALID(transIdBig)) { + ms_transid_reinstate(transIdBig, transStartId); + } + } - return NULL; + */ + clock_gettime(CLOCK_MONOTONIC, &endTime); + + secs = endTime.tv_sec - startTime.tv_sec; + nsecs = endTime.tv_nsec - startTime.tv_nsec; + if (nsecs < 0) { + secs--; + nsecs += NUM_NSECS_IN_SEC; + } + totalnsecs = (secs * NUM_NSECS_IN_SEC) + nsecs; + if (lobPtr && lobPtr->getStats()) + lobPtr->getStats()->hdfsAccessLayerTime += totalnsecs; + + return err; } -Ex_Lob_Error ExLobGlobals::startWorkerThreads() -{ - int rc; - for (int i=0; i<NUM_WORKER_THREADS; i++) { - rc = pthread_create(&threadId_[i], NULL, workerThreadMain, this); - if (rc != 0) - return LOB_HDFS_THREAD_CREATE_ERROR; - } - - return LOB_OPER_OK; +void cleanupLOBDataDescFiles(const char *lobHdfsServer,int lobHdfsPort,const char *lobHdfsLoc) +{ + int numExistingFiles=0; + hdfsFS fs; + fs = hdfsConnect(lobHdfsServer, lobHdfsPort); + if (fs == NULL) + return; + // Get this list of all data and desc files in the lob sotrage location + hdfsFileInfo *fileInfos = hdfsListDirectory(fs, lobHdfsLoc, &numExistingFiles); + if (fileInfos == NULL) + return ; + //Delete each one in a loop + for (int i = 0; i < numExistingFiles; i++) + hdfsDelete(fs, fileInfos[i].mName, 0); + + // *Note* : delete the memory allocated by libhdfs for the file info array + if (fileInfos) + { + hdfsFreeFileInfo(fileInfos, numExistingFiles); + } } -/////////////////////////////////////////////////////////////////////////////// -// ExLobRequest definitions -/////////////////////////////////////////////////////////////////////////////// -ExLobRequest::ExLobRequest() : - reqNum_(0), - descNumIn_(-1), - descNumOut_(-1), - handleInLen_(-1), 
- handleOutLen_(-1), - dataOffset_(-1), - type_(Lob_Req_Invalid), - storage_(Lob_Invalid_Storage), - operLen_(-1), - error_(LOB_INVALID_ERROR_VAL), - cliError_(-1), - status_(LOB_INVALID_ERROR_VAL), - transId_(0) -{ - TRANSID_SET_NULL(transIdBig_); -} +// The following methods are used for hive access +/* +Main thread issues an open to open a range of 128 MB and wakes up a +worker thread. It doesn't wait. It calls pre open on the next range. This is +done in method ::readDataCursorSimple. -void ExLobRequest::setValues(char *descFileName, Int64 descNumIn, Int64 handleInLen, - char *handleIn, LobsStorage storage, Int64 transId, - SB_Transid_Type transIdBig, - SB_Transseq_Type transStartId, - char *blackBox, Int64 blackBoxLen) -{ - - descNumIn_ = descNumIn; - handleInLen_ = handleInLen; - storage_ = storage; - strcpy(descFileName_, descFileName); - if (handleIn != NULL && handleInLen > 0) { - memcpy(handleIn_, handleIn, handleInLen); - } - cliError_ = -1; - error_ = LOB_INVALID_ERROR_VAL; - status_ = LOB_INVALID_ERROR_VAL; +The worker threads do their work in ::doWorkInThread and ::performRequests, ::readCursorDataSimple. (Note the difference from the method the main thread calls above.) - transId_ = transId; - transIdBig_ = transIdBig; - transStartId_ = transStartId; - blackBoxLen_ = blackBoxLen; - if (blackBox != NULL && blackBoxLen > 0) { - memcpy(blackBox_, blackBox, blackBoxLen); - } - -} +Main thread then issues a read. Since worker thread had already begun fetching +16KB buffers in (1), the main thread most likely will not need to wait and the +data will be ready. It keeps consuming the buffers, recycling them back into +postFetchBufList. +When done, the main thread closes the cursor (::closeDataCursorSimple). This is determined by whether we +have reached the end of range or the end of data for that file. +The worker threads on the other hand read 16KB of data and buffer them in a +prefetchBufList. 
It continues doing this until end of range is reached or the +buffer limit (128MB) has been reached. +*/ -void ExLobRequest::getValues(Int64 &descNumOut, Int64 &handleOutLen, - char *handleOut, Ex_Lob_Error &requestStatus, - Int64 &cliError, - char *blackBox, Int64 &blackBoxLen) +Ex_Lob_Error ExLob::readDataCursorSimple(char *file, char *tgt, Int64 tgtSize, + Int64 &operLen, ExLobGlobals *lobGlobals) { - - descNumOut = descNumOut_; - handleOutLen = handleOutLen_; - requestStatus = error_; - cliError = cliError_; - if (handleOut != NULL && handleOutLen_ > 0) { - memcpy(handleOut, handleOut_, handleOutLen_); - } - blackBoxLen = blackBoxLen_; - if (blackBox != NULL && blackBoxLen_ > 0) { - memcpy(blackBox, blackBox_, blackBoxLen_); - } - // #endif -} + int dataOffset; + Ex_Lob_Error result = LOB_OPER_OK; + cursor_t *cursor; + ExLobCursor::bufferList_t::iterator c_it; + ExLobCursorBuffer *buf = NULL; + Int64 bytesToCopy = 0; + operLen = 0; + Int64 len; + char *target = tgt; + bool done = false; -ExLobRequest::~ExLobRequest() -{ -} + struct timespec startTime; + struct timespec endTime; -Ex_Lob_Error ExLobRequest::send() -{ - + lobCursorLock_.lock(); - int msgid; - int oid; - MS_Result_Type result; - short req_ctrl[BUFSIZ]; - short rep_ctrl[BUFSIZ]; - char *req_data = (char *)this; - ExLobRequest rep_data; - short req_data_len = sizeof(ExLobRequest); - short rep_data_max = sizeof(ExLobRequest); - int err=0; - int inx=0; - int retries = 3; + lobCursors_it it = lobCursors_.find(string(file, strlen(file))); - incrReqNum(); + if (it == lobCursors_.end()) + { + lobCursorLock_.unlock(); + return LOB_CURSOR_NOT_OPEN; + } + else + { + cursor = &(it->second); + } - status_ = LOB_OPER_REQ_IN_PROGRESS; + lobCursorLock_.unlock(); - do + while ((operLen < tgtSize) && !done && !cursor->eol_) { - err = BMSG_LINK_(&serverPhandle, - &msgid, - req_ctrl, - (ushort) (inx &1), - rep_ctrl, - 1, - req_data, - req_data_len, - (char *)&rep_data, - rep_data_max, - 0,0,0,0); - retries--; - - err 
= BMSG_BREAK_(msgid, (short *) &result, &serverPhandle); + lobGlobals->traceMessage("locking cursor",cursor,__LINE__); + cursor->lock_.lock(); - if (err == XZFIL_ERR_PATHDOWN) { - //lobGlobals->resetServerPhandle(); - } + // if no buffers to read and is eor or eod, we are done. + // else wait for prefetch thread to wake us up. + if (cursor->prefetchBufList_.size() == 0) { + if (cursor->eor_ || cursor->eod_) { + done = true; + } else { + cursor->bufferMisses_++; + lobGlobals->traceMessage("wait on condition cursor",cursor,__LINE__); + cursor->lock_.wait(); + } + lobGlobals->traceMessage("unlocking cursor",cursor,__LINE__); + cursor->lock_.unlock(); + continue; + } - } while ( (err == XZFIL_ERR_PATHDOWN) && (retries > 0) ); // 201 if lobserver got restared + // a buffer is available + c_it = cursor->prefetchBufList_.begin(); + buf = *c_it; + lobGlobals->traceMessage("unlocking cursor",cursor,__LINE__); + cursor->lock_.unlock(); - status_ = LOB_OPER_REQ_DONE; + bytesToCopy = min(buf->bytesRemaining_, tgtSize - operLen); + memcpy(target, buf->data_ + buf->bytesUsed_, bytesToCopy); + target += bytesToCopy; + if (bytesToCopy == buf->bytesRemaining_) { // buffer is now empty + buf->bytesRemaining_ = -1; + buf->bytesUsed_ = -1; + lobGlobals->postfetchBufListLock_.lock(); + lobGlobals->postfetchBufList_.push_back(buf); + lobGlobals->postfetchBufListLock_.unlock(); + lobGlobals->traceMessage("locking cursor",cursor,__LINE__); + cursor->lock_.lock(); + c_it = cursor->prefetchBufList_.erase(c_it); + lobGlobals->traceMessage("signal condition cursor",cursor,__LINE__); + cursor->lock_.wakeOne(); // wake up prefetch thread if it was waiting for an empty buffer. 
+ lobGlobals->traceMessage("unlocking cursor",cursor,__LINE__); + cursor->lock_.unlock(); + } else { + buf->bytesUsed_ += bytesToCopy; + buf->bytesRemaining_ -= bytesToCopy; + } + stats_.bytesPrefetched += bytesToCopy; + operLen += bytesToCopy; + } - if (err != XZFIL_ERR_OK) - return LOB_SEND_MSG_ERROR; + // update stats + stats_.bytesRead += operLen; + stats_.bytesToRead += tgtSize; + stats_.numReadReqs++; - memcpy(this, &rep_data, rep_data_max); - return LOB_OPER_OK; } -void ExLobRequest::getDescOut(ExLobDesc &desc) -{ - memcpy(&desc, &desc_, sizeof(ExLobDesc)); -} - -void ExLobRequest::putDescIn(ExLobDesc &desc) -{ - memcpy(&desc_, &desc, sizeof(ExLobDesc)); -} - -/////////////////////////////////////////////////////////////////////////////// -// ExLobHdfs definitions -/////////////////////////////////////////////////////////////////////////////// -#ifdef SQ_USE_HDFS - -ExLobLock::ExLobLock() - : bellRang_(false), - waiters_(0) +Ex_Lob_Error ExLob::closeDataCursorSimple(char *fileName, ExLobGlobals *lobGlobals) { - pthread_mutexattr_t mutexAttr; - pthread_mutexattr_init( &mutexAttr ); - pthread_mutex_init( &mutex_, &mutexAttr ); - pthread_cond_init( &workBell_, NULL ); -} + cursor_t *cursor = NULL; + Int64 secs = 0; + Int64 nsecs = 0; -ExLobLock::~ExLobLock() -{ - pthread_mutex_unlock( &mutex_ ); - pthread_mutex_destroy(&mutex_); - pthread_cond_destroy(&workBell_); -} + lobCursorLock_.lock(); -void ExLobLock::lock() -{ - pthread_mutex_lock( &mutex_ ); -} + lobCursors_it it = lobCursors_.find(string(fileName, strlen(fileName))); + if (it != lobCursors_.end()) + { + cursor = &(it->second); + lobGlobals->traceMessage("locking cursor",cursor,__LINE__); + cursor->lock_.lock(); -void ExLobLock::unlock() -{ - pthread_mutex_unlock( &mutex_ ); -} + clock_gettime(CLOCK_MONOTONIC, &cursor->closeTime_); + secs = cursor->closeTime_.tv_sec - cursor->openTime_.tv_sec; + nsecs = cursor->closeTime_.tv_nsec - cursor->openTime_.tv_nsec; -void ExLobLock::wakeOne() -{ - 
pthread_cond_signal(&workBell_); -} + if (cursor->eod_ || cursor->eor_) { // prefetch thread already done, + cursor->emptyPrefetchList(lobGlobals); + lobGlobals->traceMessage("unlocking cursor",cursor,__LINE__); + cursor->lock_.unlock(); + lobCursors_.erase(it); // so erase it here. + // no need to unlock as cursor object is gone. + } else { + cursor->eol_ = true; // prefetch thread will do the eol rituals + lobGlobals->traceMessage("signal condition cursor",cursor,__LINE__); + cursor->lock_.wakeOne(); // wakeup prefetch thread + lobGlobals->traceMessage("unlocking cursor",cursor,__LINE__); + cursor->lock_.unlock(); + } + } -void ExLobLock::wakeAll() -{ - pthread_cond_broadcast(&workBell_); -} + lobCursorLock_.unlock(); -void ExLobLock::wait() -{ - waiters_++; - pthread_cond_wait(&workBell_, &mutex_); - waiters_--; -} + if (nsecs < 0) { + secs--; + nsecs += NUM_NSECS_IN_SEC; + } + Int64 totalnsecs = (secs * NUM_NSECS_IN_SEC) + nsecs; + stats_.cursorElapsedTime += totalnsecs; -ExLobHdfsRequest::ExLobHdfsRequest(LobsHdfsRequestType reqType, hdfsFS fs, - hdfsFile file, char *buffer, int size) : - reqType_(reqType), - fs_(fs), - file_(file), - buffer_(buffer), - size_(size) -{ - lobPtr_ = 0; - error_ = LOB_OPER_OK; + return LOB_OPER_OK; } -ExLobHdfsRequest::ExLobHdfsRequest(LobsHdfsRequestType reqType, ExLobCursor *cursor) : - reqType_(reqType), - cursor_(cursor) -{ - buffer_=0; - lobPtr_=0; - fs_=0; - file_=0; - size_=0; - error_=LOB_OPER_OK; -} -ExLobHdfsRequest::ExLobHdfsRequest(LobsHdfsRequestType reqType, ExLob *lobPtr, ExLobCursor *cursor) : - reqType_(reqType), - lobPtr_(lobPtr), - cursor_(cursor) +Ex_Lob_Error ExLobGlobals::performRequest(ExLobHdfsRequest *request) { - buffer_=0; - fs_=0; - file_=0; - size_=0; - error_=LOB_OPER_OK; -} + Ex_Lob_Error err = LOB_OPER_OK; + ExLob *lobPtr; + ExLobCursorBuffer *buf; + ExLobCursor *cursor; + Int64 size; + NABoolean seenEOR = false; + NABoolean seenEOD = false; + ExLobCursor::bufferList_t::iterator c_it; + Int64 
totalBufSize; -ExLobHdfsRequest::ExLobHdfsRequest(LobsHdfsRequestType reqType) : - reqType_(reqType) -{ + switch (request->reqType_) + { + case Lob_Hdfs_Cursor_Prefetch : + lobPtr = request->lobPtr_; + cursor = request->cursor_; + traceMessage("locking cursor",cursor,__LINE__); + cursor->lock_.lock(); + while (!cursor->eod_ && !cursor->eor_ && !cursor->eol_) + { + postfetchBufListLock_.lock(); + c_it = postfetchBufList_.begin(); + if (c_it != postfetchBufList_.end()) { + buf = *c_it; + postfetchBufList_.erase(c_it); + postfetchBufListLock_.unlock(); + traceMessage("unlocking cursor",cursor,__LINE__); + cursor->lock_.unlock(); + } else { + postfetchBufListLock_.unlock(); + // there are no empty buffers. + // if prefetch list already has the max, wait for one to free up. + totalBufSize = cursor->prefetchBufList_.size() * cursor->bufMaxSize_; + if (totalBufSize > LOB_CURSOR_PREFETCH_BYTES_MAX) { + traceMessage("wait on condition cursor",cursor,__LINE__); + cursor->lock_.wait(); + char buffer2[2048]; + sprintf(buffer2, "cursor->eod_ %d cursor->eor_ %d " + "cursor->eol_ %d", cursor->eod_, + cursor->eor_, cursor->eol_); + traceMessage(buffer2, cursor, __LINE__); + continue; + } + // create a new buffer + traceMessage("unlocking cursor",cursor,__LINE__); + cursor->lock_.unlock(); + buf = new (getHeap()) ExLobCursorBuffer(); + buf->data_ = (char *) (getHeap())->allocateMemory( cursor->bufMaxSize_); + lobPtr->stats_.buffersUsed++; + } + size = min(cursor->bufMaxSize_, (cursor->maxBytes_ - cursor->bytesRead_ + (16 * 1024))); + if (buf->data_) { + lobPtr->readCursorDataSimple(buf->data_, size, *cursor, buf->bytesRemaining_); + buf->bytesUsed_ = 0; + traceMessage("locking cursor",cursor,__LINE__); + cursor->lock_.lock(); + if (size < (cursor->bufMaxSize_)) { + cursor->eor_ = true; + seenEOR = true; + } + if (buf->bytesRemaining_) { + cursor->prefetchBufList_.push_back(buf); + traceMessage("signal condition cursor",cursor,__LINE__); + cursor->lock_.wakeOne(); + 
traceMessage("unlocking cursor",cursor,__LINE__); + cursor->lock_.unlock(); + } else { + cursor->eod_ = true; + seenEOD = true; + traceMessage("signal condition cursor",cursor,__LINE__); + cursor->lock_.wakeOne(); + traceMessage("unlocking cursor",cursor,__LINE__); + cursor->lock_.unlock(); + postfetchBufListLock_.lock(); + postfetchBufList_.push_back(buf); + postfetchBufListLock_.unlock(); + } + } else { + assert("data_ is null"); + } + // Important! Break and do not access cursor object if we have reached + // end of data or range. + // The main thread could have destroyed the cursor + // in ::closeDataCursorSimple + if (seenEOD || seenEOR) + { + char buffer2[2048]; + sprintf(buffer2, "seenEOD %d seenEOR %d", + seenEOD, seenEOR); + traceMessage(buffer2, cursor, __LINE__); + break; + } + traceMessage("locking cursor",cursor,__LINE__); + cursor->lock_.lock(); + } // while - buffer_=0; - cursor_=0; - lobPtr_=0; - fs_=0; - file_=0; - size_=0; - error_=LOB_OPER_OK; -} + if (!seenEOD && !seenEOR) + { + traceMessage("locking cursor",cursor,__LINE__); + cursor->lock_.unlock(); + if (cursor->eol_) { // never reaches here ?? 
+ lobPtr->deleteCursor(cursor->name_, this); + } + } + processPreOpens(); + break; -ExLobHdfsRequest::~ExLobHdfsRequest() -{ + default: + request->error_ = LOB_HDFS_REQUEST_UNKNOWN; + } + + return LOB_OPER_OK; } -Ex_Lob_Error ExLobGlobals::enqueueRequest(ExLobHdfsRequest *request) + + +Ex_Lob_Error ExLob::readCursorDataSimple(char *tgt, Int64 tgtSize, cursor_t &cursor, Int64 &operLen) { - char buffer2[2048]; - sprintf(buffer2, "enqueue request %d", request->reqType_); - traceMessage(buffer2, NULL, __LINE__); - reqQueueLock_.lock(); - reqQueue_.push_back(request); - reqQueueLock_.wakeOne(); - reqQueueLock_.unlock(); + ExLobDesc desc; + Ex_Lob_Error err; + Int64 bytesAvailable = 0; + Int64 bytesToCopy = 0; + Int64 bytesRead = 0; + operLen = 0; + tOffset offset; + struct timespec startTime; + struct timespec endTime; + bool done = false; - return LOB_OPER_OK; -} + if (!fdData_) { + return LOB_CURSOR_NOT_OPEN_ERROR; + } -Ex_Lob_Error ExLobGlobals::enqueuePrefetchRequest(ExLob *lobPtr, ExLobCursor *cursor) -{// Leaving this allocated from system heap. Since this class contains hdfsFS unable to derive from LOB heap - ExLobHdfsRequest *request = new ExLobHdfsRequest(Lob_Hdfs_Cursor_Prefetch, lobPtr, cursor); - - if (!request) { - // return error + if (cursor.bytesRead_ == -1) { // starting + cursor.bytesRead_ = 0; } - enqueueRequest(request); + clock_gettime(CLOCK_MONOTONIC, &startTime); + + whil
<TRUNCATED>
