[TRAFODION-3110] Refactor LOB access to use the new implementation of HdfsClient
Used a single CQD USE_LIBHDFS to switch to the older implementation of using libhdfs for both hdfs scan and LOB access. The CQD USE_LIBHDFS_SCAN is retired. In addition, fixed the following: 1. Fixed a memory leak of LOB Heap structures 2. Possible fix for memory corruption at times 3. Avoid unnecessary creation of LOB threads when not needed Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/2113439f Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/2113439f Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/2113439f Branch: refs/heads/master Commit: 2113439f3ab46e2d51f933fd66c016d38689437a Parents: 52f074a Author: selvaganesang <[email protected]> Authored: Fri Jun 22 16:13:18 2018 +0000 Committer: selvaganesang <[email protected]> Committed: Fri Jun 22 16:13:18 2018 +0000 ---------------------------------------------------------------------- core/sql/cli/Cli.cpp | 21 ++++---- core/sql/cli/SessionDefaults.cpp | 8 +++ core/sql/cli/SessionDefaults.h | 12 +++++ core/sql/comexe/ComTdb.h | 8 ++- core/sql/common/NAMemory.cpp | 19 ++++++-- core/sql/common/NAMemory.h | 2 + core/sql/executor/HdfsClient_JNI.cpp | 3 +- core/sql/executor/HdfsClient_JNI.h | 2 - core/sql/executor/ex_control.cpp | 7 +++ core/sql/executor/ex_globals.cpp | 10 ++-- core/sql/executor/ex_globals.h | 3 +- core/sql/executor/ex_root.cpp | 2 +- core/sql/executor/ex_split_bottom.cpp | 4 +- core/sql/exp/ExpLOB.cpp | 23 +++++++-- core/sql/exp/ExpLOB.h | 2 + core/sql/exp/ExpLOBaccess.cpp | 51 ++++++++++++-------- core/sql/exp/ExpLOBinterface.cpp | 4 +- core/sql/generator/GenPreCode.cpp | 4 +- core/sql/generator/GenRelEnforcer.cpp | 6 ++- core/sql/generator/GenRelMisc.cpp | 7 +-- core/sql/generator/GenRelScan.cpp | 2 +- core/sql/sqlcomp/DefaultConstants.h | 9 ++-- core/sql/sqlcomp/nadefaults.cpp | 3 +- .../main/java/org/trafodion/sql/HDFSClient.java | 13 +++-- 24 files changed, 153 insertions(+), 72 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafodion/blob/2113439f/core/sql/cli/Cli.cpp ---------------------------------------------------------------------- diff --git a/core/sql/cli/Cli.cpp b/core/sql/cli/Cli.cpp index d641f6d..b8b9969 100644 --- a/core/sql/cli/Cli.cpp +++ b/core/sql/cli/Cli.cpp @@ -9466,7 +9466,7 @@ Lng32 SQLCLI_LOBddlInterface ComDiagsArea & diags = currContext.diags(); ComDiagsArea * myDiags = NULL; - + NABoolean useLibHdfs = currContext.getSessionDefaults()->getUseLibHdfs(); char logBuf[4096]; lobDebugInfo("In LOBddlInterface",0,__LINE__,lobTrace); ExeCliInterface *cliInterface = NULL; @@ -9486,6 +9486,7 @@ Lng32 SQLCLI_LOBddlInterface char * query = new(currContext.exHeap()) char[4096]; char *hdfsServer = new(currContext.exHeap()) char[256]; strcpy(hdfsServer,lobHdfsServer); + Int32 rc = 0; switch (qType) { case LOB_CLI_CREATE: @@ -9525,8 +9526,8 @@ Lng32 SQLCLI_LOBddlInterface //Initialize LOB interface - Int32 rc= ExpLOBoper::initLOBglobal(exLobGlob,currContext.exHeap(),&currContext,hdfsServer,hdfsPort); - if (rc) + exLobGlob = ExpLOBoper::initLOBglobal(currContext.exHeap(), &currContext, useLibHdfs); + if (exLobGlob == NULL) { cliRC = 0; ComDiagsArea * da = &diags; @@ -9536,7 +9537,7 @@ Lng32 SQLCLI_LOBddlInterface getLobErrStr(rc), (char*)getSqlJniErrorStr()); goto non_cli_error_return; } - + for (Lng32 i = 0; i < numLOBs; i++) { // create lob data tables @@ -9640,8 +9641,8 @@ Lng32 SQLCLI_LOBddlInterface //above tables . 
//Initialize LOB interface - Int32 rc= ExpLOBoper::initLOBglobal(exLobGlob,currContext.exHeap(),&currContext,hdfsServer,hdfsPort); - if (rc) + exLobGlob = ExpLOBoper::initLOBglobal(currContext.exHeap(), &currContext, useLibHdfs); + if (exLobGlob == NULL) { cliRC = 0; ComDiagsArea * da = &diags; @@ -9689,9 +9690,8 @@ Lng32 SQLCLI_LOBddlInterface goto error_return; //Initialize LOB interface - - Int32 rc= ExpLOBoper::initLOBglobal(exLobGlob,currContext.exHeap(),&currContext,hdfsServer,hdfsPort); - if (rc) + exLobGlob = ExpLOBoper::initLOBglobal(currContext.exHeap(), &currContext, useLibHdfs); + if (exLobGlob == NULL) { cliRC = 0; ComDiagsArea * da = &diags; @@ -9853,7 +9853,8 @@ Lng32 SQLCLI_LOBddlInterface myDiags->decrRefCount(); } non_cli_error_return: - ExpLOBinterfaceCleanup(exLobGlob); + if (exLobGlob != NULL) + ExpLOBoper::deleteLOBglobal(exLobGlob, currContext.exHeap()); NADELETEBASIC(query, currContext.exHeap()); NADELETEBASIC(hdfsServer,currContext.exHeap()); delete cliInterface; http://git-wip-us.apache.org/repos/asf/trafodion/blob/2113439f/core/sql/cli/SessionDefaults.cpp ---------------------------------------------------------------------- diff --git a/core/sql/cli/SessionDefaults.cpp b/core/sql/cli/SessionDefaults.cpp index 59be23e..84be26f 100644 --- a/core/sql/cli/SessionDefaults.cpp +++ b/core/sql/cli/SessionDefaults.cpp @@ -108,6 +108,7 @@ static const SessionDefaults::SessionDefaultMap sessionDefaultMap[] = SDEntry(SessionDefaults::SCHEMA, SCHEMA, SessionDefaults::SDT_ASCII, TRUE, TRUE, FALSE, FALSE), SDEntry(SessionDefaults::STATISTICS_VIEW_TYPE, STATISTICS_VIEW_TYPE, SessionDefaults::SDT_ASCII, FALSE, FALSE, TRUE, TRUE), SDEntry(SessionDefaults::SUSPEND_LOGGING, SUSPEND_LOGGING, SessionDefaults::SDT_BOOLEAN, FALSE, FALSE, TRUE, FALSE), + SDEntry(SessionDefaults::USE_LIBHDFS, USE_LIBHDFS, SessionDefaults::SDT_BOOLEAN, TRUE, TRUE, FALSE, FALSE), SDEntry(SessionDefaults::USER_EXPERIENCE_LEVEL, USER_EXPERIENCE_LEVEL, SessionDefaults::SDT_ASCII, 
TRUE, TRUE, FALSE, FALSE), SDEntry(SessionDefaults::WMS_PROCESS, WMS_PROCESS, SessionDefaults::SDT_BOOLEAN, FALSE, FALSE, TRUE, FALSE) }; @@ -237,6 +238,7 @@ SessionDefaults::SessionDefaults(CollHeap * heap) setCancelEscalationMxosrvrInterval(120); setCancelEscalationSaveabend(FALSE); setModeSeabase(FALSE); + setUseLibHdfs(FALSE); } SessionDefaults::~SessionDefaults() @@ -450,6 +452,12 @@ void SessionDefaults::setSessionDefaultAttributeValue }; break; + case USE_LIBHDFS: + { + setUseLibHdfs(defaultValueAsBoolean); + + } + break; case USER_EXPERIENCE_LEVEL: { setUEL(attrValue, attrValueLen); http://git-wip-us.apache.org/repos/asf/trafodion/blob/2113439f/core/sql/cli/SessionDefaults.h ---------------------------------------------------------------------- diff --git a/core/sql/cli/SessionDefaults.h b/core/sql/cli/SessionDefaults.h index ea91454..964aa29 100644 --- a/core/sql/cli/SessionDefaults.h +++ b/core/sql/cli/SessionDefaults.h @@ -131,6 +131,7 @@ public: SCHEMA, STATISTICS_VIEW_TYPE, SUSPEND_LOGGING, + USE_LIBHDFS, USER_EXPERIENCE_LEVEL, WMS_PROCESS, LAST_SESSION_DEFAULT_ATTRIBUTE // This enum entry should be last always. 
Add new enums before this entry @@ -345,6 +346,14 @@ public: espCloseErrorLogging_); } + void setUseLibHdfs(NABoolean useLibHdfs) + { + const Int16 DisAmbiguate = 0; + useLibHdfs_ = useLibHdfs; + updateDefaultsValueString(USE_LIBHDFS, DisAmbiguate, + useLibHdfs_); + } + void setEspFreeMemTimeout(Lng32 espFreeMemTimeout) { espFreeMemTimeout_ = espFreeMemTimeout; @@ -543,6 +552,8 @@ public: NABoolean getSuspendLogging() { return suspendLogging_; } + NABoolean getUseLibHdfs() { return useLibHdfs_; } + Lng32 readFromDefaultsTable(CliGlobals * cliGlobals); Lng32 setIsoMappingDefine(); @@ -790,6 +801,7 @@ private: NABoolean modeSeabase_; Lng32 jniDebugPort_; // port to attache JNI debugger, <=0 to disable Lng32 jniDebugTimeout_; // timeout (msec) to wait for debugger to attach + NABoolean useLibHdfs_; }; http://git-wip-us.apache.org/repos/asf/trafodion/blob/2113439f/core/sql/comexe/ComTdb.h ---------------------------------------------------------------------- diff --git a/core/sql/comexe/ComTdb.h b/core/sql/comexe/ComTdb.h index 1b90b90..374f10c 100644 --- a/core/sql/comexe/ComTdb.h +++ b/core/sql/comexe/ComTdb.h @@ -561,6 +561,10 @@ public: void setProcessLOB(NABoolean v){ v ? flags_ |= PROCESS_LOB: flags_ &= ~PROCESS_LOB;} + NABoolean useLibHdfs() const { return flags_ & USE_LIBHDFS;} + + void setUseLibHdfs(NABoolean v){ v ? 
flags_ |= USE_LIBHDFS : flags_ &= ~USE_LIBHDFS ;} + enum CollectStatsType { NO_STATS = SQLCLI_NO_STATS, @@ -675,7 +679,9 @@ private: // code generation: // master root(ComTdbRoot), esp root(ComTdbSplitBottom), // eid root (ComTdbEidRoot) - PROCESS_LOB = 0x0100 + PROCESS_LOB = 0x0100, + // + USE_LIBHDFS = 0x0200 }; http://git-wip-us.apache.org/repos/asf/trafodion/blob/2113439f/core/sql/common/NAMemory.cpp ---------------------------------------------------------------------- diff --git a/core/sql/common/NAMemory.cpp b/core/sql/common/NAMemory.cpp index 0c13caa..d3f0fdd 100644 --- a/core/sql/common/NAMemory.cpp +++ b/core/sql/common/NAMemory.cpp @@ -877,6 +877,8 @@ NAMemory::NAMemory(const char * name) , lastVmSize_(0l) , maxVmSize_(0l) , sharedMemory_(FALSE) + , heapStartAddr_(NULL) + , heapStartOffset_(NULL) { setType(type_, 0); #if ( defined(_DEBUG) || defined(NSK_MEMDEBUG) ) @@ -928,6 +930,8 @@ NAMemory::NAMemory(const char * name, NAHeap * parent, size_t blockSize, , lastVmSize_(0l) , maxVmSize_(0l) , sharedMemory_(FALSE) + , heapStartAddr_(NULL) + , heapStartOffset_(NULL) { if (parent_->getSharedMemory()) setSharedMemory(); @@ -984,6 +988,8 @@ NAMemory::NAMemory(const char * name, NAMemoryType type, size_t blockSize, , lastVmSize_(0l) , maxVmSize_(0l) , sharedMemory_(FALSE) + , heapStartAddr_(NULL) + , heapStartOffset_(NULL) { // call setType to initialize the values of all the sizes setType(type_, blockSize); @@ -1035,6 +1041,8 @@ NAMemory::NAMemory(const char * name, , lastVmSize_(0l) , maxVmSize_(0l) , sharedMemory_(FALSE) + , heapStartOffset_(heapStartOffset) + , heapStartAddr_(baseAddr) { // call setType to initialize the values of all the sizes setType(type_, 0); @@ -1055,12 +1063,12 @@ NAMemory::NAMemory(const char * name, // space in the segment, then initialize the firstBlk_ within // the passed in memory. The NAHeap constructor will initialize // the top NAHeapFragment. 
- if (baseAddr != NULL) { + if (heapStartAddr_ != NULL) { blockCnt_ = 1; - size_t tsize = maxSize - heapStartOffset - BLOCK_OVERHEAD; + size_t tsize = maxSize - heapStartOffset_ - BLOCK_OVERHEAD; if (tsize > (8 * sizeof(size_t))) { - firstBlk_ = (NABlock*)((char*)baseAddr + heapStartOffset); - firstBlk_->size_ = maxSize - heapStartOffset; + firstBlk_ = (NABlock*)((char*)heapStartAddr_ + heapStartOffset_); + firstBlk_->size_ = maxSize - heapStartOffset_; firstBlk_->sflags_ = NABlock::EXTERN_BIT; firstBlk_->next_ = NULL; firstBlk_->segmentId_ = segmentId; @@ -1161,7 +1169,7 @@ void NAMemory::reInitialize() firstBlk_ = externSegment; firstBlk_->next_ = NULL; blockCnt_ = 1; - totalSize_ = firstBlk_->size_ ; + totalSize_ = firstBlk_->size_ - heapStartOffset_; } // If this is an NAHeap, then call reInitializeHeap() to reinitialize @@ -2668,6 +2676,7 @@ void NAHeap::reInitializeHeap() // That code frees the NABlocks and will reinitialize the firstBlk_ // if it was allocated externally. if (firstBlk_ != NULL) { + assert((char*)firstBlk_ == ((char*)heapStartAddr_ - heapStartOffset_)); least_addr_ = (char*)firstBlk_; initTop(firstBlk_); } http://git-wip-us.apache.org/repos/asf/trafodion/blob/2113439f/core/sql/common/NAMemory.h ---------------------------------------------------------------------- diff --git a/core/sql/common/NAMemory.h b/core/sql/common/NAMemory.h index 009ec1a..329159f 100644 --- a/core/sql/common/NAMemory.h +++ b/core/sql/common/NAMemory.h @@ -380,6 +380,8 @@ protected: Lng32 lastVmSize_; Lng32 maxVmSize_; DerivedClass derivedClass_; // The derived class (removes virtual functions) + off_t heapStartOffset_; + void *heapStartAddr_; public: // --------------------------------------------------------------------- // The following method and data member are needed for minimizing http://git-wip-us.apache.org/repos/asf/trafodion/blob/2113439f/core/sql/executor/HdfsClient_JNI.cpp ---------------------------------------------------------------------- diff --git 
a/core/sql/executor/HdfsClient_JNI.cpp b/core/sql/executor/HdfsClient_JNI.cpp index 930b7f4..d5b5043 100644 --- a/core/sql/executor/HdfsClient_JNI.cpp +++ b/core/sql/executor/HdfsClient_JNI.cpp @@ -370,6 +370,7 @@ HdfsClient::~HdfsClient() deleteHdfsFileInfo(); if (path_ != NULL) NADELETEBASIC(path_, getHeap()); + path_ = NULL; } void HdfsClient::deleteHdfsFileInfo() @@ -504,7 +505,7 @@ void HdfsClient::setPath(const char *path) { if (path_ != NULL) NADELETEBASIC(path_, getHeap()); - short len = strlen(path); + size_t len = strlen(path); path_ = new (getHeap()) char[len+1]; strcpy(path_, path); } http://git-wip-us.apache.org/repos/asf/trafodion/blob/2113439f/core/sql/executor/HdfsClient_JNI.h ---------------------------------------------------------------------- diff --git a/core/sql/executor/HdfsClient_JNI.h b/core/sql/executor/HdfsClient_JNI.h index b4ef741..3d12633 100644 --- a/core/sql/executor/HdfsClient_JNI.h +++ b/core/sql/executor/HdfsClient_JNI.h @@ -47,7 +47,6 @@ typedef enum { class HdfsScan : public JavaObjectInterface { public: - // Default constructor - for creating a new JVM HdfsScan(NAHeap *heap) : JavaObjectInterface(heap) , hdfsStats_(NULL) @@ -163,7 +162,6 @@ typedef enum { class HdfsClient : public JavaObjectInterface { public: - // Default constructor - for creating a new JVM HdfsClient(NAHeap *heap) : JavaObjectInterface(heap) , path_(NULL) http://git-wip-us.apache.org/repos/asf/trafodion/blob/2113439f/core/sql/executor/ex_control.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ex_control.cpp b/core/sql/executor/ex_control.cpp index 4a23e30..d2792f7 100644 --- a/core/sql/executor/ex_control.cpp +++ b/core/sql/executor/ex_control.cpp @@ -387,6 +387,13 @@ short ExControlTcb::work() currContext->getSessionDefaults()-> setSchema(value[2], strlen(value[2])); } + else if (strcmp(value[1], "USE_LIBHDFS") == 0) + { + if (strcmp(value[2], "ON") == 0) + 
currContext->getSessionDefaults()->setUseLibHdfs(TRUE); + else + currContext->getSessionDefaults()->setUseLibHdfs(FALSE); + } else if (strcmp(value[1], "USER_EXPERIENCE_LEVEL") == 0) { currContext->getSessionDefaults()-> http://git-wip-us.apache.org/repos/asf/trafodion/blob/2113439f/core/sql/executor/ex_globals.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ex_globals.cpp b/core/sql/executor/ex_globals.cpp index df2ca67..673690f 100644 --- a/core/sql/executor/ex_globals.cpp +++ b/core/sql/executor/ex_globals.cpp @@ -89,11 +89,9 @@ ExLobGlobals *&ex_globals::getExLobGlobal() return exLobGlobals_; } -void ex_globals::initLOBglobal(ContextCli *context) +void ex_globals::initLOBglobal(ContextCli *context, NABoolean useLibHdfs) { - // initialize lob interface - ExpLOBoper::initLOBglobal(exLobGlobals_, (NAHeap *)heap_, context, (char *)"default", (Int32)0); - + exLobGlobals_ = ExpLOBoper::initLOBglobal((NAHeap *)heap_, context, useLibHdfs); } void ex_globals::reAllocate(short create_gui_sched) @@ -135,9 +133,9 @@ void ex_globals::deleteMe(NABoolean fatalError) statsArea_ = NULL; cleanupTcbs(); tcbList_.deallocate(); - NADELETE(exLobGlobals_, ExLobGlobals, exLobGlobals_->getHeap()); + if (exLobGlobals_ != NULL) + ExpLOBoper::deleteLOBglobal(exLobGlobals_, (NAHeap *)heap_); exLobGlobals_ = NULL; - } void ex_globals::deleteMemory(void *mem) http://git-wip-us.apache.org/repos/asf/trafodion/blob/2113439f/core/sql/executor/ex_globals.h ---------------------------------------------------------------------- diff --git a/core/sql/executor/ex_globals.h b/core/sql/executor/ex_globals.h index 536c76f..f6b3a94 100644 --- a/core/sql/executor/ex_globals.h +++ b/core/sql/executor/ex_globals.h @@ -183,7 +183,7 @@ public: ExLobGlobals *&getExLobGlobal(); - void initLOBglobal(ContextCli *context); + void initLOBglobal(ContextCli *context, NABoolean useLibHdfs); SequenceValueGenerator * seqGen(); @@ -191,6 +191,7 @@ public: void 
setRollupColumnNum(Int16 v) { rollupColumnNum_ = v; } Int16 getRollupColumnNum() { return rollupColumnNum_; } + ExLobGlobals *getLobGlobals() {return exLobGlobals_; } private: enum FlagsTypeEnum http://git-wip-us.apache.org/repos/asf/trafodion/blob/2113439f/core/sql/executor/ex_root.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ex_root.cpp b/core/sql/executor/ex_root.cpp index 0fe945b..1c8f8fc 100644 --- a/core/sql/executor/ex_root.cpp +++ b/core/sql/executor/ex_root.cpp @@ -299,7 +299,7 @@ ex_tcb * ex_root_tdb::build(CliGlobals *cliGlobals, ex_globals * glob) if (processLOB()) { - glob->initLOBglobal(cliGlobals->currContext()); + glob->initLOBglobal(cliGlobals->currContext(), useLibHdfs()); } return (root_tcb); http://git-wip-us.apache.org/repos/asf/trafodion/blob/2113439f/core/sql/executor/ex_split_bottom.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ex_split_bottom.cpp b/core/sql/executor/ex_split_bottom.cpp index 77324f3..4e1084a 100644 --- a/core/sql/executor/ex_split_bottom.cpp +++ b/core/sql/executor/ex_split_bottom.cpp @@ -50,7 +50,7 @@ #include "str.h" #include "exp_clause_derived.h" #include "ExSMGlobals.h" - +#include "ExpLOBaccess.h" // ----------------------------------------------------------------------- // Methods for class ex_split_bottom_tdb @@ -163,7 +163,7 @@ ex_split_bottom_tcb * ex_split_bottom_tdb::buildESPTcbTree( if (processLOB()) { - glob->initLOBglobal(glob->getCliGlobals()->currContext()); + glob->initLOBglobal(glob->getCliGlobals()->currContext(), useLibHdfs()); } return result; http://git-wip-us.apache.org/repos/asf/trafodion/blob/2113439f/core/sql/exp/ExpLOB.cpp ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpLOB.cpp b/core/sql/exp/ExpLOB.cpp index fe09b43..de5b692 100644 --- a/core/sql/exp/ExpLOB.cpp +++ b/core/sql/exp/ExpLOB.cpp @@ -55,15 +55,32 @@ #include 
"ex_globals.h" #include "ex_god.h" +ExLobGlobals *ExpLOBoper::initLOBglobal(NAHeap *parentHeap, ContextCli *currContext, NABoolean useLibHdfs) +{ + NAHeap *lobHeap = new (parentHeap) NAHeap("LOB Heap", parentHeap); + ExLobGlobals *exLobGlobals = new (lobHeap) ExLobGlobals(lobHeap); + exLobGlobals->setUseLibHdfs(useLibHdfs); + exLobGlobals->initialize(); + // initialize lob interface + ExpLOBoper::initLOBglobal(exLobGlobals, lobHeap, currContext, (char *)"default", (Int32)0); + return exLobGlobals; +} -Lng32 ExpLOBoper::initLOBglobal(ExLobGlobals *& exLobGlobals, NAHeap *parentHeap, ContextCli *currContext, char *hdfsServer ,Int32 port) + +Lng32 ExpLOBoper::initLOBglobal(ExLobGlobals *& exLobGlobals, NAHeap *heap, ContextCli *currContext, char *hdfsServer ,Int32 port) { // call ExeLOBinterface to initialize lob globals - ExpLOBinterfaceInit(exLobGlobals, parentHeap,currContext,FALSE, hdfsServer, port); - + ExpLOBinterfaceInit(exLobGlobals, heap,currContext,FALSE, hdfsServer, port); return 0; } +void ExpLOBoper::deleteLOBglobal(ExLobGlobals *exLobGlobals, NAHeap *heap) +{ + NAHeap *lobHeap = exLobGlobals->getHeap(); + NADELETE(exLobGlobals, ExLobGlobals, lobHeap); + NADELETE(lobHeap, NAHeap, heap); +} + char * ExpLOBoper::ExpGetLOBname(Int64 uid, Lng32 num, char * outBuf, Lng32 outBufLen) { if (outBufLen < 31) http://git-wip-us.apache.org/repos/asf/trafodion/blob/2113439f/core/sql/exp/ExpLOB.h ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpLOB.h b/core/sql/exp/ExpLOB.h index 231fa20..7ee8439 100644 --- a/core/sql/exp/ExpLOB.h +++ b/core/sql/exp/ExpLOB.h @@ -137,6 +137,8 @@ public: Int64 uid, Lng32 lobNum); static Lng32 initLOBglobal(ExLobGlobals *& lobGlob, NAHeap *heap, ContextCli *currContext,char *server, Int32 port ); + static ExLobGlobals *initLOBglobal(NAHeap *parentHeap, ContextCli *currContext, NABoolean useLibHdfs); + static void deleteLOBglobal(ExLobGlobals *lobGlob, NAHeap *parentHeap); static void 
genLobLockId(Int64 objUid,Int32 lobNum, char *llid); // Extracts values from the LOB handle stored at ptr http://git-wip-us.apache.org/repos/asf/trafodion/blob/2113439f/core/sql/exp/ExpLOBaccess.cpp ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpLOBaccess.cpp b/core/sql/exp/ExpLOBaccess.cpp index 3b4cd93..28562f2 100644 --- a/core/sql/exp/ExpLOBaccess.cpp +++ b/core/sql/exp/ExpLOBaccess.cpp @@ -189,11 +189,9 @@ Ex_Lob_Error ExLob::initialize(const char *lobFile, Ex_Lob_Mode mode, if (hdfsClientRetcode != HDFS_CLIENT_OK) return LOB_DATA_FILE_CREATE_ERROR; } - else { - hdfsClientRetcode = hdfsClient_->hdfsOpen(lobDataFile_.data(), FALSE); - if (hdfsClientRetcode != HDFS_CLIENT_OK) - return LOB_DATA_FILE_OPEN_ERROR; - } + hdfsClientRetcode = hdfsClient_->hdfsOpen(lobDataFile_.data(), FALSE); + if (hdfsClientRetcode != HDFS_CLIENT_OK) + return LOB_DATA_FILE_OPEN_ERROR; fdData_ = NULL; } else @@ -215,7 +213,6 @@ Ex_Lob_Error ExLob::initialize(const char *lobFile, Ex_Lob_Mode mode, } hdfsCloseFile(fs_, fdData_); fdData_ = NULL; - } } return LOB_OPER_OK; @@ -697,11 +694,15 @@ Ex_Lob_Error ExLob::readHdfsSourceFile(char *srcfile, char *&fileData, Int32 &si HdfsClient *srcHdfsClient = HdfsClient::newInstance(getLobGlobalHeap(), NULL, hdfsClientRetcode); ex_assert(hdfsClientRetcode == HDFS_CLIENT_OK, "Internal error: HdfsClient::newInstance returned an error"); hdfsClientRetcode = srcHdfsClient->hdfsOpen(srcfile, FALSE); - if (hdfsClientRetcode != HDFS_CLIENT_OK) + if (hdfsClientRetcode != HDFS_CLIENT_OK) { + HdfsClient::deleteInstance(srcHdfsClient); return LOB_SOURCE_FILE_OPEN_ERROR; + } fileData = (char *) (getLobGlobalHeap())->allocateMemory(size); - if (fileData == (char *)-1) + if (fileData == (char *)-1) { + HdfsClient::deleteInstance(srcHdfsClient); return LOB_SOURCE_DATA_ALLOC_ERROR; + } bytesRead = srcHdfsClient->hdfsRead(offset, fileData, size, hdfsClientRetcode); if (hdfsClientRetcode != HDFS_CLIENT_OK) { 
HdfsClient::deleteInstance(srcHdfsClient); @@ -2273,10 +2274,28 @@ Ex_Lob_Error ExLob::readDataToMem(char *memAddr, if (! useLibHdfs_) { HDFS_Client_RetCode hdfsClientRetcode; Int32 readLen; - readLen = hdfsClient_->hdfsRead(offset, memAddr, size, hdfsClientRetcode); - if (hdfsClientRetcode != HDFS_CLIENT_OK) - return LOB_DATA_READ_ERROR; - operLen = readLen; + if (storage_ == Lob_External_HDFS_File) { + HdfsClient *srcHdfsClient = HdfsClient::newInstance(getLobGlobalHeap(), NULL, hdfsClientRetcode); + ex_assert(hdfsClientRetcode == HDFS_CLIENT_OK, "Internal error: HdfsClient::newInstance returned an error"); + hdfsClientRetcode = srcHdfsClient->hdfsOpen(lobDataFile_.data(), FALSE); + if (hdfsClientRetcode != HDFS_CLIENT_OK) { + HdfsClient::deleteInstance(srcHdfsClient); + return LOB_SOURCE_FILE_OPEN_ERROR; + } + readLen = srcHdfsClient->hdfsRead(offset, memAddr, size, hdfsClientRetcode); + if (hdfsClientRetcode != HDFS_CLIENT_OK) { + HdfsClient::deleteInstance(srcHdfsClient); + return LOB_SOURCE_FILE_READ_ERROR; + } + HdfsClient::deleteInstance(srcHdfsClient); + operLen = readLen; + } + else { + readLen = hdfsClient_->hdfsRead(offset, memAddr, size, hdfsClientRetcode); + if (hdfsClientRetcode != HDFS_CLIENT_OK) + return LOB_DATA_READ_ERROR; + operLen = readLen; + } return LOB_OPER_OK; } lobDebugInfo("Reading in single chunk",0,__LINE__,lobTrace_); @@ -3498,12 +3517,6 @@ ExLobGlobals::ExLobGlobals(NAHeap *lobHeap) : } if(getenv("TRACE_LOB_ACTIONS")) lobTrace_ = TRUE; - char *useLibHdfsStr = getenv("USE_LIBHDFS"); - int useLibHdfs = 0; - if (useLibHdfsStr != NULL) - useLibHdfs = atoi(useLibHdfsStr); - if (useLibHdfs != 0) - useLibHdfs_ = TRUE; } ExLobGlobals::~ExLobGlobals() @@ -3598,8 +3611,6 @@ ExLobGlobals::~ExLobGlobals() if (threadTraceFile_) fclose(threadTraceFile_); threadTraceFile_ = NULL; - - } http://git-wip-us.apache.org/repos/asf/trafodion/blob/2113439f/core/sql/exp/ExpLOBinterface.cpp 
---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpLOBinterface.cpp b/core/sql/exp/ExpLOBinterface.cpp index a90a0cb..93035ed 100644 --- a/core/sql/exp/ExpLOBinterface.cpp +++ b/core/sql/exp/ExpLOBinterface.cpp @@ -34,7 +34,7 @@ using std::ofstream; #include "ExpLOBinterface.h" #include "ex_globals.h" -Lng32 ExpLOBinterfaceInit(ExLobGlobals *& exLobGlob, NAHeap * parentHeap, +Lng32 ExpLOBinterfaceInit(ExLobGlobals *& exLobGlob, NAHeap *lobHeap, ContextCli *currContext,NABoolean isHiveRead, char *hdfsServer, Int32 port) @@ -46,8 +46,6 @@ Lng32 ExpLOBinterfaceInit(ExLobGlobals *& exLobGlob, NAHeap * parentHeap, Ex_Lob_Error status; Int32 dummyParam2 = 0; - NAHeap *lobHeap = new ((NAHeap *)parentHeap) NAHeap("LOB Heap", (NAHeap *)parentHeap); - err = ExLobsOper((char*)"dummy", NULL, 0, NULL, 0, http://git-wip-us.apache.org/repos/asf/trafodion/blob/2113439f/core/sql/generator/GenPreCode.cpp ---------------------------------------------------------------------- diff --git a/core/sql/generator/GenPreCode.cpp b/core/sql/generator/GenPreCode.cpp index 77902f9..f8de0f7 100644 --- a/core/sql/generator/GenPreCode.cpp +++ b/core/sql/generator/GenPreCode.cpp @@ -4200,7 +4200,8 @@ RelExpr * FileScan::preCodeGen(Generator * generator, { // assign individual files and blocks to each ESPs ((NodeMap *) getPartFunc()->getNodeMap())->assignScanInfos(hiveSearchKey_); - generator->setProcessLOB(TRUE); + if (CmpCommon::getDefault(USE_LIBHDFS) == DF_ON) + generator->setProcessLOB(TRUE); // flag set for HBase scan in HbaseAccess::preCodeGen // unique scan unlikely for hive scans except @@ -5477,7 +5478,6 @@ RelExpr * HiveInsert::preCodeGen(Generator * generator, return this; generator->setHiveAccess(TRUE); - generator->setProcessLOB(TRUE); return GenericUpdate::preCodeGen(generator, externalInputs, pulledNewInputs); } http://git-wip-us.apache.org/repos/asf/trafodion/blob/2113439f/core/sql/generator/GenRelEnforcer.cpp 
---------------------------------------------------------------------- diff --git a/core/sql/generator/GenRelEnforcer.cpp b/core/sql/generator/GenRelEnforcer.cpp index bb557a1..6641833 100644 --- a/core/sql/generator/GenRelEnforcer.cpp +++ b/core/sql/generator/GenRelEnforcer.cpp @@ -907,8 +907,10 @@ short Exchange::codeGenForESP(Generator * generator) splitBottom->setCIFON( (tupleFormat == ExpTupleDesc::SQLMX_ALIGNED_FORMAT)); - if (generator->processLOB()) - splitBottom->setProcessLOB(TRUE); + if (generator->processLOB()) { + splitBottom->setProcessLOB(TRUE); + splitBottom->setUseLibHdfs(CmpCommon::getDefault(USE_LIBHDFS) == DF_ON); + } if (CmpCommon::getDefault(COMP_BOOL_153) == DF_ON) splitBottom->setForceSkewRoundRobin(TRUE); http://git-wip-us.apache.org/repos/asf/trafodion/blob/2113439f/core/sql/generator/GenRelMisc.cpp ---------------------------------------------------------------------- diff --git a/core/sql/generator/GenRelMisc.cpp b/core/sql/generator/GenRelMisc.cpp index 737bf4e..a40c90c 100644 --- a/core/sql/generator/GenRelMisc.cpp +++ b/core/sql/generator/GenRelMisc.cpp @@ -2682,9 +2682,10 @@ short RelRoot::codeGen(Generator * generator) } - if (generator->processLOB()) - root_tdb->setProcessLOB(TRUE); - + if (generator->processLOB()) { + root_tdb->setProcessLOB(TRUE); + root_tdb->setUseLibHdfs(CmpCommon::getDefault(USE_LIBHDFS) == DF_ON); + } // Self-referencing updates if (avoidHalloween_) http://git-wip-us.apache.org/repos/asf/trafodion/blob/2113439f/core/sql/generator/GenRelScan.cpp ---------------------------------------------------------------------- diff --git a/core/sql/generator/GenRelScan.cpp b/core/sql/generator/GenRelScan.cpp index 608ecb0..1298c09 100644 --- a/core/sql/generator/GenRelScan.cpp +++ b/core/sql/generator/GenRelScan.cpp @@ -1399,7 +1399,7 @@ if (hTabStats->isOrcFile()) hdfsscan_tdb->setUseCif(useCIF); hdfsscan_tdb->setUseCifDefrag(useCIFDegrag); - if (CmpCommon::getDefault(USE_LIBHDFS_SCAN) == DF_ON) + if 
(CmpCommon::getDefault(USE_LIBHDFS) == DF_ON) hdfsscan_tdb->setUseLibhdfsScan(TRUE); hdfsscan_tdb->setCompressedFile(isCompressedFile); http://git-wip-us.apache.org/repos/asf/trafodion/blob/2113439f/core/sql/sqlcomp/DefaultConstants.h ---------------------------------------------------------------------- diff --git a/core/sql/sqlcomp/DefaultConstants.h b/core/sql/sqlcomp/DefaultConstants.h index 1488f77..d2aed8d 100644 --- a/core/sql/sqlcomp/DefaultConstants.h +++ b/core/sql/sqlcomp/DefaultConstants.h @@ -3315,8 +3315,8 @@ enum DefaultConstants BMO_MEMORY_ESTIMATE_OUTLIER_FACTOR, - // Use the earlier implementation of HdfsScan via libhdfs - USE_LIBHDFS_SCAN, + // Use the earlier implementation of Hdfs access including LOB via libhdfs + USE_LIBHDFS, // if set, make primary key columns non-nullable. ANSI specification. // Default is ON. @@ -3337,13 +3337,14 @@ enum DefaultConstants // if set, ddl from Traf interface on Hive objects is supported. TRAF_DDL_ON_HIVE_OBJECTS, - // This enum constant must be the LAST one in the list; it's a count, - // not an Attribute (it's not IN DefaultDefaults; it's the SIZE of it)! // Size of byte[] in java when direct byteBuffer can't be used // Used to read compressed hdfs text files and to write // both compressed and uncompressed hdfs files HDFS_IO_INTERIM_BYTEARRAY_SIZE_IN_KB, + // This enum constant must be the LAST one in the list; it's a count, + // not an Attribute (it's not IN DefaultDefaults; it's the SIZE of it)! 
+ __NUM_DEFAULT_ATTRIBUTES }; http://git-wip-us.apache.org/repos/asf/trafodion/blob/2113439f/core/sql/sqlcomp/nadefaults.cpp ---------------------------------------------------------------------- diff --git a/core/sql/sqlcomp/nadefaults.cpp b/core/sql/sqlcomp/nadefaults.cpp index 7650726..e3c1a68 100644 --- a/core/sql/sqlcomp/nadefaults.cpp +++ b/core/sql/sqlcomp/nadefaults.cpp @@ -3055,7 +3055,7 @@ XDDkwd__(SUBQUERY_UNNESTING, "ON"), // Use large queues on RHS of Flow/Nested Join when appropriate DDkwd__(USE_LARGE_QUEUES, "ON"), - DDkwd__(USE_LIBHDFS_SCAN, "OFF"), + XDDkwd__(USE_LIBHDFS, "OFF"), DDkwd__(USE_MAINTAIN_CONTROL_TABLE, "OFF"), @@ -6308,6 +6308,7 @@ DefaultToken NADefaults::token(Int32 attrEnum, break; case AUTO_QUERY_RETRY_WARNINGS: + case USE_LIBHDFS: if (tok == DF_ON || tok == DF_OFF) isValid = TRUE; http://git-wip-us.apache.org/repos/asf/trafodion/blob/2113439f/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---------------------------------------------------------------------- diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java index 7d1b43b..95316d5 100644 --- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java +++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java @@ -303,11 +303,16 @@ public class HDFSClient fs_ = FileSystem.get(filepath_.toUri(),config_); compressed_ = compress; fsdis_ = null; + if (fs_.exists(filepath_)) + { + if (overwrite) + fs_.delete(filepath_); + else + throw new IOException(filepath_ + " already exists"); + } FSDataOutputStream fsOut = null; - if (overwrite) - fsOut = fs_.create(filepath_); - if (fsOut != null) - fsOut.close(); + fsOut = fs_.create(filepath_); + fsOut.close(); return true; }
