[TRAFODION-2917] Refactor Trafodion implementation of HDFS scan for text-formatted Hive tables
Part-2 changes Introduced a new CQD USE_LIBHDFS_SCAN 'OFF' to switch to the new implementation The new implementation details are at executor/ExHdfsScan.h Fixed a bug that was causing unexpected errors in JVM when the JNI object corresponding to java class is not initialized correctly Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/f17e15ee Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/f17e15ee Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/f17e15ee Branch: refs/heads/master Commit: f17e15eed741a40a41eec6a1a206dd661589623c Parents: 60db153 Author: selvaganesang <[email protected]> Authored: Wed Jan 31 19:48:49 2018 +0000 Committer: selvaganesang <[email protected]> Committed: Wed Jan 31 19:48:49 2018 +0000 ---------------------------------------------------------------------- core/sql/comexe/ComTdbHdfsScan.h | 10 +- core/sql/executor/ExExeUtilGet.cpp | 16 +- core/sql/executor/ExExeUtilLoad.cpp | 10 +- core/sql/executor/ExFastTransport.cpp | 2 - core/sql/executor/ExFastTransport.h | 2 +- core/sql/executor/ExHbaseAccess.cpp | 10 +- core/sql/executor/ExHbaseIUD.cpp | 5 - core/sql/executor/ExHdfsScan.cpp | 185 ++++++++++++++--- core/sql/executor/ExHdfsScan.h | 56 ++++++ core/sql/executor/HBaseClient_JNI.cpp | 21 +- core/sql/executor/HBaseClient_JNI.h | 12 +- core/sql/executor/HdfsClient_JNI.cpp | 198 +++++++++++++++---- core/sql/executor/HdfsClient_JNI.h | 28 +-- core/sql/executor/JavaObjectInterface.cpp | 14 +- core/sql/executor/JavaObjectInterface.h | 24 +-- core/sql/executor/OrcFileReader.cpp | 3 +- core/sql/executor/SequenceFileReader.cpp | 6 +- core/sql/exp/ExpErrorEnums.h | 1 + core/sql/exp/ExpHbaseInterface.cpp | 20 +- core/sql/exp/ExpHbaseInterface.h | 12 +- core/sql/exp/ExpLOBinterface.h | 13 -- core/sql/generator/GenRelScan.cpp | 3 + core/sql/qmscommon/QRLogger.cpp | 1 + core/sql/qmscommon/QRLogger.h | 1 + core/sql/sqlcomp/DefaultConstants.h | 3 + 
core/sql/sqlcomp/nadefaults.cpp | 2 + .../main/java/org/trafodion/sql/HDFSClient.java | 22 ++- .../main/java/org/trafodion/sql/HdfsScan.java | 35 +++- 28 files changed, 511 insertions(+), 204 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/comexe/ComTdbHdfsScan.h ---------------------------------------------------------------------- diff --git a/core/sql/comexe/ComTdbHdfsScan.h b/core/sql/comexe/ComTdbHdfsScan.h index 1d65bca..ff692c9 100755 --- a/core/sql/comexe/ComTdbHdfsScan.h +++ b/core/sql/comexe/ComTdbHdfsScan.h @@ -24,7 +24,7 @@ #define COM_HDFS_SCAN_H #include "ComTdb.h" -//#include "hdfs.h" // tPort +//#include "hdfs.h" #include "ExpLOBinterface.h" #include "ComQueue.h" @@ -54,7 +54,8 @@ class ComTdbHdfsScan : public ComTdb // ignore conversion errors and continue reading the next row. CONTINUE_ON_ERROR = 0x0020, LOG_ERROR_ROWS = 0x0040, - ASSIGN_RANGES_AT_RUNTIME = 0x0080 + ASSIGN_RANGES_AT_RUNTIME = 0x0080, + USE_LIBHDFS_SCAN = 0x0100 }; // Expression to filter rows. @@ -284,6 +285,11 @@ public: {(v ? flags_ |= ASSIGN_RANGES_AT_RUNTIME : flags_ &= ~ASSIGN_RANGES_AT_RUNTIME); } NABoolean getAssignRangesAtRuntime() const { return (flags_ & ASSIGN_RANGES_AT_RUNTIME) != 0; } + + void setUseLibhdfsScan(NABoolean v) + {(v ? 
flags_ |= USE_LIBHDFS_SCAN : flags_ &= ~USE_LIBHDFS_SCAN); } + NABoolean getUseLibhdfsScan() const + { return (flags_ & USE_LIBHDFS_SCAN) != 0; } UInt32 getMaxErrorRows() const { return maxErrorRows_;} void setMaxErrorRows(UInt32 v ) { maxErrorRows_= v; } http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/ExExeUtilGet.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExExeUtilGet.cpp b/core/sql/executor/ExExeUtilGet.cpp index 6d675cb..539a8cf 100644 --- a/core/sql/executor/ExExeUtilGet.cpp +++ b/core/sql/executor/ExExeUtilGet.cpp @@ -3521,13 +3521,9 @@ ExExeUtilGetHbaseObjectsTcb::ExExeUtilGetHbaseObjectsTcb( ex_globals * glob) : ExExeUtilGetMetadataInfoTcb( exe_util_tdb, glob) { - int jniDebugPort = 0; - int jniDebugTimeout = 0; ehi_ = ExpHbaseInterface::newInstance(glob->getDefaultHeap(), (char*)exe_util_tdb.server(), - (char*)exe_util_tdb.zkPort(), - jniDebugPort, - jniDebugTimeout); + (char*)exe_util_tdb.zkPort()); hbaseName_ = NULL; hbaseNameBuf_ = new(getGlobals()->getDefaultHeap()) @@ -6106,9 +6102,7 @@ ExExeUtilRegionStatsTcb::ExExeUtilRegionStatsTcb( int jniDebugTimeout = 0; ehi_ = ExpHbaseInterface::newInstance(glob->getDefaultHeap(), (char*)"", //exe_util_tdb.server(), - (char*)"", //exe_util_tdb.zkPort(), - jniDebugPort, - jniDebugTimeout); + (char*)""); //exe_util_tdb.zkPort(), regionInfoList_ = NULL; @@ -6879,13 +6873,9 @@ ExExeUtilClusterStatsTcb::ExExeUtilClusterStatsTcb( stats_ = (ComTdbClusterStatsVirtTableColumnStruct*)statsBuf_; - int jniDebugPort = 0; - int jniDebugTimeout = 0; ehi_ = ExpHbaseInterface::newInstance(glob->getDefaultHeap(), (char*)"", //exe_util_tdb.server(), - (char*)"", //exe_util_tdb.zkPort(), - jniDebugPort, - jniDebugTimeout); + (char*)""); //exe_util_tdb.zkPort()); regionInfoList_ = NULL; http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/ExExeUtilLoad.cpp 
---------------------------------------------------------------------- diff --git a/core/sql/executor/ExExeUtilLoad.cpp b/core/sql/executor/ExExeUtilLoad.cpp index 819b3b1..0ebc65c 100644 --- a/core/sql/executor/ExExeUtilLoad.cpp +++ b/core/sql/executor/ExExeUtilLoad.cpp @@ -1245,9 +1245,7 @@ short ExExeUtilHBaseBulkLoadTcb::work() int jniDebugTimeout = 0; ehi_ = ExpHbaseInterface::newInstance(getGlobals()->getDefaultHeap(), (char*)"", //Later may need to change to hblTdb.server_, - (char*)"", //Later may need to change to hblTdb.zkPort_, - jniDebugPort, - jniDebugTimeout); + (char*)""); //Later may need to change to hblTdb.zkPort_); retcode = ehi_->initHBLC(); if (retcode == 0) retcode = ehi_->createCounterTable(hblTdb().getErrCountTable(), (char *)"ERRORS"); @@ -1983,13 +1981,9 @@ ExExeUtilHBaseBulkUnLoadTcb::ExExeUtilHBaseBulkUnLoadTcb( oneFile_(FALSE) { hdfsClient_ = NULL; - int jniDebugPort = 0; - int jniDebugTimeout = 0; ehi_ = ExpHbaseInterface::newInstance(getGlobals()->getDefaultHeap(), (char*)"", //Later may need to change to hblTdb.server_, - (char*)"", //Later may need to change to hblTdb.zkPort_, - jniDebugPort, - jniDebugTimeout); + (char*)""); //Later may need to change to hblTdb.zkPort_); qparent_.down->allocatePstate(this); } http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/ExFastTransport.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExFastTransport.cpp b/core/sql/executor/ExFastTransport.cpp index bdde201..3a26467 100644 --- a/core/sql/executor/ExFastTransport.cpp +++ b/core/sql/executor/ExFastTransport.cpp @@ -1291,8 +1291,6 @@ void ExHdfsFastExtractTcb::createHdfsClientFileError(Int32 hdfsClientRetCode) NULL, NULL, NULL, NULL, errorMsg, (char *)currContext->getJniErrorStr().data()); - //ex_queue_entry *pentry_down = qParent_.down->getHeadEntry(); - //pentry_down->setDiagsArea(diagsArea); updateWorkATPDiagsArea(diagsArea); } 
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/ExFastTransport.h ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExFastTransport.h b/core/sql/executor/ExFastTransport.h index 94b091d..5bf1219 100644 --- a/core/sql/executor/ExFastTransport.h +++ b/core/sql/executor/ExFastTransport.h @@ -408,7 +408,7 @@ protected: NABoolean isSequenceFile(); void createSequenceFileError(Int32 sfwRetCode); - void createHdfsClientFileError(Int32 sfwRetCode); + void createHdfsClientFileError(Int32 hdfsClientRetCode); NABoolean isHdfsCompressed(); NABoolean getEmptyNullString() { http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/ExHbaseAccess.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExHbaseAccess.cpp b/core/sql/executor/ExHbaseAccess.cpp index 42fd86e..2247b9a 100644 --- a/core/sql/executor/ExHbaseAccess.cpp +++ b/core/sql/executor/ExHbaseAccess.cpp @@ -358,15 +358,9 @@ ExHbaseAccessTcb::ExHbaseAccessTcb( registerSubtasks(); registerResizeSubtasks(); - int jniDebugPort = 0; - int jniDebugTimeout = 0; ehi_ = ExpHbaseInterface::newInstance(glob->getDefaultHeap(), - // (char*)"localhost", (char*)hbaseAccessTdb.server_, - // (char*)"2181", - (char*)hbaseAccessTdb.zkPort_, - jniDebugPort, - jniDebugTimeout); + (char*)hbaseAccessTdb.zkPort_); asciiRow_ = NULL; asciiRowMissingCols_ = NULL; @@ -508,6 +502,8 @@ void ExHbaseAccessTcb::freeResources() NADELETEBASIC(colVal_.val, getHeap()); if (hdfsClient_ != NULL) NADELETE(hdfsClient_, HdfsClient, getHeap()); + if (loggingFileName_ != NULL) + NADELETEBASIC(loggingFileName_, getHeap()); } http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/ExHbaseIUD.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExHbaseIUD.cpp b/core/sql/executor/ExHbaseIUD.cpp index 3bc1d93..e8896b2 100644 --- 
a/core/sql/executor/ExHbaseIUD.cpp +++ b/core/sql/executor/ExHbaseIUD.cpp @@ -1158,16 +1158,11 @@ ExHbaseAccessBulkLoadPrepSQTcb::ExHbaseAccessBulkLoadPrepSQTcb( "traf_upsert_err", fileNum, loggingFileName_); - loggingFileCreated_ = FALSE; loggingRow_ = new(glob->getDefaultHeap()) char[hbaseAccessTdb.updateRowLen_]; } ExHbaseAccessBulkLoadPrepSQTcb::~ExHbaseAccessBulkLoadPrepSQTcb() { - if (loggingFileName_ != NULL) { - NADELETEBASIC(loggingFileName_, getHeap()); - loggingFileName_ = NULL; - } // Flush and close sample file if used if (hdfs_) { http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/ExHdfsScan.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExHdfsScan.cpp b/core/sql/executor/ExHdfsScan.cpp index 90ac737..e29baf6 100644 --- a/core/sql/executor/ExHdfsScan.cpp +++ b/core/sql/executor/ExHdfsScan.cpp @@ -120,15 +120,35 @@ ExHdfsScanTcb::ExHdfsScanTcb( , loggingErrorDiags_(NULL) , loggingFileName_(NULL) , hdfsClient_(NULL) + , hdfsScan_(NULL) , hdfsFileInfoListAsArray_(glob->getDefaultHeap(), hdfsScanTdb.getHdfsFileInfoList()->numEntries()) { Space * space = (glob ? glob->getSpace() : 0); CollHeap * heap = (glob ? 
glob->getDefaultHeap() : 0); + useLibhdfsScan_ = hdfsScanTdb.getUseLibhdfsScan(); lobGlob_ = NULL; - const int readBufSize = (Int32)hdfsScanTdb.hdfsBufSize_; - hdfsScanBuffer_ = new(space) char[ readBufSize + 1 ]; - hdfsScanBuffer_[readBufSize] = '\0'; - + hdfsScanBufMaxSize_ = hdfsScanTdb.hdfsBufSize_; + headRoom_ = (Int32)hdfsScanTdb.rangeTailIOSize_; + + if (useLibhdfsScan_) { + hdfsScanBuffer_ = new(heap) char[ hdfsScanBufMaxSize_ + 1 ]; + hdfsScanBuffer_[hdfsScanBufMaxSize_] = '\0'; + } else { + hdfsScanBufBacking_[0] = new (heap) BYTE[hdfsScanBufMaxSize_ + 2 * (headRoom_)]; + hdfsScanBufBacking_[1] = new (heap) BYTE[hdfsScanBufMaxSize_ + 2 * (headRoom_)]; + for (int i=0; i < 2; i++) { + BYTE *hdfsScanBufBacking = hdfsScanBufBacking_[i]; + hdfsScanBuf_[i].headRoom_ = hdfsScanBufBacking; + hdfsScanBuf_[i].buf_ = hdfsScanBufBacking + headRoom_; + } + bufBegin_ = NULL; + bufEnd_ = NULL; + logicalBufEnd_ = NULL; + headRoomCopied_ = 0; + prevRangeNum_ = -1; + currRangeBytesRead_ = 0; + recordSkip_ = FALSE; + } moveExprColsBuffer_ = new(space) ExSimpleSQLBuffer( 1, // one row (Int32)hdfsScanTdb.moveExprColsRowLength_, space); @@ -202,9 +222,7 @@ ExHdfsScanTcb::ExHdfsScanTcb( int jniDebugTimeout = 0; ehi_ = ExpHbaseInterface::newInstance(glob->getDefaultHeap(), (char*)"", //Later replace with server cqd - (char*)"", ////Later replace with port cqd - jniDebugPort, - jniDebugTimeout); + (char*)""); // Populate the hdfsInfo list into an array to gain o(1) lookup access Queue* hdfsInfoList = hdfsScanTdb.getHdfsFileInfoList(); @@ -238,9 +256,9 @@ void ExHdfsScanTcb::freeResources() deallocateAtp(workAtp_, getSpace()); workAtp_ = NULL; } - if (hdfsScanBuffer_) + if (hdfsScanBuffer_ ) { - NADELETEBASIC(hdfsScanBuffer_, getSpace()); + NADELETEBASIC(hdfsScanBuffer_, getHeap()); hdfsScanBuffer_ = NULL; } if (hdfsAsciiSourceBuffer_) @@ -287,6 +305,8 @@ void ExHdfsScanTcb::freeResources() } if (hdfsClient_ != NULL) NADELETE(hdfsClient_, HdfsClient, getHeap()); + if (hdfsScan_ != 
NULL) + NADELETE(hdfsScan_, HdfsScan, getHeap()); } NABoolean ExHdfsScanTcb::needStatsEntry() @@ -384,10 +404,12 @@ ExWorkProcRetcode ExHdfsScanTcb::work() HdfsFileInfo *hdfo = NULL; Lng32 openType = 0; int changedLen = 0; - ContextCli *currContext = getGlobals()->castToExExeStmtGlobals()->getCliGlobals()->currContext(); - hdfsFS hdfs = currContext->getHdfsServerConnection(hdfsScanTdb().hostName_,hdfsScanTdb().port_); - hdfsFileInfo *dirInfo = NULL; - Int32 hdfsErrorDetail = 0;//this is errno returned form underlying hdfsOpenFile call. + ContextCli *currContext = getGlobals()->castToExExeStmtGlobals()->getCliGlobals()->currContext(); + hdfsFS hdfs = currContext->getHdfsServerConnection(hdfsScanTdb().hostName_,hdfsScanTdb().port_); + hdfsFileInfo *dirInfo = NULL; + Int32 hdfsErrorDetail = 0;//this is errno returned form underlying hdfsOpenFile call. + HDFS_Scan_RetCode hdfsScanRetCode; + while (!qparent_.down->isEmpty()) { ex_queue_entry *pentry_down = qparent_.down->getHeadEntry(); @@ -442,8 +464,12 @@ ExWorkProcRetcode ExHdfsScanTcb::work() case ASSIGN_RANGES_AT_RUNTIME: computeRangesAtRuntime(); currRangeNum_ = beginRangeNum_; - if (numRanges_ > 0) - step_ = INIT_HDFS_CURSOR; + if (numRanges_ > 0) { + if (useLibhdfsScan_) + step_ = INIT_HDFS_CURSOR; + else + step_ = SETUP_HDFS_SCAN; + } else step_ = DONE; break; @@ -518,11 +544,93 @@ ExWorkProcRetcode ExHdfsScanTcb::work() if (step_ == CHECK_FOR_DATA_MOD_AND_DONE) step_ = DONE; - else - step_ = INIT_HDFS_CURSOR; + else { + if (useLibhdfsScan_) + step_ = INIT_HDFS_CURSOR; + else + step_ = SETUP_HDFS_SCAN; + } } break; - + case SETUP_HDFS_SCAN: + { + if (hdfsScan_ != NULL) + NADELETE(hdfsScan_, HdfsScan, getHeap()); + hdfsScan_ = HdfsScan::newInstance((NAHeap *)getHeap(), hdfsScanBuf_, hdfsScanBufMaxSize_, + &hdfsFileInfoListAsArray_, hdfsScanTdb().rangeTailIOSize_, hdfsScanRetCode); + if (hdfsScanRetCode != HDFS_SCAN_OK) + { + setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, "SETUP_HDFS_SCAN", + 
currContext->getJniErrorStr(), NULL); + step_ = HANDLE_ERROR_AND_DONE; + break; + } + bufBegin_ = NULL; + bufEnd_ = NULL; + logicalBufEnd_ = NULL; + headRoomCopied_ = 0; + prevRangeNum_ = -1; + currRangeBytesRead_ = 0; + recordSkip_ = FALSE; + step_ = TRAF_HDFS_READ; + } + break; + case TRAF_HDFS_READ: + { + hdfsScanRetCode = hdfsScan_->trafHdfsRead((NAHeap *)getHeap(), retArray_, sizeof(retArray_)/sizeof(int)); + if (hdfsScanRetCode == HDFS_SCAN_EOR) { + step_ = DONE; + break; + } + else if (hdfsScanRetCode != HDFS_SCAN_OK) { + setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, "SETUP_HDFS_SCAN", + currContext->getJniErrorStr(), NULL); + step_ = HANDLE_ERROR_AND_DONE; + break; + } + // Assign the starting address of the buffer + bufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED]; + if (retArray_[IS_EOF]) + logicalBufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED]; + else if (retArray_[BYTES_COMPLETED] < hdfsScanBufMaxSize_) + logicalBufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED] - headRoom_; + else + logicalBufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED]; + hdfo_ = getRange(retArray_[RANGE_NO]); + if (retArray_[RANGE_NO] != prevRangeNum_) { + bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_; + if (hdfo_->getStartOffset() == 0) + recordSkip_ = FALSE; + else + recordSkip_ = TRUE; + } else { + bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ - headRoomCopied_; + recordSkip_ = FALSE; + } + prevRangeNum_ = retArray_[RANGE_NO]; + if (recordSkip_) { + hdfsBufNextRow_ = hdfs_strchr((char *)bufBegin_, + hdfsScanTdb().recordDelimiter_, + (char *)bufEnd_, + checkRangeDelimiter_, + hdfsScanTdb().getHiveScanMode(), &changedLen); + if (hdfsBufNextRow_ == NULL) { + setupError(8446, 0, "No record delimiter found in buffer from hdfsRead", + NULL, NULL); + step_ = HANDLE_ERROR_AND_DONE; + break; + } + } + else + hdfsBufNextRow_ = (char *)bufBegin_; + step_ = PROCESS_HDFS_ROW; + 
} + break; + case COPY_TAIL_TO_HEAD: + { + step_ = TRAF_HDFS_READ; + } + break; case INIT_HDFS_CURSOR: { hdfo_ = getRange(currRangeNum_); @@ -949,7 +1057,10 @@ ExWorkProcRetcode ExHdfsScanTcb::work() if (startOfNextRow == NULL) { - step_ = REPOS_HDFS_DATA; + if (useLibhdfsScan_) + step_ = REPOS_HDFS_DATA; + else + step_ = COPY_TAIL_TO_HEAD; if (!exception_) break; } @@ -1220,8 +1331,12 @@ ExWorkProcRetcode ExHdfsScanTcb::work() workAtp_->setDiagsArea(NULL); // get rid of warnings. if (((pentry_down->downState.request == ex_queue::GET_N) && (pentry_down->downState.requestValue == matches_)) || - (pentry_down->downState.request == ex_queue::GET_NOMORE)) - step_ = CLOSE_HDFS_CURSOR; + (pentry_down->downState.request == ex_queue::GET_NOMORE)) { + if (useLibhdfsScan_) + step_ = CLOSE_HDFS_CURSOR; + else + step_ = DONE; + } else step_ = PROCESS_HDFS_ROW; break; @@ -1568,18 +1683,26 @@ char * ExHdfsScanTcb::extractAndTransformAsciiSourceToSqlRow(int &err, const char cd = hdfsScanTdb().columnDelimiter_; const char rd = hdfsScanTdb().recordDelimiter_; - const char *sourceDataEnd = hdfsScanBuffer_+trailingPrevRead_+ bytesRead_; - + const char *sourceDataEnd; + const char *endOfRequestedRange; + if (useLibhdfsScan_) { + sourceDataEnd = hdfsScanBuffer_+trailingPrevRead_+ bytesRead_; + endOfRequestedRange = endOfRequestedRange_; + } + else { + sourceDataEnd = (const char *)bufEnd_; + endOfRequestedRange = (const char *)logicalBufEnd_; + } hdfsLoggingRow_ = hdfsBufNextRow_; if (asciiSourceTD->numAttrs() == 0) { sourceRowEnd = hdfs_strchr(sourceData, rd, sourceDataEnd, checkRangeDelimiter_, mode, &changedLen); hdfsLoggingRowEnd_ = sourceRowEnd + changedLen; - if (!sourceRowEnd) - return NULL; - if ((endOfRequestedRange_) && - (sourceRowEnd >= endOfRequestedRange_)) { + if (sourceRowEnd == NULL) + return NULL; + if ((endOfRequestedRange) && + (sourceRowEnd >= endOfRequestedRange)) { checkRangeDelimiter_ = TRUE; *(sourceRowEnd +1)= RANGE_DELIMITER; } @@ -1623,8 +1746,8 @@ char * 
ExHdfsScanTcb::extractAndTransformAsciiSourceToSqlRow(int &err, if (rdSeen) { sourceRowEnd = sourceColEnd + changedLen; hdfsLoggingRowEnd_ = sourceRowEnd; - if ((endOfRequestedRange_) && - (sourceRowEnd >= endOfRequestedRange_)) { + if ((endOfRequestedRange) && + (sourceRowEnd >= endOfRequestedRange)) { checkRangeDelimiter_ = TRUE; *(sourceRowEnd +1)= RANGE_DELIMITER; } @@ -1697,8 +1820,8 @@ char * ExHdfsScanTcb::extractAndTransformAsciiSourceToSqlRow(int &err, sourceRowEnd = hdfs_strchr(sourceData, rd, sourceDataEnd, checkRangeDelimiter_,mode, &changedLen); if (sourceRowEnd) { hdfsLoggingRowEnd_ = sourceRowEnd + changedLen; //changedLen is when hdfs_strchr move the return pointer to remove the extra \r - if ((endOfRequestedRange_) && - (sourceRowEnd >= endOfRequestedRange_ )) { + if ((endOfRequestedRange) && + (sourceRowEnd >= endOfRequestedRange )) { checkRangeDelimiter_ = TRUE; *(sourceRowEnd +1)= RANGE_DELIMITER; } http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/ExHdfsScan.h ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExHdfsScan.h b/core/sql/executor/ExHdfsScan.h index 984fbb9..62bb11e 100644 --- a/core/sql/executor/ExHdfsScan.h +++ b/core/sql/executor/ExHdfsScan.h @@ -46,6 +46,8 @@ // ----------------------------------------------------------------------- class ExHdfsScanTdb; class ExHdfsScanTcb; +class HdfsScan; +class HdfsClient; // ----------------------------------------------------------------------- // Classes referenced in this file @@ -108,9 +110,46 @@ private: // --------------------------------------------------------------------- }; +/* + USE_LIBHDFS_SCAN - OFF enables hdfs access via java classes + org.trafodion.sql.HdfsScan and org.trafodion.sql.HdfsClient + Steps involved: + 1. 
Create a new HdfsScan object and set the scan ranges of the fragment instance in it + The scan range involves the following and it is determined either at runtime or compile time + a) filename + b) offset + c) len + Java layer always reads more than the len by rangeTailIOSize_ to accomdate the record split + 2. Two ByteBuffer objects are also passsed to HdfsScan object. These ByteBuffers are backed up by + 2 native buffers where the data is fetched. The buffer has a head room of size rangeTailIOSize_ and the + data is always read after the head room. + 3. HdfsScan returns an int array containing bytesRead, bufNo, rangeNo, isEOF and schedules either + the remaining bytes to be read or the next range using ByteBuffers alternatively. + 4. HdfsScan returns null array when there is no more data to be read. + 5. When the data is processed in one ByteBuffer in the native thread, the data is fetched into the other ByteBuffer by + another Java thread. + 6. Native layer after processing all the rows in one ByteBuffer, moves the last incomplete row to head room of the + other ByteBuffer. Then it requests to check if the read is complete. The native layer processes the buffer starting + from the copied incomplete row. +*/ + class ExHdfsScanTcb : public ex_tcb { + public: + enum + { + BYTES_COMPLETED, + BUF_NO, + RANGE_NO, + IS_EOF + } retArrayIndices_; + + struct HDFS_SCAN_BUF + { + BYTE *headRoom_; + BYTE *buf_; + }; ExHdfsScanTcb( const ComTdbHdfsScan &tdb, ex_globals *glob ); @@ -165,6 +204,9 @@ protected: , DONE , HANDLE_ERROR_WITH_CLOSE , HANDLE_ERROR_AND_DONE + , SETUP_HDFS_SCAN + , TRAF_HDFS_READ + , COPY_TAIL_TO_HEAD } step_,nextStep_; ///////////////////////////////////////////////////// @@ -296,7 +338,21 @@ protected: // this array is populated from the info list stored as Queue. 
HdfsFileInfoArray hdfsFileInfoListAsArray_; + HdfsClient *hdfsClient_; + HdfsScan *hdfsScan_; + NABoolean useLibhdfsScan_; + BYTE *hdfsScanBufBacking_[2]; + HDFS_SCAN_BUF hdfsScanBuf_[2]; + int retArray_[4]; + BYTE *bufBegin_; + BYTE *bufEnd_; + BYTE *logicalBufEnd_; + long currRangeBytesRead_; + int headRoomCopied_; + int headRoom_; + int prevRangeNum_; + NABoolean recordSkip_; }; class ExOrcScanTcb : public ExHdfsScanTcb http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/HBaseClient_JNI.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/HBaseClient_JNI.cpp b/core/sql/executor/HBaseClient_JNI.cpp index fe56d94..6b400cd 100644 --- a/core/sql/executor/HBaseClient_JNI.cpp +++ b/core/sql/executor/HBaseClient_JNI.cpp @@ -114,8 +114,8 @@ static const char* const hbcErrorEnumStr[] = // ////////////////////////////////////////////////////////////////////////////// // private default constructor -HBaseClient_JNI::HBaseClient_JNI(NAHeap *heap, int debugPort, int debugTimeout) - : JavaObjectInterface(heap, debugPort, debugTimeout) +HBaseClient_JNI::HBaseClient_JNI(NAHeap *heap) + : JavaObjectInterface(heap) ,isConnected_(FALSE) { for (int i=0; i<NUM_HBASE_WORKER_THREADS; i++) { @@ -137,7 +137,7 @@ char* HBaseClient_JNI::getErrorText(HBC_RetCode errEnum) ////////////////////////////////////////////////////////////////////////////// // ////////////////////////////////////////////////////////////////////////////// -HBaseClient_JNI* HBaseClient_JNI::getInstance(int debugPort, int debugTimeout) +HBaseClient_JNI* HBaseClient_JNI::getInstance() { ContextCli *currContext = GetCliGlobals()->currContext(); HBaseClient_JNI *hbaseClient_JNI = currContext->getHBaseClient(); @@ -145,8 +145,7 @@ HBaseClient_JNI* HBaseClient_JNI::getInstance(int debugPort, int debugTimeout) { NAHeap *heap = currContext->exHeap(); - hbaseClient_JNI = new (heap) HBaseClient_JNI(heap, - debugPort, debugTimeout); + 
hbaseClient_JNI = new (heap) HBaseClient_JNI(heap); currContext->setHbaseClient(hbaseClient_JNI); } return hbaseClient_JNI; @@ -301,7 +300,8 @@ HBC_RetCode HBaseClient_JNI::init() JavaMethods_[JM_TRUNCATE ].jm_name = "truncate"; JavaMethods_[JM_TRUNCATE ].jm_signature = "(Ljava/lang/String;ZJ)Z"; rc = (HBC_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_); - javaMethodsInitialized_ = TRUE; + if (rc == HBC_OK) + javaMethodsInitialized_ = TRUE; pthread_mutex_unlock(&javaMethodsInitMutex_); } return rc; @@ -1583,7 +1583,8 @@ HBLC_RetCode HBulkLoadClient_JNI::init() JavaMethods_[JM_ADD_TO_HFILE_DB ].jm_signature = "(SLjava/lang/Object;Ljava/lang/Object;)Z"; rc = (HBLC_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_); - javaMethodsInitialized_ = TRUE; + if (rc == HBLC_OK) + javaMethodsInitialized_ = TRUE; pthread_mutex_unlock(&javaMethodsInitMutex_); } return rc; @@ -3208,7 +3209,8 @@ HTC_RetCode HTableClient_JNI::init() JavaMethods_[JM_COMPLETE_PUT ].jm_signature = "(I[Z)Z"; rc = (HTC_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_); - javaMethodsInitialized_ = TRUE; + if (rc == HTC_OK) + javaMethodsInitialized_ = TRUE; pthread_mutex_unlock(&javaMethodsInitMutex_); } return rc; @@ -3798,7 +3800,8 @@ HVC_RetCode HiveClient_JNI::init() JavaMethods_[JM_EXEC_HIVE_SQL].jm_name = "executeHiveSQL"; JavaMethods_[JM_EXEC_HIVE_SQL].jm_signature = "(Ljava/lang/String;)V"; rc = (HVC_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_); - javaMethodsInitialized_ = TRUE; + if (rc == HVC_OK) + javaMethodsInitialized_ = TRUE; pthread_mutex_unlock(&javaMethodsInitMutex_); } return rc; http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/HBaseClient_JNI.h 
---------------------------------------------------------------------- diff --git a/core/sql/executor/HBaseClient_JNI.h b/core/sql/executor/HBaseClient_JNI.h index f6667d9..3177a7a 100644 --- a/core/sql/executor/HBaseClient_JNI.h +++ b/core/sql/executor/HBaseClient_JNI.h @@ -246,7 +246,7 @@ public: std::string* getHTableName(); // Get the error description. - virtual char* getErrorText(HTC_RetCode errEnum); + static char* getErrorText(HTC_RetCode errEnum); void setTableName(const char *tableName) { @@ -429,7 +429,7 @@ typedef enum { class HBaseClient_JNI : public JavaObjectInterface { public: - static HBaseClient_JNI* getInstance(int debugPort, int debugTimeout); + static HBaseClient_JNI* getInstance(); static void deleteInstance(); // Destructor @@ -488,7 +488,7 @@ public: HBaseClientRequest* getHBaseRequest(); bool workerThreadsStarted() { return (threadID_[0] ? true : false); } // Get the error description. - virtual char* getErrorText(HBC_RetCode errEnum); + static char* getErrorText(HBC_RetCode errEnum); static void logIt(const char* str); @@ -542,7 +542,7 @@ public: private: // private default constructor - HBaseClient_JNI(NAHeap *heap, int debugPort, int debugTimeout); + HBaseClient_JNI(NAHeap *heap); NAArray<HbaseStr>* getKeys(Int32 funcIndex, NAHeap *heap, const char *tableName, bool useTRex); private: @@ -665,7 +665,7 @@ public: HVC_RetCode executeHiveSQL(const char* hiveSQL); // Get the error description. - virtual char* getErrorText(HVC_RetCode errEnum); + static char* getErrorText(HVC_RetCode errEnum); static void logIt(const char* str); @@ -757,7 +757,7 @@ public: HBLC_RetCode bulkLoadCleanup(const HbaseStr &tblName, const Text& location); // Get the error description. 
- virtual char* getErrorText(HBLC_RetCode errEnum); + static char* getErrorText(HBLC_RetCode errEnum); private: http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/HdfsClient_JNI.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/HdfsClient_JNI.cpp b/core/sql/executor/HdfsClient_JNI.cpp index a3aef5a..63c4ac1 100644 --- a/core/sql/executor/HdfsClient_JNI.cpp +++ b/core/sql/executor/HdfsClient_JNI.cpp @@ -37,6 +37,11 @@ pthread_mutex_t HdfsScan::javaMethodsInitMutex_ = PTHREAD_MUTEX_INITIALIZER; static const char* const hdfsScanErrorEnumStr[] = { + "Error in HdfsScan::setScanRanges" + ,"Java Exception in HdfsScan::setScanRanges" + ,"Error in HdfsScan::trafHdfsRead" + ,"Java Exceptiokn in HdfsScan::trafHdfsRead" + , "Hdfs scan End of Ranges" }; @@ -62,13 +67,14 @@ HDFS_Scan_RetCode HdfsScan::init() JavaMethods_[JM_CTOR ].jm_name = "<init>"; JavaMethods_[JM_CTOR ].jm_signature = "()V"; - JavaMethods_[JM_INIT_SCAN_RANGES].jm_name = "<init>"; - JavaMethods_[JM_INIT_SCAN_RANGES].jm_signature = "(Ljava/lang/Object;Ljava/lang/Object;[Ljava/lang/String;[J[J)V"; + JavaMethods_[JM_SET_SCAN_RANGES].jm_name = "setScanRanges"; + JavaMethods_[JM_SET_SCAN_RANGES].jm_signature = "(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;[Ljava/lang/String;[J[J)V"; JavaMethods_[JM_TRAF_HDFS_READ].jm_name = "trafHdfsRead"; JavaMethods_[JM_TRAF_HDFS_READ].jm_signature = "()[I"; rc = (HDFS_Scan_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_); - javaMethodsInitialized_ = TRUE; + if (rc == HDFS_SCAN_OK) + javaMethodsInitialized_ = TRUE; pthread_mutex_unlock(&javaMethodsInitMutex_); } return rc; @@ -79,17 +85,131 @@ char* HdfsScan::getErrorText(HDFS_Scan_RetCode errEnum) if (errEnum < (HDFS_Scan_RetCode)JOI_LAST) return JavaObjectInterface::getErrorText((JOI_RetCode)errEnum); else - return (char*)hdfsScanErrorEnumStr[errEnum-HDFS_SCAN_FIRST-1]; + return 
(char*)hdfsScanErrorEnumStr[errEnum-HDFS_SCAN_FIRST]; } -////////////////////////////////////////////////////////////////////////////// -HDFS_Scan_RetCode HdfsScan::initScanRanges() + +///////////////////////////////////////////////////////////////////////////// +HDFS_Scan_RetCode HdfsScan::setScanRanges(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, + HdfsFileInfoArray *hdfsFileInfoArray, int rangeTailIOSize) { + QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::setScanRanges() called."); + + if (initJNIEnv() != JOI_OK) + return HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM; + + jobject j_buf1 = jenv_->NewDirectByteBuffer(hdfsScanBuf[0].buf_, scanBufSize); + if (j_buf1 == NULL) { + GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM)); + jenv_->PopLocalFrame(NULL); + return HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM; + } + + jobject j_buf2 = jenv_->NewDirectByteBuffer(hdfsScanBuf[1].buf_, scanBufSize); + if (j_buf2 == NULL) { + GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM)); + jenv_->PopLocalFrame(NULL); + return HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM; + } + jobjectArray j_filenames = NULL; + jlongArray j_offsets = NULL; + jlongArray j_lens = NULL; + HdfsFileInfo *hdfo; + jstring j_obj; + + HDFS_Scan_RetCode hdfsScanRetCode = HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM; + int arrayLen = hdfsFileInfoArray->entries(); + for (int i = 0; i < arrayLen; i++) { + hdfo = hdfsFileInfoArray->at(i); + j_obj = jenv_->NewStringUTF(hdfo->fileName()); + if (jenv_->ExceptionCheck()) { + jenv_->PopLocalFrame(NULL); + return hdfsScanRetCode; + } + if (j_filenames == NULL) { + j_filenames = jenv_->NewObjectArray(arrayLen, jenv_->GetObjectClass(j_obj), NULL); + if (jenv_->ExceptionCheck()) { + jenv_->PopLocalFrame(NULL); + return hdfsScanRetCode; + } + } + jenv_->SetObjectArrayElement(j_filenames, i, (jobject)j_obj); + jenv_->DeleteLocalRef(j_obj); + if (j_offsets == NULL) { + j_offsets = 
jenv_->NewLongArray(arrayLen); + if (jenv_->ExceptionCheck()) { + jenv_->PopLocalFrame(NULL); + return hdfsScanRetCode; + } + } + long offset = hdfo->getStartOffset(); + jenv_->SetLongArrayRegion(j_offsets, i, 1, &offset); + if (j_lens == NULL) { + j_lens = jenv_->NewLongArray(arrayLen); + if (jenv_->ExceptionCheck()) { + jenv_->PopLocalFrame(NULL); + return hdfsScanRetCode; + } + } + long len = hdfo->getBytesToRead()+rangeTailIOSize; + jenv_->SetLongArrayRegion(j_lens, i, 1, &len); + } + + jenv_->CallVoidMethod(javaObj_, JavaMethods_[JM_SET_SCAN_RANGES].methodID, j_buf1, j_buf2, j_filenames, j_offsets, j_lens); + + if (jenv_->ExceptionCheck()) { + getExceptionDetails(); + logError(CAT_SQL_HDFS, __FILE__, __LINE__); + logError(CAT_SQL_HDFS, "HdfsScan::setScanRanges()", getLastError()); + jenv_->PopLocalFrame(NULL); + return HDFS_SCAN_ERROR_SET_SCAN_RANGES_EXCEPTION; + } return HDFS_SCAN_OK; } -HDFS_Scan_RetCode HdfsScan::trafHdfsRead() +HdfsScan *HdfsScan::newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, + HdfsFileInfoArray *hdfsFileInfoArray, int rangeTailIOSize, HDFS_Scan_RetCode &hdfsScanRetCode) { - return HDFS_SCAN_OK; + QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::newInstance() called."); + + if (initJNIEnv() != JOI_OK) + return NULL; + hdfsScanRetCode = HDFS_SCAN_OK; + HdfsScan *hdfsScan = new (heap) HdfsScan(heap); + if (hdfsScan != NULL) { + hdfsScanRetCode = hdfsScan->init(); + if (hdfsScanRetCode == HDFS_SCAN_OK) + hdfsScanRetCode = hdfsScan->setScanRanges(heap, hdfsScanBuf, scanBufSize, + hdfsFileInfoArray, rangeTailIOSize); + if (hdfsScanRetCode != HDFS_SCAN_OK) { + NADELETE(hdfsScan, HdfsScan, heap); + hdfsScan = NULL; + } + } + return hdfsScan; +} + + +HDFS_Scan_RetCode HdfsScan::trafHdfsRead(NAHeap *heap, int retArray[], short arrayLen) +{ + QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::trafHdfsRead() called."); + + if (initJNIEnv() != JOI_OK) + return HDFS_SCAN_ERROR_TRAF_HDFS_READ_PARAM; + + jintArray 
j_retArray = (jintArray)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_TRAF_HDFS_READ].methodID); + if (jenv_->ExceptionCheck()) { + getExceptionDetails(); + logError(CAT_SQL_HDFS, __FILE__, __LINE__); + logError(CAT_SQL_HDFS, "HdfsScan::setScanRanges()", getLastError()); + jenv_->PopLocalFrame(NULL); + return HDFS_SCAN_ERROR_TRAF_HDFS_READ_EXCEPTION; + } + if (j_retArray == NULL) + return HDFS_SCAN_EOR; + short retArrayLen = jenv_->GetArrayLength(j_retArray); + ex_assert(retArrayLen == arrayLen, "HdfsScan::trafHdfsRead() InternalError: retArrayLen != arrayLen"); + jenv_->GetIntArrayRegion(j_retArray, 0, 4, retArray); + return HDFS_SCAN_OK; } // =========================================================================== @@ -123,12 +243,15 @@ static const char* const hdfsClientErrorEnumStr[] = ////////////////////////////////////////////////////////////////////////////// HdfsClient *HdfsClient::newInstance(NAHeap *heap, HDFS_Client_RetCode &retCode) { + QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::newInstance() called."); + + if (initJNIEnv() != JOI_OK) + return NULL; retCode = HDFS_CLIENT_OK; HdfsClient *hdfsClient = new (heap) HdfsClient(heap); if (hdfsClient != NULL) { retCode = hdfsClient->init(); - if (retCode != HDFS_CLIENT_OK) - { + if (retCode != HDFS_CLIENT_OK) { NADELETE(hdfsClient, HdfsClient, heap); hdfsClient = NULL; } @@ -170,7 +293,8 @@ HDFS_Client_RetCode HdfsClient::init() JavaMethods_[JM_HDFS_DELETE_PATH].jm_name = "hdfsDeletePath"; JavaMethods_[JM_HDFS_DELETE_PATH].jm_signature = "(Ljava/lang/String;)Z"; rc = (HDFS_Client_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_); - javaMethodsInitialized_ = TRUE; + if (rc == HDFS_CLIENT_OK) + javaMethodsInitialized_ = TRUE; pthread_mutex_unlock(&javaMethodsInitMutex_); } return rc; @@ -189,7 +313,7 @@ char* HdfsClient::getErrorText(HDFS_Client_RetCode errEnum) HDFS_Client_RetCode HdfsClient::hdfsCreate(const char* path, NABoolean 
compress) { - QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "HdfsClient::hdfsCreate(%s) called.", path); + QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsCreate(%s) called.", path); if (initJNIEnv() != JOI_OK) return HDFS_CLIENT_ERROR_HDFS_CREATE_PARAM; @@ -209,15 +333,15 @@ HDFS_Client_RetCode HdfsClient::hdfsCreate(const char* path, NABoolean compress) if (jenv_->ExceptionCheck()) { getExceptionDetails(); - logError(CAT_SQL_HBASE, __FILE__, __LINE__); - logError(CAT_SQL_HBASE, "HdfsClient::hdfsCreate()", getLastError()); + logError(CAT_SQL_HDFS, __FILE__, __LINE__); + logError(CAT_SQL_HDFS, "HdfsClient::hdfsCreate()", getLastError()); jenv_->PopLocalFrame(NULL); return HDFS_CLIENT_ERROR_HDFS_CREATE_EXCEPTION; } if (jresult == false) { - logError(CAT_SQL_HBASE, "HdfsClient::hdfsCreate()", getLastError()); + logError(CAT_SQL_HDFS, "HdfsClient::hdfsCreate()", getLastError()); jenv_->PopLocalFrame(NULL); return HDFS_CLIENT_ERROR_HDFS_CREATE_PARAM; } @@ -231,7 +355,7 @@ HDFS_Client_RetCode HdfsClient::hdfsCreate(const char* path, NABoolean compress) ////////////////////////////////////////////////////////////////////////////// HDFS_Client_RetCode HdfsClient::hdfsWrite(const char* data, Int64 len) { - QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "HdfsClient::hdfsWrite(%ld) called.", len); + QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsWrite(%ld) called.", len); if (initJNIEnv() != JOI_OK) return HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION; @@ -252,15 +376,15 @@ HDFS_Client_RetCode HdfsClient::hdfsWrite(const char* data, Int64 len) if (jenv_->ExceptionCheck()) { getExceptionDetails(); - logError(CAT_SQL_HBASE, __FILE__, __LINE__); - logError(CAT_SQL_HBASE, "HdfsClient::hdfsWrite()", getLastError()); + logError(CAT_SQL_HDFS, __FILE__, __LINE__); + logError(CAT_SQL_HDFS, "HdfsClient::hdfsWrite()", getLastError()); jenv_->PopLocalFrame(NULL); return HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION; } if (jresult == false) { - logError(CAT_SQL_HBASE, 
"HdfsClient::hdfsWrite()", getLastError()); + logError(CAT_SQL_HDFS, "HdfsClient::hdfsWrite()", getLastError()); jenv_->PopLocalFrame(NULL); return HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION; } @@ -275,7 +399,7 @@ HDFS_Client_RetCode HdfsClient::hdfsWrite(const char* data, Int64 len) ////////////////////////////////////////////////////////////////////////////// HDFS_Client_RetCode HdfsClient::hdfsClose() { - QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "HdfsClient::close() called."); + QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::close() called."); if (initJNIEnv() != JOI_OK) return HDFS_CLIENT_ERROR_HDFS_CLOSE_EXCEPTION; @@ -287,15 +411,15 @@ HDFS_Client_RetCode HdfsClient::hdfsClose() if (jenv_->ExceptionCheck()) { getExceptionDetails(); - logError(CAT_SQL_HBASE, __FILE__, __LINE__); - logError(CAT_SQL_HBASE, "HdfsClient::hdfsClose()", getLastError()); + logError(CAT_SQL_HDFS, __FILE__, __LINE__); + logError(CAT_SQL_HDFS, "HdfsClient::hdfsClose()", getLastError()); jenv_->PopLocalFrame(NULL); return HDFS_CLIENT_ERROR_HDFS_CLOSE_EXCEPTION; } if (jresult == false) { - logError(CAT_SQL_HBASE, "HdfsClient::hdfsClose()", getLastError()); + logError(CAT_SQL_HDFS, "HdfsClient::hdfsClose()", getLastError()); jenv_->PopLocalFrame(NULL); return HDFS_CLIENT_ERROR_HDFS_CLOSE_EXCEPTION; } @@ -306,7 +430,7 @@ HDFS_Client_RetCode HdfsClient::hdfsClose() HDFS_Client_RetCode HdfsClient::hdfsCleanUnloadPath( const NAString& uldPath) { - QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "HdfsClient::hdfsCleanUnloadPath(%s) called.", + QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsCleanUnloadPath(%s) called.", uldPath.data()); if (initJNIEnv() != JOI_OK) return HDFS_CLIENT_ERROR_HDFS_CLEANUP_PARAM; @@ -324,8 +448,8 @@ HDFS_Client_RetCode HdfsClient::hdfsCleanUnloadPath( const NAString& uldPath) if (jenv_->ExceptionCheck()) { getExceptionDetails(); - logError(CAT_SQL_HBASE, __FILE__, __LINE__); - logError(CAT_SQL_HBASE, "HdfsClient::hdfsCleanUnloadPath()", getLastError()); + 
logError(CAT_SQL_HDFS, __FILE__, __LINE__); + logError(CAT_SQL_HDFS, "HdfsClient::hdfsCleanUnloadPath()", getLastError()); jenv_->PopLocalFrame(NULL); return HDFS_CLIENT_ERROR_HDFS_CLEANUP_EXCEPTION; } @@ -337,7 +461,7 @@ HDFS_Client_RetCode HdfsClient::hdfsCleanUnloadPath( const NAString& uldPath) HDFS_Client_RetCode HdfsClient::hdfsMergeFiles( const NAString& srcPath, const NAString& dstPath) { - QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "HdfsClient::hdfsMergeFiles(%s, %s) called.", + QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsMergeFiles(%s, %s) called.", srcPath.data(), dstPath.data()); if (initJNIEnv() != JOI_OK) @@ -364,15 +488,15 @@ HDFS_Client_RetCode HdfsClient::hdfsMergeFiles( const NAString& srcPath, if (jenv_->ExceptionCheck()) { getExceptionDetails(); - logError(CAT_SQL_HBASE, __FILE__, __LINE__); - logError(CAT_SQL_HBASE, "HdfsClient::hdfsMergeFiles()", getLastError()); + logError(CAT_SQL_HDFS, __FILE__, __LINE__); + logError(CAT_SQL_HDFS, "HdfsClient::hdfsMergeFiles()", getLastError()); jenv_->PopLocalFrame(NULL); return HDFS_CLIENT_ERROR_HDFS_MERGE_FILES_EXCEPTION; } if (jresult == false) { - logError(CAT_SQL_HBASE, "HdfsClient::hdfsMergeFiles()", getLastError()); + logError(CAT_SQL_HDFS, "HdfsClient::hdfsMergeFiles()", getLastError()); jenv_->PopLocalFrame(NULL); return HDFS_CLIENT_ERROR_HDFS_MERGE_FILES_EXCEPTION; } @@ -383,7 +507,7 @@ HDFS_Client_RetCode HdfsClient::hdfsMergeFiles( const NAString& srcPath, HDFS_Client_RetCode HdfsClient::hdfsDeletePath( const NAString& delPath) { - QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "HdfsClient::hdfsDeletePath(%s called.", + QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsDeletePath(%s called.", delPath.data()); if (initJNIEnv() != JOI_OK) return HDFS_CLIENT_ERROR_HDFS_DELETE_PATH_EXCEPTION; @@ -402,15 +526,15 @@ HDFS_Client_RetCode HdfsClient::hdfsDeletePath( const NAString& delPath) if (jenv_->ExceptionCheck()) { getExceptionDetails(); - logError(CAT_SQL_HBASE, __FILE__, __LINE__); - 
logError(CAT_SQL_HBASE, "HdfsClient::hdfsDeletePath()", getLastError()); + logError(CAT_SQL_HDFS, __FILE__, __LINE__); + logError(CAT_SQL_HDFS, "HdfsClient::hdfsDeletePath()", getLastError()); jenv_->PopLocalFrame(NULL); return HDFS_CLIENT_ERROR_HDFS_DELETE_PATH_EXCEPTION; } if (jresult == false) { - logError(CAT_SQL_HBASE, "HdfsClient::hdfsDeletePath()", getLastError()); + logError(CAT_SQL_HDFS, "HdfsClient::hdfsDeletePath()", getLastError()); jenv_->PopLocalFrame(NULL); return HDFS_CLIENT_ERROR_HDFS_DELETE_PATH_EXCEPTION; } @@ -421,7 +545,7 @@ HDFS_Client_RetCode HdfsClient::hdfsDeletePath( const NAString& delPath) HDFS_Client_RetCode HdfsClient::hdfsExists( const NAString& uldPath, NABoolean & exist) { - QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "HdfsClient::hdfsExists(%s) called.", + QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsExists(%s) called.", uldPath.data()); if (initJNIEnv() != JOI_OK) @@ -441,8 +565,8 @@ HDFS_Client_RetCode HdfsClient::hdfsExists( const NAString& uldPath, NABoolean & if (jenv_->ExceptionCheck()) { getExceptionDetails(); - logError(CAT_SQL_HBASE, __FILE__, __LINE__); - logError(CAT_SQL_HBASE, "HdfsClient::hdfsExists()", getLastError()); + logError(CAT_SQL_HDFS, __FILE__, __LINE__); + logError(CAT_SQL_HDFS, "HdfsClient::hdfsExists()", getLastError()); jenv_->PopLocalFrame(NULL); return HDFS_CLIENT_ERROR_HDFS_EXISTS_EXCEPTION; } http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/HdfsClient_JNI.h ---------------------------------------------------------------------- diff --git a/core/sql/executor/HdfsClient_JNI.h b/core/sql/executor/HdfsClient_JNI.h index 8adf42f..0426ebc 100644 --- a/core/sql/executor/HdfsClient_JNI.h +++ b/core/sql/executor/HdfsClient_JNI.h @@ -24,6 +24,7 @@ #define HDFS_CLIENT_H #include "JavaObjectInterface.h" +#include "ExHdfsScan.h" // =========================================================================== // ===== The native HdfsScan class implements access to the Java methods 
@@ -33,6 +34,11 @@ typedef enum { HDFS_SCAN_OK = JOI_OK ,HDFS_SCAN_FIRST = JOI_LAST + ,HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM = HDFS_SCAN_FIRST + ,HDFS_SCAN_ERROR_SET_SCAN_RANGES_EXCEPTION + ,HDFS_SCAN_ERROR_TRAF_HDFS_READ_PARAM + ,HDFS_SCAN_ERROR_TRAF_HDFS_READ_EXCEPTION + ,HDFS_SCAN_EOR ,HDFS_SCAN_LAST } HDFS_Scan_RetCode; @@ -44,28 +50,28 @@ public: : JavaObjectInterface(heap) {} - // Destructor - virtual ~HdfsScan(); - - // Get the error description. - virtual char* getErrorText(HDFS_Scan_RetCode errEnum); - // Initialize JVM and all the JNI configuration. // Must be called. HDFS_Scan_RetCode init(); - HDFS_Scan_RetCode initScanRanges(); + // Get the error description. + static char* getErrorText(HDFS_Scan_RetCode errEnum); + + static HdfsScan *newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, + HdfsFileInfoArray *hdfsFileInfoArray, int rangeTailIOSize, HDFS_Scan_RetCode &hdfsScanRetCode); + + HDFS_Scan_RetCode setScanRanges(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, + HdfsFileInfoArray *hdfsFileInfoArray, int rangeTailIOSize); - HDFS_Scan_RetCode trafHdfsRead(); + HDFS_Scan_RetCode trafHdfsRead(NAHeap *heap, int retArray[], short arrayLen); private: enum JAVA_METHODS { JM_CTOR = 0, - JM_INIT_SCAN_RANGES, + JM_SET_SCAN_RANGES, JM_TRAF_HDFS_READ, JM_LAST }; - static jclass javaClass_; static JavaMethodInit* JavaMethods_; static bool javaMethodsInitialized_; @@ -109,7 +115,7 @@ public: static HdfsClient *newInstance(NAHeap *heap, HDFS_Client_RetCode &retCode); // Get the error description. - virtual char* getErrorText(HDFS_Client_RetCode errEnum); + static char* getErrorText(HDFS_Client_RetCode errEnum); // Initialize JVM and all the JNI configuration. // Must be called. 
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/JavaObjectInterface.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/JavaObjectInterface.cpp b/core/sql/executor/JavaObjectInterface.cpp index 2f5b2c7..ecd8c1e 100644 --- a/core/sql/executor/JavaObjectInterface.cpp +++ b/core/sql/executor/JavaObjectInterface.cpp @@ -40,6 +40,9 @@ JavaVM* JavaObjectInterface::jvm_ = NULL; jint JavaObjectInterface::jniHandleCapacity_ = 0; +int JavaObjectInterface::debugPort_ = 0; +int JavaObjectInterface::debugTimeout_ = 0; + __thread JNIEnv* jenv_ = NULL; __thread NAString *tsRecentJMFromJNI = NULL; jclass JavaObjectInterface::gThrowableClass = NULL; @@ -285,7 +288,7 @@ int JavaObjectInterface::createJVM(LmJavaOptions *options) if (mySqRoot != NULL) { len = strlen(mySqRoot); - oomDumpDir = new (heap_) char[len+50]; + oomDumpDir = new char[len+50]; strcpy(oomDumpDir, "-XX:HeapDumpPath="); strcat(oomDumpDir, mySqRoot); strcat(oomDumpDir, "/logs"); @@ -317,7 +320,7 @@ int JavaObjectInterface::createJVM(LmJavaOptions *options) if (classPathArg) free(classPathArg); if (oomDumpDir) - NADELETEBASIC(oomDumpDir, heap_); + delete oomDumpDir; return ret; } @@ -346,8 +349,6 @@ JOI_RetCode JavaObjectInterface::initJVM(LmJavaOptions *options) GetCliGlobals()->setJniErrorStr(getErrorText(JOI_ERROR_CHECK_JVM)); return JOI_ERROR_CREATE_JVM; } - - needToDetach_ = false; QRLogger::log(CAT_SQL_HDFS_JNI_TOP, LL_DEBUG, "Created a new JVM."); } char *jniHandleCapacityStr = getenv("TRAF_JNIHANDLE_CAPACITY"); @@ -371,7 +372,6 @@ JOI_RetCode JavaObjectInterface::initJVM(LmJavaOptions *options) if (result != JNI_OK) return JOI_ERROR_ATTACH_JVM; - needToDetach_ = true; QRLogger::log(CAT_SQL_HDFS_JNI_TOP, LL_DEBUG, "Attached to an existing JVM from another thread."); break; @@ -537,11 +537,10 @@ void JavaObjectInterface::logError(std::string &cat, const char* file, int line) NABoolean 
JavaObjectInterface::getExceptionDetails(JNIEnv *jenv) { - NAString error_msg(heap_); - if (jenv == NULL) jenv = jenv_; CliGlobals *cliGlobals = GetCliGlobals(); + NAString error_msg(heap_); if (jenv == NULL) { error_msg = "Internal Error - Unable to obtain jenv"; @@ -646,7 +645,6 @@ JOI_RetCode JavaObjectInterface::initJNIEnv() return retcode; } if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) { - getExceptionDetails(); return JOI_ERROR_INIT_JNI; } return JOI_OK; http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/JavaObjectInterface.h ---------------------------------------------------------------------- diff --git a/core/sql/executor/JavaObjectInterface.h b/core/sql/executor/JavaObjectInterface.h index 772148b..b167420 100644 --- a/core/sql/executor/JavaObjectInterface.h +++ b/core/sql/executor/JavaObjectInterface.h @@ -79,13 +79,10 @@ public: protected: // Default constructor - for creating a new JVM - JavaObjectInterface(NAHeap *heap , int debugPort = 0, int debugTimeout = 0) + JavaObjectInterface(NAHeap *heap) : heap_(heap) ,javaObj_(NULL) - ,needToDetach_(false) ,isInitialized_(false) - ,debugPort_(debugPort) - ,debugTimeout_(debugTimeout) { tid_ = syscall(SYS_gettid); } @@ -94,10 +91,7 @@ protected: JavaObjectInterface(NAHeap *heap, jobject jObj) : heap_(heap) ,javaObj_(NULL) - ,needToDetach_(false) ,isInitialized_(false) - ,debugPort_(0) - ,debugTimeout_(0) { tid_ = syscall(SYS_gettid); // When jObj is not null in the constructor @@ -113,17 +107,17 @@ protected: virtual ~JavaObjectInterface(); // Create a new JVM - int createJVM(LmJavaOptions *options); + static int createJVM(LmJavaOptions *options); // Initialize the JVM. - JOI_RetCode initJVM(LmJavaOptions *options = NULL); + static JOI_RetCode initJVM(LmJavaOptions *options = NULL); // Initialize JVM and all the JNI configuration. // Must be called. 
JOI_RetCode init(char *className, jclass &javaclass, JavaMethodInit* JavaMethods, Int32 howManyMethods, bool methodsInitialized); // Get the error description. - virtual char* getErrorText(JOI_RetCode errEnum); + static char* getErrorText(JOI_RetCode errEnum); NAString getLastError(); @@ -132,8 +126,8 @@ protected: void logError(std::string &cat, const char* methodName, jstring jresult); void logError(std::string &cat, const char* file, int line); - JOI_RetCode initJNIEnv(); - char* buildClassPath(); + static JOI_RetCode initJNIEnv(); + static char* buildClassPath(); public: void setJavaObject(jobject jobj); @@ -152,6 +146,7 @@ public: // Pass in jenv if the thread where the object is created is different than // the thread where exception occurred NABoolean getExceptionDetails(JNIEnv *jenv = NULL); + void appendExceptionMessages(JNIEnv *jenv, jthrowable a_exception, NAString &error_msg); NAHeap *getHeap() { return heap_; } @@ -166,10 +161,9 @@ protected: static jint jniHandleCapacity_; jobject javaObj_; - bool needToDetach_; bool isInitialized_; - int debugPort_; - int debugTimeout_; + static int debugPort_; + static int debugTimeout_; pid_t tid_; NAHeap *heap_; }; http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/OrcFileReader.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/OrcFileReader.cpp b/core/sql/executor/OrcFileReader.cpp index ddaa27a..988704d 100644 --- a/core/sql/executor/OrcFileReader.cpp +++ b/core/sql/executor/OrcFileReader.cpp @@ -138,7 +138,8 @@ OFR_RetCode OrcFileReader::init() javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_); - javaMethodsInitialized_ = TRUE; + if (lv_retcode == OFR_OK) + javaMethodsInitialized_ = TRUE; pthread_mutex_unlock(&javaMethodsInitMutex_); } return lv_retcode; http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/SequenceFileReader.cpp 
---------------------------------------------------------------------- diff --git a/core/sql/executor/SequenceFileReader.cpp b/core/sql/executor/SequenceFileReader.cpp index 5bc2f7f..8feb1d5 100644 --- a/core/sql/executor/SequenceFileReader.cpp +++ b/core/sql/executor/SequenceFileReader.cpp @@ -115,7 +115,8 @@ SFR_RetCode SequenceFileReader::init() JavaMethods_[JM_CLOSE ].jm_signature = "()Ljava/lang/String;"; rc = (SFR_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_); - javaMethodsInitialized_ = TRUE; + if (rc == SFR_OK) + javaMethodsInitialized_ = TRUE; pthread_mutex_unlock(&javaMethodsInitMutex_); } return rc; @@ -440,7 +441,8 @@ SFW_RetCode SequenceFileWriter::init() JavaMethods_[JM_CLOSE ].jm_signature = "()Ljava/lang/String;"; rc = (SFW_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_); - javaMethodsInitialized_ = TRUE; + if (rc == SFW_OK) + javaMethodsInitialized_ = TRUE; pthread_mutex_unlock(&javaMethodsInitMutex_); } return rc; http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/exp/ExpErrorEnums.h ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpErrorEnums.h b/core/sql/exp/ExpErrorEnums.h index ba604bf..8227cb8 100644 --- a/core/sql/exp/ExpErrorEnums.h +++ b/core/sql/exp/ExpErrorEnums.h @@ -163,6 +163,7 @@ enum ExeErrorCode EXE_OLAP_OVERFLOW_NOT_SUPPORTED = 8441, EXE_ERROR_FROM_LOB_INTERFACE = 8442, EXE_INVALID_LOB_HANDLE = 8443, + EXE_ERROR_HDFS_SCAN = 8447, EXE_LAST_EXPRESSIONS_ERROR = 8499, // --------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/exp/ExpHbaseInterface.cpp ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpHbaseInterface.cpp b/core/sql/exp/ExpHbaseInterface.cpp index b7746e0..8a1d426 100644 --- 
a/core/sql/exp/ExpHbaseInterface.cpp +++ b/core/sql/exp/ExpHbaseInterface.cpp @@ -48,9 +48,7 @@ ExpHbaseInterface::ExpHbaseInterface(CollHeap * heap, const char * server, - const char * zkPort, - int debugPort, - int debugTimeout) + const char * zkPort) { heap_ = heap; hbs_ = NULL; @@ -66,19 +64,13 @@ ExpHbaseInterface::ExpHbaseInterface(CollHeap * heap, strcpy(zkPort_, zkPort); else zkPort_[0] = 0; - - debugPort_ = debugPort; - debugTimeout_ = debugTimeout; } ExpHbaseInterface* ExpHbaseInterface::newInstance(CollHeap* heap, const char* server, - const char *zkPort, - int debugPort, - int debugTimeout) + const char *zkPort) { - return new (heap) ExpHbaseInterface_JNI(heap, server, TRUE, zkPort, - debugPort, debugTimeout); // This is the transactional interface + return new (heap) ExpHbaseInterface_JNI(heap, server, TRUE,zkPort); } NABoolean isParentQueryCanceled() @@ -283,8 +275,8 @@ char * getHbaseErrStr(Lng32 errEnum) // =========================================================================== ExpHbaseInterface_JNI::ExpHbaseInterface_JNI(CollHeap* heap, const char* server, bool useTRex, - const char *zkPort, int debugPort, int debugTimeout) - : ExpHbaseInterface(heap, server, zkPort, debugPort, debugTimeout) + const char *zkPort) + : ExpHbaseInterface(heap, server, zkPort) ,useTRex_(useTRex) ,client_(NULL) ,htc_(NULL) @@ -324,7 +316,7 @@ Lng32 ExpHbaseInterface_JNI::init(ExHbaseAccessStats *hbs) if (client_ == NULL) { HBaseClient_JNI::logIt("ExpHbaseInterface_JNI::init() creating new client."); - client_ = HBaseClient_JNI::getInstance(debugPort_, debugTimeout_); + client_ = HBaseClient_JNI::getInstance(); if (client_->isInitialized() == FALSE) { http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/exp/ExpHbaseInterface.h ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpHbaseInterface.h b/core/sql/exp/ExpHbaseInterface.h index f7b23cb..f68de05 100644 --- a/core/sql/exp/ExpHbaseInterface.h +++ 
b/core/sql/exp/ExpHbaseInterface.h @@ -71,9 +71,7 @@ class ExpHbaseInterface : public NABasicObject static ExpHbaseInterface* newInstance(CollHeap* heap, const char* server = NULL, - const char *zkPort = NULL, - int debugPort = 0, - int DebugTimeout = 0); + const char *zkPort = NULL); virtual ~ExpHbaseInterface() {} @@ -389,16 +387,12 @@ protected: ExpHbaseInterface(CollHeap * heap, const char * server = NULL, - const char * zkPort = NULL, - int debugPort = 0, - int debugTimeout = 0); + const char * zkPort = NULL); CollHeap * heap_; ExHbaseAccessStats * hbs_; char server_[MAX_SERVER_SIZE+1]; char zkPort_[MAX_PORT_SIZE+1]; - int debugPort_; - int debugTimeout_; }; char * getHbaseErrStr(Lng32 errEnum); @@ -410,7 +404,7 @@ class ExpHbaseInterface_JNI : public ExpHbaseInterface ExpHbaseInterface_JNI(CollHeap* heap, const char* server, bool useTRex, - const char *zkPort, int debugPort, int debugTimeout); + const char *zkPort); virtual ~ExpHbaseInterface_JNI(); http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/exp/ExpLOBinterface.h ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpLOBinterface.h b/core/sql/exp/ExpLOBinterface.h index df6c142..b98d2b4 100644 --- a/core/sql/exp/ExpLOBinterface.h +++ b/core/sql/exp/ExpLOBinterface.h @@ -344,19 +344,6 @@ Lng32 ExpLOBInterfaceGetLobLength(ExLobGlobals * exLobGlob, ); -/* -class HdfsFileInfo -{ - public: - char * fileName() { return fileName_; } - Int64 getStartOffset() { return startOffset_; } - Int64 getBytesToRead() { return bytesToRead_; } - Lng32 entryNum_; // 0 based, first entry is entry num 0. 
- NABasicPtr fileName_; - Int64 startOffset_; - Int64 bytesToRead_; -}; -*/ #endif http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/generator/GenRelScan.cpp ---------------------------------------------------------------------- diff --git a/core/sql/generator/GenRelScan.cpp b/core/sql/generator/GenRelScan.cpp index 013651b..3d01223 100644 --- a/core/sql/generator/GenRelScan.cpp +++ b/core/sql/generator/GenRelScan.cpp @@ -1391,6 +1391,9 @@ if (hTabStats->isOrcFile()) hdfsscan_tdb->setUseCif(useCIF); hdfsscan_tdb->setUseCifDefrag(useCIFDegrag); + if (CmpCommon::getDefault(USE_LIBHDFS_SCAN) == DF_ON) + hdfsscan_tdb->setUseLibhdfsScan(TRUE); + if(!generator->explainDisabled()) { generator->setExplainTuple( addExplainInfo(hdfsscan_tdb, 0, 0, generator)); http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/qmscommon/QRLogger.cpp ---------------------------------------------------------------------- diff --git a/core/sql/qmscommon/QRLogger.cpp b/core/sql/qmscommon/QRLogger.cpp index f4fb75d..0b8c398 100644 --- a/core/sql/qmscommon/QRLogger.cpp +++ b/core/sql/qmscommon/QRLogger.cpp @@ -60,6 +60,7 @@ std::string CAT_SQL_HDFS_SEQ_FILE_READER = "SQL.HDFS.SeqFileReader"; std::string CAT_SQL_HDFS_SEQ_FILE_WRITER = "SQL.HDFS.SeqFileWriter"; std::string CAT_SQL_HDFS_ORC_FILE_READER = "SQL.HDFS.OrcFileReader"; std::string CAT_SQL_HBASE = "SQL.HBase"; +std::string CAT_SQL_HDFS = "SQL.HDFS"; // these categories are currently not used std::string CAT_SQL_QMP = "SQL.Qmp"; http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/qmscommon/QRLogger.h ---------------------------------------------------------------------- diff --git a/core/sql/qmscommon/QRLogger.h b/core/sql/qmscommon/QRLogger.h index 5cabac4..3be016e 100644 --- a/core/sql/qmscommon/QRLogger.h +++ b/core/sql/qmscommon/QRLogger.h @@ -67,6 +67,7 @@ extern std::string CAT_SQL_HDFS_SEQ_FILE_READER; extern std::string CAT_SQL_HDFS_SEQ_FILE_WRITER; extern std::string 
CAT_SQL_HDFS_ORC_FILE_READER; extern std::string CAT_SQL_HBASE; +extern std::string CAT_SQL_HDFS; class ComDiagsArea; http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/sqlcomp/DefaultConstants.h ---------------------------------------------------------------------- diff --git a/core/sql/sqlcomp/DefaultConstants.h b/core/sql/sqlcomp/DefaultConstants.h index b7e4d0d..339d55f 100644 --- a/core/sql/sqlcomp/DefaultConstants.h +++ b/core/sql/sqlcomp/DefaultConstants.h @@ -3306,6 +3306,9 @@ enum DefaultConstants SUPPRESS_CHAR_LIMIT_CHECK, BMO_MEMORY_ESTIMATE_OUTLIER_FACTOR, + + // Use the earlier implementation of HdfsScan via libhdfs + USE_LIBHDFS_SCAN, // This enum constant must be the LAST one in the list; it's a count, // not an Attribute (it's not IN DefaultDefaults; it's the SIZE of it)! __NUM_DEFAULT_ATTRIBUTES http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/sqlcomp/nadefaults.cpp ---------------------------------------------------------------------- diff --git a/core/sql/sqlcomp/nadefaults.cpp b/core/sql/sqlcomp/nadefaults.cpp index a75c20b..28148cd 100644 --- a/core/sql/sqlcomp/nadefaults.cpp +++ b/core/sql/sqlcomp/nadefaults.cpp @@ -3033,6 +3033,8 @@ XDDkwd__(SUBQUERY_UNNESTING, "ON"), // Use large queues on RHS of Flow/Nested Join when appropriate DDkwd__(USE_LARGE_QUEUES, "ON"), + DDkwd__(USE_LIBHDFS_SCAN, "ON"), + DDkwd__(USE_MAINTAIN_CONTROL_TABLE, "OFF"), DDkwd__(USE_OLD_DT_CONSTRUCTOR, "OFF"), http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---------------------------------------------------------------------- diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java index 8d2052f..1af2c49 100644 --- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java +++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java @@ -54,6 +54,7 @@ public class HDFSClient private 
static FileSystem defaultFs_ = null; private FileSystem fs_ = null; private int bufNo_; + private int rangeNo_; private FSDataInputStream fsdis_; private OutputStream outStream_; private String filename_; @@ -66,7 +67,7 @@ public class HDFSClient private int blockSize_; private int bytesRead_; private Future future_ = null; - + private int isEOF_ = 0; static { String confFile = System.getProperty("trafodion.log4j.configFile"); System.setProperty("trafodion.root", System.getenv("TRAF_HOME")); @@ -111,9 +112,10 @@ public class HDFSClient { } - public HDFSClient(int bufNo, String filename, ByteBuffer buffer, long position, int length) throws IOException + public HDFSClient(int bufNo, int rangeNo, String filename, ByteBuffer buffer, long position, int length) throws IOException { bufNo_ = bufNo; + rangeNo_ = rangeNo; filename_ = filename; Path filepath = new Path(filename_); fs_ = FileSystem.get(filepath.toUri(),config_); @@ -164,13 +166,27 @@ public class HDFSClient return bytesRead; } + public int getRangeNo() + { + return rangeNo_; + } + + public int isEOF() + { + return isEOF_; + } + public int trafHdfsReadBuffer() throws IOException, InterruptedException, ExecutionException { int bytesRead; int totalBytesRead = 0; while (true) { bytesRead = trafHdfsRead(); - if (bytesRead == -1 || bytesRead == 0) + if (bytesRead == -1) { + isEOF_ = 1; + return totalBytesRead; + } + if (bytesRead == 0) return totalBytesRead; totalBytesRead += bytesRead; if (totalBytesRead == bufLen_) http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java ---------------------------------------------------------------------- diff --git a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java index bf81ab0..9fb145e 100644 --- a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java +++ b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java @@ -88,7 +88,11 @@ public class HdfsScan 
System.setProperty("trafodion.root", System.getenv("TRAF_HOME")); } - HdfsScan(ByteBuffer buf1, ByteBuffer buf2, String filename[], long pos[], long len[]) throws IOException + public HdfsScan() + { + } + + public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, String filename[], long pos[], long len[]) throws IOException { buf_ = new ByteBuffer[2]; bufLen_ = new int[2]; @@ -119,13 +123,14 @@ public class HdfsScan public void hdfsScanRange(int bufNo) throws IOException { - System.out.println (" CurrentRange " + currRange_ + " LenRemain " + lenRemain_ + " BufNo " + bufNo); + if (logger_.isDebugEnabled()) + logger_.debug(" CurrentRange " + currRange_ + " LenRemain " + lenRemain_ + " BufNo " + bufNo); int readLength; if (lenRemain_ > bufLen_[bufNo]) readLength = bufLen_[bufNo]; else readLength = (int)lenRemain_; - hdfsClient_[bufNo] = new HDFSClient(bufNo, hdfsScanRanges_[currRange_].filename_, buf_[bufNo], currPos_, readLength); + hdfsClient_[bufNo] = new HDFSClient(bufNo, currRange_, hdfsScanRanges_[currRange_].filename_, buf_[bufNo], currPos_, readLength); lenRemain_ -= readLength; currPos_ += readLength; if (lenRemain_ == 0) { @@ -144,29 +149,44 @@ public class HdfsScan int[] retArray; int byteCompleted; int bufNo; - + int rangeNo; + int isEOF; + + if (hdfsScanRanges_ == null) + throw new IOException("Scan ranges are not yet set"); if (scanCompleted_) return null; - retArray = new int[2]; + retArray = new int[4]; switch (lastBufCompleted_) { case -1: case 1: byteCompleted = hdfsClient_[0].trafHdfsReadBuffer(); bufNo = 0; + rangeNo = hdfsClient_[0].getRangeNo(); + isEOF = hdfsClient_[0].isEOF(); break; case 0: byteCompleted = hdfsClient_[1].trafHdfsReadBuffer(); bufNo = 1; + rangeNo = hdfsClient_[1].getRangeNo(); + isEOF = hdfsClient_[1].isEOF(); break; default: bufNo = -1; byteCompleted = -1; + rangeNo = -1; + isEOF = 0; } lastBufCompleted_ = bufNo; retArray[0] = byteCompleted; retArray[1] = bufNo; - System.out.println (" Buffer No " + retArray[1] + " Bytes 
Read " + retArray[0]); + retArray[2] = rangeNo; + retArray[3] = isEOF; + if (logger_.isDebugEnabled()) + logger_.debug(" Range No " + retArray[2] + " Buffer No " + retArray[1] + " Bytes Read " + retArray[0] + " isEOF " + retArray[3]); lastBufCompleted_ = bufNo; + if ((isEOF == 1) && (currRange_ == (hdfsScanRanges_.length-1))) + lastScanRangeScheduled_ = true; if (lastScanRangeScheduled_) { scanCompleted_ = true; return retArray; @@ -233,7 +253,8 @@ public class HdfsScan } } long time1 = System.currentTimeMillis(); - HdfsScan hdfsScan = new HdfsScan(buf1, buf2, fileName, pos, len); + HdfsScan hdfsScan = new HdfsScan(); + hdfsScan.setScanRanges(buf1, buf2, fileName, pos, len); int[] retArray; int bytesCompleted; while (true) {
