Repository: trafodion Updated Branches: refs/heads/master d48a88741 -> e618aaf3d
[TRAFODION-2917] Refactor Trafodion implementation of hdfs scan for text formatted hive tables Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/ac706607 Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/ac706607 Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/ac706607 Branch: refs/heads/master Commit: ac7066074611a09da33adf88673c2f023e7dda7d Parents: 75c7b39 Author: selvaganesang <[email protected]> Authored: Wed Feb 28 19:27:40 2018 +0000 Committer: selvaganesang <[email protected]> Committed: Wed Feb 28 21:57:59 2018 +0000 ---------------------------------------------------------------------- core/sql/cli/Context.h | 1 + core/sql/cli/Globals.cpp | 1 + core/sql/cli/Globals.h | 1 + core/sql/comexe/ComTdbHdfsScan.h | 5 +- core/sql/common/ComRtUtils.cpp | 1 + core/sql/executor/ExExeUtilGet.cpp | 2 - core/sql/executor/ExFastTransport.cpp | 7 +-- core/sql/executor/ExHdfsScan.cpp | 58 ++++++++++++++++++-- core/sql/executor/ExHdfsScan.h | 23 ++++++++ core/sql/executor/HBaseClient_JNI.cpp | 1 - core/sql/executor/HdfsClient_JNI.cpp | 28 +++++----- core/sql/exp/ExpLOBaccess.cpp | 1 + core/sql/optimizer/HDFSHook.cpp | 1 + core/sql/optimizer/NATable.cpp | 1 + core/sql/sqlcomp/DefaultConstants.h | 1 + .../main/java/org/trafodion/sql/HDFSClient.java | 13 +++-- 16 files changed, 112 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/cli/Context.h ---------------------------------------------------------------------- diff --git a/core/sql/cli/Context.h b/core/sql/cli/Context.h index 973a647..bcfd06b 100644 --- a/core/sql/cli/Context.h +++ b/core/sql/cli/Context.h @@ -512,6 +512,7 @@ private: NAString jniErrorStr_; HBaseClient_JNI *hbaseClientJNI_; HiveClient_JNI *hiveClientJNI_; + HdfsClient *hdfsClientJNI_; // this points to data used by trafSE (traf storage 
engine) that is context specific. // It points to a list of 'black box' of data allocated by user and is returned http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/cli/Globals.cpp ---------------------------------------------------------------------- diff --git a/core/sql/cli/Globals.cpp b/core/sql/cli/Globals.cpp index ed30502..fb09008 100644 --- a/core/sql/cli/Globals.cpp +++ b/core/sql/cli/Globals.cpp @@ -60,6 +60,7 @@ #include <semaphore.h> #include <pthread.h> #include "HBaseClient_JNI.h" +#include "HdfsClient_JNI.h" #include "LmLangManagerC.h" #include "LmLangManagerJava.h" #include "CliSemaphore.h" http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/cli/Globals.h ---------------------------------------------------------------------- diff --git a/core/sql/cli/Globals.h b/core/sql/cli/Globals.h index 284d992..b8e04ea 100644 --- a/core/sql/cli/Globals.h +++ b/core/sql/cli/Globals.h @@ -88,6 +88,7 @@ class CliGlobals; class CLISemaphore; class HBaseClient_JNI; class HiveClient_JNI; +class HdfsClient; class TransMode; class ContextTidMap; class LmLanguageManager; http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/comexe/ComTdbHdfsScan.h ---------------------------------------------------------------------- diff --git a/core/sql/comexe/ComTdbHdfsScan.h b/core/sql/comexe/ComTdbHdfsScan.h index 86534be..46d7f2f 100755 --- a/core/sql/comexe/ComTdbHdfsScan.h +++ b/core/sql/comexe/ComTdbHdfsScan.h @@ -54,7 +54,8 @@ class ComTdbHdfsScan : public ComTdb CONTINUE_ON_ERROR = 0x0020, LOG_ERROR_ROWS = 0x0040, ASSIGN_RANGES_AT_RUNTIME = 0x0080, - USE_LIBHDFS_SCAN = 0x0100 + TREAT_EMPTY_AS_NULL = 0x0100, + USE_LIBHDFS_SCAN = 0x0200 }; // Expression to filter rows. @@ -288,7 +289,7 @@ public: {(v ? 
flags_ |= USE_LIBHDFS_SCAN : flags_ &= ~USE_LIBHDFS_SCAN); } NABoolean getUseLibhdfsScan() const { return (flags_ & USE_LIBHDFS_SCAN) != 0; } - + UInt32 getMaxErrorRows() const { return maxErrorRows_;} void setMaxErrorRows(UInt32 v ) { maxErrorRows_= v; } http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/common/ComRtUtils.cpp ---------------------------------------------------------------------- diff --git a/core/sql/common/ComRtUtils.cpp b/core/sql/common/ComRtUtils.cpp index f2619f9..35f9ca7 100644 --- a/core/sql/common/ComRtUtils.cpp +++ b/core/sql/common/ComRtUtils.cpp @@ -81,6 +81,7 @@ #include "seabed/ms.h" #include "seabed/fs.h" +#include "HdfsClient_JNI.h" struct ModName { public: const char * name; http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/executor/ExExeUtilGet.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExExeUtilGet.cpp b/core/sql/executor/ExExeUtilGet.cpp index 539a8cf..cd20fae 100644 --- a/core/sql/executor/ExExeUtilGet.cpp +++ b/core/sql/executor/ExExeUtilGet.cpp @@ -6103,7 +6103,6 @@ ExExeUtilRegionStatsTcb::ExExeUtilRegionStatsTcb( ehi_ = ExpHbaseInterface::newInstance(glob->getDefaultHeap(), (char*)"", //exe_util_tdb.server(), (char*)""); //exe_util_tdb.zkPort(), - regionInfoList_ = NULL; tableName_ = new(glob->getDefaultHeap()) char[2000]; @@ -6876,7 +6875,6 @@ ExExeUtilClusterStatsTcb::ExExeUtilClusterStatsTcb( ehi_ = ExpHbaseInterface::newInstance(glob->getDefaultHeap(), (char*)"", //exe_util_tdb.server(), (char*)""); //exe_util_tdb.zkPort()); - regionInfoList_ = NULL; // get hbase rootdir location. Max linux pathlength is 1024. 
http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/executor/ExFastTransport.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExFastTransport.cpp b/core/sql/executor/ExFastTransport.cpp index 9d43545..ba4cfbb 100644 --- a/core/sql/executor/ExFastTransport.cpp +++ b/core/sql/executor/ExFastTransport.cpp @@ -827,12 +827,11 @@ ExWorkProcRetcode ExHdfsFastExtractTcb::work() } } else - { + { updateWorkATPDiagsArea(__FILE__,__LINE__,"sockets are not supported"); pstate.step_ = EXTRACT_ERROR; break; - } - + } for (UInt32 i = 0; i < myTdb().getChildTuple()->numAttrs(); i++) { Attributes * attr = myTdb().getChildTableAttr(i); @@ -1290,7 +1289,7 @@ void ExHdfsFastExtractTcb::createHdfsClientFileError(Int32 hdfsClientRetCode) (ExeErrorCode)(8447), NULL, NULL, NULL, NULL, errorMsg, - (char *)currContext->getJniErrorStr().data()); + (char *)currContext->getJniErrorStr().data()); updateWorkATPDiagsArea(diagsArea); } http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/executor/ExHdfsScan.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExHdfsScan.cpp b/core/sql/executor/ExHdfsScan.cpp index 2b73feb..3ff153e 100644 --- a/core/sql/executor/ExHdfsScan.cpp +++ b/core/sql/executor/ExHdfsScan.cpp @@ -317,7 +317,6 @@ void ExHdfsScanTcb::freeResources() if (hdfsScan_ != NULL) NADELETE(hdfsScan_, HdfsScan, getHeap()); } - NABoolean ExHdfsScanTcb::needStatsEntry() { // stats are collected for ALL and OPERATOR options. 
@@ -1099,15 +1098,13 @@ ExWorkProcRetcode ExHdfsScanTcb::work() if ((BYTE *)startOfNextRow > bufLogicalEnd_) { step_ = TRAF_HDFS_READ; hdfsBufNextRow_ = NULL; - if (!exception_) - break; } else hdfsBufNextRow_ = startOfNextRow; - } + } } - + if (exception_) { nextStep_ = step_; @@ -2019,6 +2016,54 @@ void ExHdfsScanTcb::deallocateRuntimeRanges() } } +void ExHdfsScanTcb::handleException(NAHeap *heap, + char *logErrorRow, + Lng32 logErrorRowLen, + ComCondition *errorCond) +{ + Lng32 errorMsgLen = 0; + charBuf *cBuf = NULL; + char *errorMsg; + HDFS_Client_RetCode hdfsClientRetcode; + + if (loggingErrorDiags_ != NULL) + return; + + if (!loggingFileCreated_) { + hdfsClient_ = HdfsClient::newInstance((NAHeap *)getHeap(), hdfsClientRetcode); + if (hdfsClientRetcode == HDFS_CLIENT_OK) + hdfsClientRetcode = hdfsClient_->hdfsCreate(loggingFileName_, FALSE); + if (hdfsClientRetcode == HDFS_CLIENT_OK) + loggingFileCreated_ = TRUE; + else + goto logErrorReturn; + } + hdfsClientRetcode = hdfsClient_->hdfsWrite(logErrorRow, logErrorRowLen); + if (hdfsClientRetcode != HDFS_CLIENT_OK) + goto logErrorReturn; + if (errorCond != NULL) { + errorMsgLen = errorCond->getMessageLength(); + const NAWcharBuf wBuf((NAWchar*)errorCond->getMessageText(), errorMsgLen, heap); + cBuf = unicodeToISO88591(wBuf, heap, cBuf); + errorMsg = (char *)cBuf->data(); + errorMsgLen = cBuf -> getStrLen(); + errorMsg[errorMsgLen]='\n'; + errorMsgLen++; + } + else { + errorMsg = (char *)"[UNKNOWN EXCEPTION]\n"; + errorMsgLen = strlen(errorMsg); + } + hdfsClientRetcode = hdfsClient_->hdfsWrite(errorMsg, errorMsgLen); +logErrorReturn: + if (hdfsClientRetcode != HDFS_CLIENT_OK) { + loggingErrorDiags_ = ComDiagsArea::allocate(heap); + *loggingErrorDiags_ << DgSqlCode(EXE_ERROR_WHILE_LOGGING) + << DgString0(loggingFileName_) + << DgString1((char *)GetCliGlobals()->currContext()->getJniErrorStr().data()); + } +} + short ExHdfsScanTcb::moveRowToUpQueue(const char * row, Lng32 len, short * rc, NABoolean isVarchar) { @@ 
-2119,7 +2164,7 @@ short ExHdfsScanTcb::handleDone(ExWorkProcRetcode &rc) return 0; } - +/* void ExHdfsScanTcb::handleException(NAHeap *heap, char *logErrorRow, Lng32 logErrorRowLen, @@ -2167,6 +2212,7 @@ logErrorReturn: << DgString1((char *)GetCliGlobals()->currContext()->getJniErrorStr().data()); } } +*/ //////////////////////////////////////////////////////////////////////// // ORC files http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/executor/ExHdfsScan.h ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExHdfsScan.h b/core/sql/executor/ExHdfsScan.h index f4ad7e1..04b632e 100644 --- a/core/sql/executor/ExHdfsScan.h +++ b/core/sql/executor/ExHdfsScan.h @@ -135,6 +135,29 @@ class ExHdfsScanTcb : public ex_tcb public: enum +/* + USE_LIBHDFS_SCAN - OFF enables hdfs access via java classes + org.trafodion.sql.HdfsScan and org.trafodion.sql.HdfsClient + Steps involved: + 1. Create a new HdfsScan object and set the scan ranges of the fragment instance in it + The scan range involves the following and it is determined either at runtime or compile time + a) filename + b) offset + c) len + Java layer always reads more than the len by rangeTailIOSize_ to accommodate the record split + 2. Two ByteBuffer objects are also passed to HdfsScan object. These ByteBuffers are backed up by + 2 native buffers where the data is fetched. The buffer has a head room of size rangeTailIOSize_ and the + data is always read after the head room. + 3. HdfsScan returns an int array containing bytesRead, bufNo, rangeNo, isEOF and schedules either + the remaining bytes to be read or the next range using ByteBuffers alternatively. + 4. HdfsScan returns null array when there is no more data to be read. + 5. When the data is processed in one ByteBuffer in the native thread, the data is fetched into the other ByteBuffer by + another Java thread. + 6. 
Native layer after processing all the rows in one ByteBuffer, moves the last incomplete row to head room of the + other ByteBuffer. Then it requests to check if the read is complete. The native layer processes the buffer starting + from the copied incomplete row. +*/ + { BYTES_COMPLETED, BUF_NO, http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/executor/HBaseClient_JNI.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/HBaseClient_JNI.cpp b/core/sql/executor/HBaseClient_JNI.cpp index 6b400cd..8d12821 100644 --- a/core/sql/executor/HBaseClient_JNI.cpp +++ b/core/sql/executor/HBaseClient_JNI.cpp @@ -5166,4 +5166,3 @@ void deleteNAArray(CollHeap *heap, NAArray<HbaseStr> *array) } NADELETE(array, NAArray, heap); } - http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/executor/HdfsClient_JNI.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/HdfsClient_JNI.cpp b/core/sql/executor/HdfsClient_JNI.cpp index f08aa92..5f0f810 100644 --- a/core/sql/executor/HdfsClient_JNI.cpp +++ b/core/sql/executor/HdfsClient_JNI.cpp @@ -23,6 +23,7 @@ #include "QRLogger.h" #include "Globals.h" +#include "Context.h" #include "jni.h" #include "HdfsClient_JNI.h" @@ -239,6 +240,7 @@ HDFS_Scan_RetCode HdfsScan::trafHdfsRead(NAHeap *heap, ExHdfsScanStats *hdfsStat } if (j_retArray == NULL) return HDFS_SCAN_EOR; + short retArrayLen = jenv_->GetArrayLength(j_retArray); ex_assert(retArrayLen == arrayLen, "HdfsScan::trafHdfsRead() InternalError: retArrayLen != arrayLen"); jenv_->GetIntArrayRegion(j_retArray, 0, 4, retArray); @@ -285,23 +287,11 @@ HdfsClient::~HdfsClient() deleteHdfsFileInfo(); } -void HdfsClient::deleteHdfsFileInfo() -{ - for (int i = 0; i < numFiles_ ; i ++) { - NADELETEBASIC(hdfsFileInfo_[i].mName, getHeap()); - NADELETEBASIC(hdfsFileInfo_[i].mOwner, getHeap()); - NADELETEBASIC(hdfsFileInfo_[i].mGroup, getHeap()); - } - 
NADELETEBASIC(hdfsFileInfo_, getHeap()); - numFiles_ = 0; - hdfsFileInfo_ = NULL; -} - HdfsClient *HdfsClient::newInstance(NAHeap *heap, HDFS_Client_RetCode &retCode) { QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::newInstance() called."); - if (initJNIEnv() != JOI_OK) + if (initJNIEnv() != JOI_OK) return NULL; retCode = HDFS_CLIENT_OK; HdfsClient *hdfsClient = new (heap) HdfsClient(heap); @@ -315,6 +305,18 @@ HdfsClient *HdfsClient::newInstance(NAHeap *heap, HDFS_Client_RetCode &retCode) return hdfsClient; } +void HdfsClient::deleteHdfsFileInfo() +{ + for (int i = 0; i < numFiles_ ; i ++) { + NADELETEBASIC(hdfsFileInfo_[i].mName, getHeap()); + NADELETEBASIC(hdfsFileInfo_[i].mOwner, getHeap()); + NADELETEBASIC(hdfsFileInfo_[i].mGroup, getHeap()); + } + NADELETEBASIC(hdfsFileInfo_, getHeap()); + numFiles_ = 0; + hdfsFileInfo_ = NULL; +} + HDFS_Client_RetCode HdfsClient::init() { static char className[]="org/trafodion/sql/HDFSClient"; http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/exp/ExpLOBaccess.cpp ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpLOBaccess.cpp b/core/sql/exp/ExpLOBaccess.cpp index b5a427b..481e960 100644 --- a/core/sql/exp/ExpLOBaccess.cpp +++ b/core/sql/exp/ExpLOBaccess.cpp @@ -63,6 +63,7 @@ #include "ComQueue.h" #include "QRLogger.h" #include "NAMemory.h" +#include "HdfsClient_JNI.h" #include <seabed/ms.h> #include <seabed/fserr.h> #include <curl/curl.h> http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/optimizer/HDFSHook.cpp ---------------------------------------------------------------------- diff --git a/core/sql/optimizer/HDFSHook.cpp b/core/sql/optimizer/HDFSHook.cpp index dfc80ad..c8793d9 100644 --- a/core/sql/optimizer/HDFSHook.cpp +++ b/core/sql/optimizer/HDFSHook.cpp @@ -29,6 +29,7 @@ // for DNS name resolution #include <netdb.h> +#include "HdfsClient_JNI.h" #include "Globals.h" #include "Context.h" // Initialize static variables 
http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/optimizer/NATable.cpp ---------------------------------------------------------------------- diff --git a/core/sql/optimizer/NATable.cpp b/core/sql/optimizer/NATable.cpp index d657a48..cb0cc39 100644 --- a/core/sql/optimizer/NATable.cpp +++ b/core/sql/optimizer/NATable.cpp @@ -87,6 +87,7 @@ #define MAX_NODE_NAME 9 #include "SqlParserGlobals.h" +#include "HdfsClient_JNI.h" //#define __ROSETTA //#include "rosetta_ddl_include.h" http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/sqlcomp/DefaultConstants.h ---------------------------------------------------------------------- diff --git a/core/sql/sqlcomp/DefaultConstants.h b/core/sql/sqlcomp/DefaultConstants.h index 2671482..fd110de 100644 --- a/core/sql/sqlcomp/DefaultConstants.h +++ b/core/sql/sqlcomp/DefaultConstants.h @@ -3310,6 +3310,7 @@ enum DefaultConstants // Use the earlier implementation of HdfsScan via libhdfs USE_LIBHDFS_SCAN, + // This enum constant must be the LAST one in the list; it's a count, // not an Attribute (it's not IN DefaultDefaults; it's the SIZE of it)! 
__NUM_DEFAULT_ATTRIBUTES http://git-wip-us.apache.org/repos/asf/trafodion/blob/ac706607/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---------------------------------------------------------------------- diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java index fe116d7..52453cc 100644 --- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java +++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java @@ -21,19 +21,20 @@ package org.trafodion.sql; +import java.io.IOException; +import java.io.EOFException; +import java.io.OutputStream; +import java.nio.ByteBuffer; import org.apache.log4j.PropertyConfigurator; import org.apache.log4j.Logger; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.conf.Configuration; -import java.nio.ByteBuffer; -import java.io.IOException; import java.io.EOFException; -import java.io.OutputStream; import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.Executors; @@ -168,7 +169,7 @@ public class HDFSClient bytesRead = retObject.intValue(); fsdis_.close(); return bytesRead; - } + } public int getRangeNo() { @@ -392,3 +393,5 @@ public class HDFSClient short permissions, long accessTime); } + +
