[TRAFODION-2917] Refactor Trafodion implementation of HDFS scan for text-format Hive tables
Part-6 changes: Removed the dependency on libhdfs when assigning ranges for the HDFS scan at runtime and when creating the sample file during bulk load. Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/a99ee11e Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/a99ee11e Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/a99ee11e Branch: refs/heads/master Commit: a99ee11ed3617c0d3e623082a7984fb08b371e96 Parents: a187b03 Author: selvaganesang <[email protected]> Authored: Thu Feb 15 21:39:41 2018 +0000 Committer: selvaganesang <[email protected]> Committed: Thu Feb 15 21:39:41 2018 +0000 ---------------------------------------------------------------------- core/sql/comexe/ComTdbHdfsScan.h | 2 - core/sql/executor/ExExeUtilCli.cpp | 1 - core/sql/executor/ExExeUtilMisc.cpp | 1 - core/sql/executor/ExHbaseAccess.cpp | 17 +- core/sql/executor/ExHbaseAccess.h | 18 +- core/sql/executor/ExHbaseIUD.cpp | 44 +++-- core/sql/executor/ExHdfsScan.cpp | 47 ++--- core/sql/executor/ExHdfsScan.h | 6 +- core/sql/executor/HdfsClient_JNI.cpp | 176 +++++++++++++++++-- core/sql/executor/HdfsClient_JNI.h | 44 ++++- core/sql/executor/JavaObjectInterface.cpp | 10 +- core/sql/sqlcomp/CmpSeabaseDDLcommon.cpp | 1 - .../main/java/org/trafodion/sql/HDFSClient.java | 83 ++++++++- 13 files changed, 360 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafodion/blob/a99ee11e/core/sql/comexe/ComTdbHdfsScan.h ---------------------------------------------------------------------- diff --git a/core/sql/comexe/ComTdbHdfsScan.h b/core/sql/comexe/ComTdbHdfsScan.h index ff692c9..86534be 100755 --- a/core/sql/comexe/ComTdbHdfsScan.h +++ b/core/sql/comexe/ComTdbHdfsScan.h @@ -24,7 +24,6 @@ #define COM_HDFS_SCAN_H #include "ComTdb.h" -//#include "hdfs.h" #include "ExpLOBinterface.h" #include "ComQueue.h" @@ -108,7 +107,6 @@ class 
ComTdbHdfsScan : public ComTdb UInt32 flags_; // 96 - 99 - // hadoop port num. An unsigned short in hdfs.h, subject to change. UInt16 port_; // 100 - 101 UInt16 convertSkipListSize_; // 102 - 103 http://git-wip-us.apache.org/repos/asf/trafodion/blob/a99ee11e/core/sql/executor/ExExeUtilCli.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExExeUtilCli.cpp b/core/sql/executor/ExExeUtilCli.cpp index fdfef44..3effdb6 100644 --- a/core/sql/executor/ExExeUtilCli.cpp +++ b/core/sql/executor/ExExeUtilCli.cpp @@ -42,7 +42,6 @@ #include "sql_id.h" #include "ComSqlId.h" #include "ExExeUtilCli.h" -#include "hdfs.h" OutputInfo::OutputInfo(Lng32 numEntries) : numEntries_(numEntries) { http://git-wip-us.apache.org/repos/asf/trafodion/blob/a99ee11e/core/sql/executor/ExExeUtilMisc.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExExeUtilMisc.cpp b/core/sql/executor/ExExeUtilMisc.cpp index 2ed95fb..b7322ff 100644 --- a/core/sql/executor/ExExeUtilMisc.cpp +++ b/core/sql/executor/ExExeUtilMisc.cpp @@ -53,7 +53,6 @@ #include "ComRtUtils.h" #include "ExStats.h" #include "ComSmallDefs.h" -//#include "hdfs.h" //will replace with LOB interface #include <unistd.h> ////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/trafodion/blob/a99ee11e/core/sql/executor/ExHbaseAccess.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExHbaseAccess.cpp b/core/sql/executor/ExHbaseAccess.cpp index 2247b9a..2182f9a 100644 --- a/core/sql/executor/ExHbaseAccess.cpp +++ b/core/sql/executor/ExHbaseAccess.cpp @@ -39,7 +39,6 @@ #include "cli_stdh.h" #include "exp_function.h" #include "jni.h" -#include "hdfs.h" #include <random> #include "HdfsClient_JNI.h" @@ -242,7 +241,7 @@ ExHbaseAccessTcb::ExHbaseAccessTcb( , colValVecSize_(0) , colValEntry_(0) , loggingErrorDiags_(NULL) - , hdfsClient_(NULL) + , 
logFileHdfsClient_(NULL) , loggingFileCreated_(FALSE) , loggingFileName_(NULL) { @@ -500,8 +499,8 @@ void ExHbaseAccessTcb::freeResources() NADELETEBASIC(directRowBuffer_, getHeap()); if (colVal_.val != NULL) NADELETEBASIC(colVal_.val, getHeap()); - if (hdfsClient_ != NULL) - NADELETE(hdfsClient_, HdfsClient, getHeap()); + if (logFileHdfsClient_ != NULL) + NADELETE(logFileHdfsClient_, HdfsClient, getHeap()); if (loggingFileName_ != NULL) NADELETEBASIC(loggingFileName_, getHeap()); } @@ -2992,7 +2991,7 @@ short ExHbaseAccessTcb::createDirectRowBuffer( UInt16 tuppIndex, { // Overwrite trailing delimiter with newline. hiveBuff[hiveBuffInx-1] = '\n'; - hdfsWrite(getHdfs(), getHdfsSampleFile(), hiveBuff, hiveBuffInx); + sampleFileHdfsClient()->hdfsWrite(hiveBuff, hiveBuffInx); } return 0; } @@ -3264,16 +3263,16 @@ void ExHbaseAccessTcb::handleException(NAHeap *heap, return; if (!loggingFileCreated_) { - hdfsClient_ = HdfsClient::newInstance((NAHeap *)getHeap(), hdfsClientRetcode); + logFileHdfsClient_ = HdfsClient::newInstance((NAHeap *)getHeap(), hdfsClientRetcode); if (hdfsClientRetcode == HDFS_CLIENT_OK) - hdfsClientRetcode = hdfsClient_->hdfsCreate(loggingFileName_, FALSE); + hdfsClientRetcode = logFileHdfsClient_->hdfsCreate(loggingFileName_, FALSE); if (hdfsClientRetcode == HDFS_CLIENT_OK) loggingFileCreated_ = TRUE; else goto logErrorReturn; } - hdfsClientRetcode = hdfsClient_->hdfsWrite(logErrorRow, logErrorRowLen); + hdfsClientRetcode = logFileHdfsClient_->hdfsWrite(logErrorRow, logErrorRowLen); if (hdfsClientRetcode != HDFS_CLIENT_OK) goto logErrorReturn; if (errorCond != NULL) { @@ -3289,7 +3288,7 @@ void ExHbaseAccessTcb::handleException(NAHeap *heap, errorMsg = (char *)"[UNKNOWN EXCEPTION]\n"; errorMsgLen = strlen(errorMsg); } - hdfsClientRetcode = hdfsClient_->hdfsWrite(errorMsg, errorMsgLen); + hdfsClientRetcode = logFileHdfsClient_->hdfsWrite(errorMsg, errorMsgLen); logErrorReturn: if (hdfsClientRetcode != HDFS_CLIENT_OK) { loggingErrorDiags_ = 
ComDiagsArea::allocate(heap); http://git-wip-us.apache.org/repos/asf/trafodion/blob/a99ee11e/core/sql/executor/ExHbaseAccess.h ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExHbaseAccess.h b/core/sql/executor/ExHbaseAccess.h index 863b5b5..7be1551 100644 --- a/core/sql/executor/ExHbaseAccess.h +++ b/core/sql/executor/ExHbaseAccess.h @@ -36,7 +36,6 @@ #include "key_range.h" #include "key_single_subset.h" #include "ex_mdam.h" -#include "hdfs.h" // ----------------------------------------------------------------------- // Classes defined in this file @@ -501,14 +500,13 @@ protected: NABoolean asyncOperation_; Int32 asyncOperationTimeout_; ComDiagsArea *loggingErrorDiags_; - HdfsClient *hdfsClient_; + HdfsClient *logFileHdfsClient_; char *loggingFileName_; NABoolean loggingFileCreated_ ; // Redefined and used by ExHbaseAccessBulkLoadPrepSQTcb. - virtual hdfsFS getHdfs() const { return NULL; } - virtual hdfsFile getHdfsSampleFile() const { return NULL; } + virtual HdfsClient *sampleFileHdfsClient() const { return NULL; } }; class ExHbaseTaskTcb : public ExGod @@ -921,14 +919,9 @@ class ExHbaseAccessBulkLoadPrepSQTcb: public ExHbaseAccessUpsertVsbbSQTcb virtual ExWorkProcRetcode work(); protected: - virtual hdfsFS getHdfs() const + virtual HdfsClient *sampleFileHdfsClient() const { - return hdfs_; - } - - virtual hdfsFile getHdfsSampleFile() const - { - return hdfsSampleFile_; + return sampleFileHdfsClient_; } private: @@ -949,8 +942,7 @@ class ExHbaseAccessBulkLoadPrepSQTcb: public ExHbaseAccessUpsertVsbbSQTcb // HDFS file system and output file ptrs used for ustat sample table. 
- hdfsFS hdfs_; - hdfsFile hdfsSampleFile_; + HdfsClient *sampleFileHdfsClient_; }; // UMD SQ: UpdMergeDel on Trafodion table class ExHbaseUMDtrafSubsetTaskTcb : public ExHbaseTaskTcb http://git-wip-us.apache.org/repos/asf/trafodion/blob/a99ee11e/core/sql/executor/ExHbaseIUD.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExHbaseIUD.cpp b/core/sql/executor/ExHbaseIUD.cpp index e8896b2..2663b24 100644 --- a/core/sql/executor/ExHbaseIUD.cpp +++ b/core/sql/executor/ExHbaseIUD.cpp @@ -1141,8 +1141,6 @@ ExHbaseAccessBulkLoadPrepSQTcb::ExHbaseAccessBulkLoadPrepSQTcb( ex_globals * glob ) : ExHbaseAccessUpsertVsbbSQTcb( hbaseAccessTdb, glob), prevRowId_ (NULL), - hdfs_(NULL), - hdfsSampleFile_(NULL), lastErrorCnd_(NULL) { hFileParamsInitialized_ = false; @@ -1163,17 +1161,8 @@ ExHbaseAccessBulkLoadPrepSQTcb::ExHbaseAccessBulkLoadPrepSQTcb( ExHbaseAccessBulkLoadPrepSQTcb::~ExHbaseAccessBulkLoadPrepSQTcb() { - // Flush and close sample file if used - if (hdfs_) - { - if (hdfsSampleFile_) - { - hdfsFlush(hdfs_, hdfsSampleFile_); - hdfsCloseFile(hdfs_, hdfsSampleFile_); - } - - } - + if (sampleFileHdfsClient_ != NULL) + NADELETE(sampleFileHdfsClient_, HdfsClient, getHeap()); } // Given the type information available via the argument, return the name of @@ -1394,7 +1383,7 @@ ExWorkProcRetcode ExHbaseAccessBulkLoadPrepSQTcb::work() if (!hFileParamsInitialized_) { - importLocation_= std::string(((ExHbaseAccessTdb&)hbaseAccessTdb()).getLoadPrepLocation()) + + importLocation_= std::string(((ExHbaseAccessTdb&)hbaseAccessTdb()).getLoadPrepLocation()) + ((ExHbaseAccessTdb&)hbaseAccessTdb()).getTableName() ; familyLocation_ = std::string(importLocation_ + "/#1"); Lng32 fileNum = getGlobals()->castToExExeStmtGlobals()->getMyInstanceNumber(); @@ -1424,15 +1413,30 @@ ExWorkProcRetcode ExHbaseAccessBulkLoadPrepSQTcb::work() // Set up HDFS file for sample table. 
ContextCli *currContext = getGlobals()->castToExExeStmtGlobals()->getCliGlobals()->currContext(); - hdfs_ = currContext->getHdfsServerConnection((char*)"default",0); Text samplePath = std::string(((ExHbaseAccessTdb&)hbaseAccessTdb()).getSampleLocation()) + ((ExHbaseAccessTdb&)hbaseAccessTdb()).getTableName() ; char filePart[10]; sprintf(filePart, "/%d", fileNum); + HDFS_Client_RetCode hdfsClientRetcode; samplePath.append(filePart); - hdfsSampleFile_ = hdfsOpenFile(hdfs_, samplePath.data(), O_WRONLY|O_CREAT, 0, 0, 0); + if (sampleFileHdfsClient_ == NULL) + sampleFileHdfsClient_ = HdfsClient::newInstance((NAHeap *)getHeap(), hdfsClientRetcode); + if (hdfsClientRetcode == HDFS_CLIENT_OK) { + hdfsClientRetcode = sampleFileHdfsClient_->hdfsOpen(samplePath.data(), FALSE); + if (hdfsClientRetcode != HDFS_CLIENT_OK) { + NADELETE(sampleFileHdfsClient_, HdfsClient, getHeap()); + sampleFileHdfsClient_ = NULL; + } + } + if (hdfsClientRetcode != HDFS_CLIENT_OK) { + ComDiagsArea * diagsArea = NULL; + ExRaiseSqlError(getHeap(), &diagsArea, + (ExeErrorCode)(8110)); + pentry_down->setDiagsArea(diagsArea); + step_ = HANDLE_ERROR; + break; + } } - posVec_.clear(); hbaseAccessTdb().listOfUpdatedColNames()->position(); while (NOT hbaseAccessTdb().listOfUpdatedColNames()->atEnd()) @@ -1763,8 +1767,10 @@ ExWorkProcRetcode ExHbaseAccessBulkLoadPrepSQTcb::work() if (eodSeen) { ehi_->closeHFile(table_); - if (hdfsClient_ != NULL) - hdfsClient_->hdfsClose(); + if (logFileHdfsClient_ != NULL) + logFileHdfsClient_->hdfsClose(); + if (sampleFileHdfsClient_ != NULL) + sampleFileHdfsClient_->hdfsClose(); hFileParamsInitialized_ = false; retcode = ehi_->close(); } http://git-wip-us.apache.org/repos/asf/trafodion/blob/a99ee11e/core/sql/executor/ExHdfsScan.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExHdfsScan.cpp b/core/sql/executor/ExHdfsScan.cpp index cd95899..c36270e 100644 --- a/core/sql/executor/ExHdfsScan.cpp +++ 
b/core/sql/executor/ExHdfsScan.cpp @@ -119,6 +119,7 @@ ExHdfsScanTcb::ExHdfsScanTcb( , dataModCheckDone_(FALSE) , loggingErrorDiags_(NULL) , loggingFileName_(NULL) + , logFileHdfsClient_(NULL) , hdfsClient_(NULL) , hdfsScan_(NULL) , hdfsStats_(NULL) @@ -226,7 +227,10 @@ ExHdfsScanTcb::ExHdfsScanTcb( ehi_ = ExpHbaseInterface::newInstance(glob->getDefaultHeap(), (char*)"", //Later replace with server cqd (char*)""); - + ex_assert(ehi_ != NULL, "Internal error: ehi_ is null in ExHdfsScan"); + HDFS_Client_RetCode hdfsClientRetcode; + hdfsClient_ = HdfsClient::newInstance((NAHeap *)getHeap(), hdfsClientRetcode); + ex_assert(hdfsClientRetcode == HDFS_CLIENT_OK, "Internal error: HdfsClient::newInstance returned an error"); // Populate the hdfsInfo list into an array to gain o(1) lookup access Queue* hdfsInfoList = hdfsScanTdb.getHdfsFileInfoList(); if ( hdfsInfoList && hdfsInfoList->numEntries() > 0 ) @@ -308,6 +312,8 @@ void ExHdfsScanTcb::freeResources() } if (hdfsClient_ != NULL) NADELETE(hdfsClient_, HdfsClient, getHeap()); + if (logFileHdfsClient_ != NULL) + NADELETE(logFileHdfsClient_, HdfsClient, getHeap()); if (hdfsScan_ != NULL) NADELETE(hdfsScan_, HdfsScan, getHeap()); } @@ -408,8 +414,6 @@ ExWorkProcRetcode ExHdfsScanTcb::work() Lng32 openType = 0; int changedLen = 0; ContextCli *currContext = getGlobals()->castToExExeStmtGlobals()->getCliGlobals()->currContext(); - hdfsFS hdfs = currContext->getHdfsServerConnection(hdfsScanTdb().hostName_,hdfsScanTdb().port_); - hdfsFileInfo *dirInfo = NULL; Int32 hdfsErrorDetail = 0;//this is errno returned form underlying hdfsOpenFile call. 
HDFS_Scan_RetCode hdfsScanRetCode; @@ -1075,8 +1079,12 @@ ExWorkProcRetcode ExHdfsScanTcb::work() { if (useLibhdfsScan_) step_ = REPOS_HDFS_DATA; - else - step_ = COPY_TAIL_TO_HEAD; + else { + if (retArray_[IS_EOF]) + step_ = TRAF_HDFS_READ; + else + step_ = COPY_TAIL_TO_HEAD; + } if (!exception_) break; } @@ -1299,7 +1307,7 @@ ExWorkProcRetcode ExHdfsScanTcb::work() if ((pentry_down->downState.request == ex_queue::GET_N) && (pentry_down->downState.requestValue == matches_)) { if (useLibhdfsScan_) - step_ = CLOSE_HDFS_CURSOR; + step_ = CLOSE_FILE; else step_ = DONE; } @@ -1668,8 +1676,8 @@ ExWorkProcRetcode ExHdfsScanTcb::work() { if (qparent_.up->isFull()) return WORK_OK; - if (hdfsClient_ != NULL) - retcode = hdfsClient_->hdfsClose(); + if (logFileHdfsClient_ != NULL) + retcode = logFileHdfsClient_->hdfsClose(); ex_queue_entry *up_entry = qparent_.up->getTailEntry(); up_entry->copyAtp(pentry_down); up_entry->upState.parentIndex = @@ -1693,7 +1701,6 @@ ExWorkProcRetcode ExHdfsScanTcb::work() qparent_.down->removeHead(); step_ = NOT_STARTED; - dirInfo = hdfsGetPathInfo(hdfs, "/"); break; } @@ -1900,19 +1907,19 @@ void ExHdfsScanTcb::computeRangesAtRuntime() Int64 firstFileStartingOffset = 0; Int64 lastFileBytesToRead = -1; Int32 numParallelInstances = MAXOF(getGlobals()->getNumOfInstances(),1); - hdfsFS fs = ((GetCliGlobals()->currContext())->getHdfsServerConnection( - hdfsScanTdb().hostName_, - hdfsScanTdb().port_)); - hdfsFileInfo *fileInfos = hdfsListDirectory(fs, - hdfsScanTdb().hdfsRootDir_, - &numFiles); + + HDFS_FileInfo *fileInfos; + HDFS_Client_RetCode hdfsClientRetcode; + + hdfsClientRetcode = hdfsClient_->hdfsListDirectory(hdfsScanTdb().hdfsRootDir_, &fileInfos, &numFiles); + ex_assert(hdfsClientRetcode == HDFS_CLIENT_OK, "Internal error:hdfsClient->hdfsListDirectory returned an error") deallocateRuntimeRanges(); // in a first round, count the total number of bytes for (int f=0; f<numFiles; f++) { - ex_assert(fileInfos[f].mKind == kObjectKindFile, + 
ex_assert(fileInfos[f].mKind == HDFS_FILE_KIND, "subdirectories not supported with runtime HDFS ranges"); totalSize += (Int64) fileInfos[f].mSize; } @@ -2127,15 +2134,15 @@ void ExHdfsScanTcb::handleException(NAHeap *heap, return; if (!loggingFileCreated_) { - hdfsClient_ = HdfsClient::newInstance((NAHeap *)getHeap(), hdfsClientRetcode); + logFileHdfsClient_ = HdfsClient::newInstance((NAHeap *)getHeap(), hdfsClientRetcode); if (hdfsClientRetcode == HDFS_CLIENT_OK) - hdfsClientRetcode = hdfsClient_->hdfsCreate(loggingFileName_, FALSE); + hdfsClientRetcode = logFileHdfsClient_->hdfsCreate(loggingFileName_, FALSE); if (hdfsClientRetcode == HDFS_CLIENT_OK) loggingFileCreated_ = TRUE; else goto logErrorReturn; } - hdfsClientRetcode = hdfsClient_->hdfsWrite(logErrorRow, logErrorRowLen); + hdfsClientRetcode = logFileHdfsClient_->hdfsWrite(logErrorRow, logErrorRowLen); if (hdfsClientRetcode != HDFS_CLIENT_OK) goto logErrorReturn; if (errorCond != NULL) { @@ -2151,7 +2158,7 @@ void ExHdfsScanTcb::handleException(NAHeap *heap, errorMsg = (char *)"[UNKNOWN EXCEPTION]\n"; errorMsgLen = strlen(errorMsg); } - hdfsClientRetcode = hdfsClient_->hdfsWrite(errorMsg, errorMsgLen); + hdfsClientRetcode = logFileHdfsClient_->hdfsWrite(errorMsg, errorMsgLen); logErrorReturn: if (hdfsClientRetcode != HDFS_CLIENT_OK) { loggingErrorDiags_ = ComDiagsArea::allocate(heap); http://git-wip-us.apache.org/repos/asf/trafodion/blob/a99ee11e/core/sql/executor/ExHdfsScan.h ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExHdfsScan.h b/core/sql/executor/ExHdfsScan.h index 2570a58..f4ad7e1 100644 --- a/core/sql/executor/ExHdfsScan.h +++ b/core/sql/executor/ExHdfsScan.h @@ -31,9 +31,6 @@ #include "ExStats.h" #include "sql_buffer.h" #include "ex_queue.h" - -#include "hdfs.h" - #include <time.h> #include "ExHbaseAccess.h" #include "ExpHbaseInterface.h" @@ -130,7 +127,7 @@ private: another Java thread. 6. 
Native layer after processing all the rows in one ByteBuffer, moves the last incomplete row to head room of the other ByteBuffer. Then it requests to check if the read is complete. The native layer processes the buffer starting - from the copied incomplete row. + from the copied partial row. */ class ExHdfsScanTcb : public ex_tcb @@ -339,6 +336,7 @@ protected: // this array is populated from the info list stored as Queue. HdfsFileInfoArray hdfsFileInfoListAsArray_; + HdfsClient *logFileHdfsClient_; HdfsClient *hdfsClient_; HdfsScan *hdfsScan_; NABoolean useLibhdfsScan_; http://git-wip-us.apache.org/repos/asf/trafodion/blob/a99ee11e/core/sql/executor/HdfsClient_JNI.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/HdfsClient_JNI.cpp b/core/sql/executor/HdfsClient_JNI.cpp index 8f2845a..2fab571 100644 --- a/core/sql/executor/HdfsClient_JNI.cpp +++ b/core/sql/executor/HdfsClient_JNI.cpp @@ -258,6 +258,8 @@ static const char* const hdfsClientErrorEnumStr[] = { "JNI NewStringUTF() in hdfsCreate()." ,"Java exception in hdfsCreate()." + "JNI NewStringUTF() in hdfsOpen()." + ,"Java exception in hdfsOpen()." ,"JNI NewStringUTF() in hdfsWrite()." ,"Java exception in hdfsWrite()." ,"Java exception in hdfsClose()." @@ -269,11 +271,32 @@ static const char* const hdfsClientErrorEnumStr[] = ,"Java exception in hdfsExists()." ,"JNI NewStringUTF() in hdfsDeletePath()." ,"Java exception in hdfsDeletePath()." + ,"Error in setHdfsFileInfo()." + ,"Error in hdfsListDirectory()." + ,"Java exception in hdfsListDirectory()." 
}; ////////////////////////////////////////////////////////////////////////////// // ////////////////////////////////////////////////////////////////////////////// +HdfsClient::~HdfsClient() +{ + QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::~HdfsClient() called."); + deleteHdfsFileInfo(); +} + +void HdfsClient::deleteHdfsFileInfo() +{ + for (int i = 0; i < numFiles_ ; i ++) { + NADELETEBASIC(hdfsFileInfo_[i].mName, getHeap()); + NADELETEBASIC(hdfsFileInfo_[i].mOwner, getHeap()); + NADELETEBASIC(hdfsFileInfo_[i].mGroup, getHeap()); + } + NADELETEBASIC(hdfsFileInfo_, getHeap()); + numFiles_ = 0; + hdfsFileInfo_ = NULL; +} + HdfsClient *HdfsClient::newInstance(NAHeap *heap, HDFS_Client_RetCode &retCode) { QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::newInstance() called."); @@ -313,6 +336,8 @@ HDFS_Client_RetCode HdfsClient::init() JavaMethods_[JM_CTOR ].jm_signature = "()V"; JavaMethods_[JM_HDFS_CREATE ].jm_name = "hdfsCreate"; JavaMethods_[JM_HDFS_CREATE ].jm_signature = "(Ljava/lang/String;Z)Z"; + JavaMethods_[JM_HDFS_OPEN ].jm_name = "hdfsOpen"; + JavaMethods_[JM_HDFS_OPEN ].jm_signature = "(Ljava/lang/String;Z)Z"; JavaMethods_[JM_HDFS_WRITE ].jm_name = "hdfsWrite"; JavaMethods_[JM_HDFS_WRITE ].jm_signature = "([BJ)Z"; JavaMethods_[JM_HDFS_CLOSE ].jm_name = "hdfsClose"; @@ -325,6 +350,8 @@ HDFS_Client_RetCode HdfsClient::init() JavaMethods_[JM_HDFS_EXISTS].jm_signature = "(Ljava/lang/String;)Z"; JavaMethods_[JM_HDFS_DELETE_PATH].jm_name = "hdfsDeletePath"; JavaMethods_[JM_HDFS_DELETE_PATH].jm_signature = "(Ljava/lang/String;)Z"; + JavaMethods_[JM_HDFS_LIST_DIRECTORY].jm_name = "hdfsListDirectory"; + JavaMethods_[JM_HDFS_LIST_DIRECTORY].jm_signature = "(Ljava/lang/String;J)I"; rc = (HDFS_Client_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_); if (rc == HDFS_CLIENT_OK) javaMethodsInitialized_ = TRUE; @@ -383,9 +410,46 @@ HDFS_Client_RetCode HdfsClient::hdfsCreate(const char* path, NABoolean 
compress) return HDFS_CLIENT_OK; } -////////////////////////////////////////////////////////////////////////////// -// -////////////////////////////////////////////////////////////////////////////// +HDFS_Client_RetCode HdfsClient::hdfsOpen(const char* path, NABoolean compress) +{ + QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsOpen(%s) called.", path); + + if (initJNIEnv() != JOI_OK) + return HDFS_CLIENT_ERROR_HDFS_OPEN_PARAM; + + jstring js_path = jenv_->NewStringUTF(path); + if (js_path == NULL) { + GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HDFS_OPEN_PARAM)); + jenv_->PopLocalFrame(NULL); + return HDFS_CLIENT_ERROR_HDFS_OPEN_PARAM; + } + + jboolean j_compress = compress; + + tsRecentJMFromJNI = JavaMethods_[JM_HDFS_OPEN].jm_full_name; + jboolean jresult = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_HDFS_OPEN].methodID, js_path, j_compress); + + if (jenv_->ExceptionCheck()) + { + getExceptionDetails(); + logError(CAT_SQL_HDFS, __FILE__, __LINE__); + logError(CAT_SQL_HDFS, "HdfsClient::hdfsOpen()", getLastError()); + jenv_->PopLocalFrame(NULL); + return HDFS_CLIENT_ERROR_HDFS_OPEN_EXCEPTION; + } + + if (jresult == false) + { + logError(CAT_SQL_HDFS, "HdfsClient::hdfsOpen()", getLastError()); + jenv_->PopLocalFrame(NULL); + return HDFS_CLIENT_ERROR_HDFS_OPEN_PARAM; + } + + jenv_->PopLocalFrame(NULL); + return HDFS_CLIENT_OK; +} + + HDFS_Client_RetCode HdfsClient::hdfsWrite(const char* data, Int64 len) { QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsWrite(%ld) called.", len); @@ -467,7 +531,6 @@ HDFS_Client_RetCode HdfsClient::hdfsCleanUnloadPath( const NAString& uldPath) uldPath.data()); if (initJNIEnv() != JOI_OK) return HDFS_CLIENT_ERROR_HDFS_CLEANUP_PARAM; - jstring js_UldPath = jenv_->NewStringUTF(uldPath.data()); if (js_UldPath == NULL) { GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HDFS_CLEANUP_PARAM)); @@ -476,7 +539,7 @@ HDFS_Client_RetCode HdfsClient::hdfsCleanUnloadPath( const NAString& 
uldPath) } tsRecentJMFromJNI = JavaMethods_[JM_HDFS_CLEAN_UNLOAD_PATH].jm_full_name; - jboolean jresult = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_HDFS_CLEAN_UNLOAD_PATH].methodID, js_UldPath); + jboolean jresult = jenv_->CallStaticBooleanMethod(javaClass_, JavaMethods_[JM_HDFS_CLEAN_UNLOAD_PATH].methodID, js_UldPath); if (jenv_->ExceptionCheck()) { @@ -516,7 +579,7 @@ HDFS_Client_RetCode HdfsClient::hdfsMergeFiles( const NAString& srcPath, tsRecentJMFromJNI = JavaMethods_[JM_HDFS_MERGE_FILES].jm_full_name; - jboolean jresult = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_HDFS_MERGE_FILES].methodID, js_SrcPath, js_DstPath); + jboolean jresult = jenv_->CallStaticBooleanMethod(javaClass_, JavaMethods_[JM_HDFS_MERGE_FILES].methodID, js_SrcPath, js_DstPath); if (jenv_->ExceptionCheck()) { @@ -554,7 +617,7 @@ HDFS_Client_RetCode HdfsClient::hdfsDeletePath( const NAString& delPath) tsRecentJMFromJNI = JavaMethods_[JM_HDFS_DELETE_PATH].jm_full_name; - jboolean jresult = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_HDFS_DELETE_PATH].methodID, js_delPath); + jboolean jresult = jenv_->CallStaticBooleanMethod(javaClass_, JavaMethods_[JM_HDFS_DELETE_PATH].methodID, js_delPath); if (jenv_->ExceptionCheck()) { @@ -576,6 +639,37 @@ HDFS_Client_RetCode HdfsClient::hdfsDeletePath( const NAString& delPath) return HDFS_CLIENT_OK; } +HDFS_Client_RetCode HdfsClient::hdfsListDirectory(const char *pathStr, HDFS_FileInfo **hdfsFileInfo, int *numFiles) +{ + QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsListDirectory(%s) called.", pathStr); + + if (initJNIEnv() != JOI_OK) + return HDFS_CLIENT_ERROR_HDFS_LIST_DIR_PARAM; + + jstring js_pathStr = jenv_->NewStringUTF(pathStr); + if (js_pathStr == NULL) { + jenv_->PopLocalFrame(NULL); + return HDFS_CLIENT_ERROR_HDFS_LIST_DIR_PARAM; + } + jlong jniObj = (long)this; + tsRecentJMFromJNI = JavaMethods_[JM_HDFS_LIST_DIRECTORY].jm_full_name; + + jint retNumFiles = jenv_->CallIntMethod(javaObj_, 
JavaMethods_[JM_HDFS_LIST_DIRECTORY].methodID, + js_pathStr, jniObj); + if (jenv_->ExceptionCheck()) + { + getExceptionDetails(); + logError(CAT_SQL_HDFS, __FILE__, __LINE__); + logError(CAT_SQL_HDFS, "HdfsClient::hdfsListDirectory()", getLastError()); + jenv_->PopLocalFrame(NULL); + return HDFS_CLIENT_ERROR_HDFS_LIST_DIR_EXCEPTION; + } + *numFiles = retNumFiles; + *hdfsFileInfo = hdfsFileInfo_; + jenv_->PopLocalFrame(NULL); + return HDFS_CLIENT_OK; +} + HDFS_Client_RetCode HdfsClient::hdfsExists( const NAString& uldPath, NABoolean & exist) { QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsExists(%s) called.", @@ -591,10 +685,8 @@ HDFS_Client_RetCode HdfsClient::hdfsExists( const NAString& uldPath, NABoolean & } tsRecentJMFromJNI = JavaMethods_[JM_HDFS_EXISTS].jm_full_name; - jboolean jresult = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_HDFS_EXISTS].methodID, js_UldPath); - + jboolean jresult = jenv_->CallStaticBooleanMethod(javaClass_, JavaMethods_[JM_HDFS_EXISTS].methodID, js_UldPath); exist = jresult; - if (jenv_->ExceptionCheck()) { getExceptionDetails(); @@ -603,7 +695,69 @@ HDFS_Client_RetCode HdfsClient::hdfsExists( const NAString& uldPath, NABoolean & jenv_->PopLocalFrame(NULL); return HDFS_CLIENT_ERROR_HDFS_EXISTS_EXCEPTION; } - jenv_->PopLocalFrame(NULL); return HDFS_CLIENT_OK; } + +HDFS_Client_RetCode HdfsClient::setHdfsFileInfo(JNIEnv *jenv, jint numFiles, jint fileNo, jboolean isDir, + jstring filename, jlong modTime, jlong len, jshort numReplicas, jlong blockSize, + jstring owner, jstring group, jshort permissions, jlong accessTime) +{ + HDFS_FileInfo *hdfsFileInfo; + + if (fileNo == 0 && hdfsFileInfo_ != NULL) + deleteHdfsFileInfo(); + + if (hdfsFileInfo_ == NULL) { + hdfsFileInfo_ = new (getHeap()) HDFS_FileInfo[numFiles]; + numFiles_ = numFiles; + } + + if (fileNo >= numFiles_) + return HDFS_CLIENT_ERROR_SET_HDFSFILEINFO; + hdfsFileInfo = &hdfsFileInfo_[fileNo]; + if (isDir) + hdfsFileInfo->mKind = HDFS_DIRECTORY_KIND; + else + 
hdfsFileInfo->mKind = HDFS_FILE_KIND; + hdfsFileInfo->mLastMod = modTime; + hdfsFileInfo->mSize = len; + hdfsFileInfo->mReplication = numReplicas; + hdfsFileInfo->mBlockSize = blockSize; + hdfsFileInfo->mPermissions = permissions; + hdfsFileInfo->mLastAccess = accessTime; + jint tempLen = jenv->GetStringUTFLength(filename); + hdfsFileInfo->mName = new (getHeap()) char[tempLen+1]; + strncpy(hdfsFileInfo->mName, jenv->GetStringUTFChars(filename, NULL), tempLen); + hdfsFileInfo->mName[tempLen] = '\0'; + tempLen = jenv->GetStringUTFLength(owner); + hdfsFileInfo->mOwner = new (getHeap()) char[tempLen+1]; + strncpy(hdfsFileInfo->mOwner, jenv->GetStringUTFChars(owner, NULL), tempLen); + hdfsFileInfo->mOwner[tempLen] = '\0'; + tempLen = jenv->GetStringUTFLength(group); + hdfsFileInfo->mGroup = new (getHeap()) char[tempLen+1]; + strncpy(hdfsFileInfo->mGroup, jenv->GetStringUTFChars(group, NULL), tempLen); + hdfsFileInfo->mGroup[tempLen] = '\0'; + return HDFS_CLIENT_OK; +} + + +#ifdef __cplusplus +extern "C" { +#endif + +jint JNICALL Java_org_trafodion_sql_HDFSClient_sendFileStatus + (JNIEnv *jenv, jobject j_obj, jlong hdfsClientJniObj, jint numFiles, jint fileNo, jboolean isDir, + jstring filename, jlong modTime, jlong len, jshort numReplicas, jlong blockSize, + jstring owner, jstring group, jshort permissions, jlong accessTime) +{ + HDFS_Client_RetCode retcode; + HdfsClient *hdfsClient = (HdfsClient *)hdfsClientJniObj; + retcode = hdfsClient->setHdfsFileInfo(jenv, numFiles, fileNo, isDir, filename, modTime, len, numReplicas, blockSize, owner, + group, permissions, accessTime); + return (jint) retcode; +} + +#ifdef __cplusplus +} +#endif http://git-wip-us.apache.org/repos/asf/trafodion/blob/a99ee11e/core/sql/executor/HdfsClient_JNI.h ---------------------------------------------------------------------- diff --git a/core/sql/executor/HdfsClient_JNI.h b/core/sql/executor/HdfsClient_JNI.h index 5854d59..c45d226 100644 --- a/core/sql/executor/HdfsClient_JNI.h +++ 
b/core/sql/executor/HdfsClient_JNI.h @@ -87,10 +87,31 @@ private: // =========================================================================== typedef enum { + HDFS_FILE_KIND + , HDFS_DIRECTORY_KIND +} HDFS_FileType; + +typedef struct { + HDFS_FileType mKind; /* file or directory */ + char *mName; /* the name of the file */ + Int64 mLastMod; /* the last modification time for the file in seconds */ + Int64 mSize; /* the size of the file in bytes */ + short mReplication; /* the count of replicas */ + Int64 mBlockSize; /* the block size for the file */ + char *mOwner; /* the owner of the file */ + char *mGroup; /* the group associated with the file */ + short mPermissions; /* the permissions associated with the file */ + Int64 mLastAccess; /* the last access time for the file in seconds */ +} HDFS_FileInfo; + + +typedef enum { HDFS_CLIENT_OK = JOI_OK ,HDFS_CLIENT_FIRST = HDFS_SCAN_LAST ,HDFS_CLIENT_ERROR_HDFS_CREATE_PARAM = HDFS_CLIENT_FIRST ,HDFS_CLIENT_ERROR_HDFS_CREATE_EXCEPTION + ,HDFS_CLIENT_ERROR_HDFS_OPEN_PARAM + ,HDFS_CLIENT_ERROR_HDFS_OPEN_EXCEPTION ,HDFS_CLIENT_ERROR_HDFS_WRITE_PARAM ,HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION ,HDFS_CLIENT_ERROR_HDFS_CLOSE_EXCEPTION @@ -103,6 +124,9 @@ typedef enum { ,HDFS_CLIENT_ERROR_HDFS_EXISTS_FILE_EXISTS ,HDFS_CLIENT_ERROR_HDFS_DELETE_PATH_PARAM ,HDFS_CLIENT_ERROR_HDFS_DELETE_PATH_EXCEPTION + ,HDFS_CLIENT_ERROR_SET_HDFSFILEINFO + ,HDFS_CLIENT_ERROR_HDFS_LIST_DIR_PARAM + ,HDFS_CLIENT_ERROR_HDFS_LIST_DIR_EXCEPTION ,HDFS_CLIENT_LAST } HDFS_Client_RetCode; @@ -112,8 +136,12 @@ public: // Default constructor - for creating a new JVM HdfsClient(NAHeap *heap) : JavaObjectInterface(heap) - {} - + , hdfsFileInfo_(NULL) + , numFiles_(0) + { + } + + ~HdfsClient(); static HdfsClient *newInstance(NAHeap *heap, HDFS_Client_RetCode &retCode); // Get the error description. @@ -123,6 +151,7 @@ public: // Must be called. 
HDFS_Client_RetCode init(); HDFS_Client_RetCode hdfsCreate(const char* path, NABoolean compress); + HDFS_Client_RetCode hdfsOpen(const char* path, NABoolean compress); HDFS_Client_RetCode hdfsWrite(const char* data, Int64 size); HDFS_Client_RetCode hdfsClose(); HDFS_Client_RetCode hdfsMergeFiles(const NAString& srcPath, @@ -130,20 +159,28 @@ public: HDFS_Client_RetCode hdfsCleanUnloadPath(const NAString& uldPath ); HDFS_Client_RetCode hdfsExists(const NAString& uldPath, NABoolean & exists ); HDFS_Client_RetCode hdfsDeletePath(const NAString& delPath); + HDFS_Client_RetCode setHdfsFileInfo(JNIEnv *jenv, jint numFiles, jint fileNo, jboolean isDir, + jstring filename, jlong modTime, jlong len, jshort numReplicas, jlong blockSize, + jstring owner, jstring group, jshort permissions, jlong accessTime); + HDFS_Client_RetCode hdfsListDirectory(const char *pathStr, HDFS_FileInfo **hdfsFileInfo, int *numFiles); + void deleteHdfsFileInfo(); private: enum JAVA_METHODS { JM_CTOR = 0, JM_HDFS_CREATE, + JM_HDFS_OPEN, JM_HDFS_WRITE, JM_HDFS_CLOSE, JM_HDFS_MERGE_FILES, JM_HDFS_CLEAN_UNLOAD_PATH, JM_HDFS_EXISTS, JM_HDFS_DELETE_PATH, + JM_HDFS_LIST_DIRECTORY, JM_LAST }; - + HDFS_FileInfo *hdfsFileInfo_; + int numFiles_; static jclass javaClass_; static JavaMethodInit* JavaMethods_; static bool javaMethodsInitialized_; @@ -151,4 +188,5 @@ private: static pthread_mutex_t javaMethodsInitMutex_; }; + #endif http://git-wip-us.apache.org/repos/asf/trafodion/blob/a99ee11e/core/sql/executor/JavaObjectInterface.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/JavaObjectInterface.cpp b/core/sql/executor/JavaObjectInterface.cpp index ecd8c1e..6919866 100644 --- a/core/sql/executor/JavaObjectInterface.cpp +++ b/core/sql/executor/JavaObjectInterface.cpp @@ -476,8 +476,14 @@ JOI_RetCode JavaObjectInterface::init(char *className, if (JavaMethods[i].methodID == 0 || jenv_->ExceptionCheck()) { getExceptionDetails(); - 
QRLogger::log(CAT_SQL_HDFS_JNI_TOP, LL_ERROR, "Error in GetMethod(%s).", JavaMethods[i].jm_name); - return JOI_ERROR_GETMETHOD; + JavaMethods[i].methodID = jenv_->GetStaticMethodID(javaClass, + JavaMethods[i].jm_name, + JavaMethods[i].jm_signature); + if (JavaMethods[i].methodID == 0 || jenv_->ExceptionCheck()) { + getExceptionDetails(); + QRLogger::log(CAT_SQL_HDFS_JNI_TOP, LL_ERROR, "Error in GetMethod(%s).", JavaMethods[i].jm_name); + return JOI_ERROR_GETMETHOD; + } } } } http://git-wip-us.apache.org/repos/asf/trafodion/blob/a99ee11e/core/sql/sqlcomp/CmpSeabaseDDLcommon.cpp ---------------------------------------------------------------------- diff --git a/core/sql/sqlcomp/CmpSeabaseDDLcommon.cpp b/core/sql/sqlcomp/CmpSeabaseDDLcommon.cpp index 9bad515..e767cec 100644 --- a/core/sql/sqlcomp/CmpSeabaseDDLcommon.cpp +++ b/core/sql/sqlcomp/CmpSeabaseDDLcommon.cpp @@ -68,7 +68,6 @@ #include "ComMisc.h" #include "CmpSeabaseDDLmd.h" #include "CmpSeabaseDDLroutine.h" -#include "hdfs.h" #include "StmtDDLAlterLibrary.h" #include "logmxevent_traf.h" #include "exp_clause_derived.h" http://git-wip-us.apache.org/repos/asf/trafodion/blob/a99ee11e/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---------------------------------------------------------------------- diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java index 5c8c487..fe116d7 100644 --- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java +++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.conf.Configuration; import java.nio.ByteBuffer; import java.io.IOException; @@ -84,6 +85,7 @@ public class HDFSClient catch (IOException ioe) { throw new RuntimeException("Exception 
in HDFSClient static block", ioe); } + System.loadLibrary("executor"); } class HDFSRead implements Callable @@ -202,6 +204,35 @@ public class HDFSClient logger_.debug("HDFSClient.hdfsCreate() - compressed output stream created" ); return true; } + + boolean hdfsOpen(String fname , boolean compress) throws IOException + { + if (logger_.isDebugEnabled()) + logger_.debug("HDFSClient.hdfsOpen() - started" ); + Path filePath = null; + if (!compress || (compress && fname.endsWith(".gz"))) + filePath = new Path(fname); + else + filePath = new Path(fname + ".gz"); + + FileSystem fs = FileSystem.get(filePath.toUri(),config_); + FSDataOutputStream fsOut; + if (fs.exists(filePath)) + fsOut = fs.append(filePath); + else + fsOut = fs.create(filePath); + + if (compress) { + GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance( GzipCodec.class, config_); + Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec); + outStream_= gzipCodec.createOutputStream(fsOut, gzipCompressor); + } + else + outStream_ = fsOut; + if (logger_.isDebugEnabled()) + logger_.debug("HDFSClient.hdfsCreate() - compressed output stream created" ); + return true; + } boolean hdfsWrite(byte[] buff, long len) throws IOException { @@ -225,7 +256,7 @@ public class HDFSClient } - public boolean hdfsMergeFiles(String srcPathStr, String dstPathStr) throws IOException + public static boolean hdfsMergeFiles(String srcPathStr, String dstPathStr) throws IOException { if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - start"); if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - source Path: " + srcPathStr + @@ -265,7 +296,7 @@ public class HDFSClient return true; } - public boolean hdfsCleanUnloadPath(String uldPathStr + public static boolean hdfsCleanUnloadPath(String uldPathStr /*, boolean checkExistence, String mergeFileStr*/) throws IOException { if (logger_.isDebugEnabled()) @@ -289,7 +320,7 @@ public class HDFSClient return true; } - public boolean 
hdfsExists(String filePathStr) throws IOException + public static boolean hdfsExists(String filePathStr) throws IOException { if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsExists() - Path: " + filePathStr); @@ -301,7 +332,7 @@ public class HDFSClient return false; } - public boolean hdfsDeletePath(String pathStr) throws IOException + public static boolean hdfsDeletePath(String pathStr) throws IOException { if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsDeletePath() - start - Path: " + pathStr); @@ -310,10 +341,54 @@ public class HDFSClient fs.delete(delPath, true); return true; } + + public int hdfsListDirectory(String pathStr, long hdfsClientJniObj) throws IOException + { + if (logger_.isDebugEnabled()) + logger_.debug("HDFSClient.hdfsListDirectory() - start - Path: " + pathStr); + Path listPath = new Path(pathStr ); + FileSystem fs = FileSystem.get(listPath.toUri(), config_); + FileStatus[] fileStatus; + if (fs.isDirectory(listPath)) + fileStatus = fs.listStatus(listPath); + else + throw new IOException("The path " + listPath + "is not a directory"); + FileStatus aFileStatus; + int retcode; + if (fileStatus != null) { + for (int i = 0; i < fileStatus.length; i++) + { + aFileStatus = fileStatus[i]; + retcode = sendFileStatus(hdfsClientJniObj, fileStatus.length, + i, + aFileStatus.isDirectory(), + aFileStatus.getPath().toString(), + aFileStatus.getModificationTime(), + aFileStatus.getLen(), + aFileStatus.getReplication(), + aFileStatus.getBlockSize(), + aFileStatus.getOwner(), + aFileStatus.getGroup(), + aFileStatus.getPermission().toShort(), + aFileStatus.getAccessTime()); + if (retcode != 0) + throw new IOException("Error " + retcode + " while sending the file status info for file " + aFileStatus.getPath().toString()); + } + return fileStatus.length; + } + else + return 0; + } public static void shutdown() throws InterruptedException { executorService_.awaitTermination(100, TimeUnit.MILLISECONDS); executorService_.shutdown(); } + + 
private native int sendFileStatus(long jniObj, int numFiles, int fileNo, boolean isDir, + String filename, long modTime, long len, + short numReplicas, long blockSize, String owner, String group, + short permissions, long accessTime); + }
