Repository: incubator-trafodion Updated Branches: refs/heads/master d19936293 -> c39d3abf6
hive data modification detection: commit #1 Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/f4728220 Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/f4728220 Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/f4728220 Branch: refs/heads/master Commit: f4728220c7a21f06c5c295e7a4c515a09a1a219d Parents: c41b39b Author: Anoop Sharma <[email protected]> Authored: Sun May 22 04:26:45 2016 +0000 Committer: Anoop Sharma <[email protected]> Committed: Sun May 22 04:26:45 2016 +0000 ---------------------------------------------------------------------- core/sql/bin/SqlciErrors.txt | 2 + core/sql/cli/SessionDefaults.cpp | 3 + core/sql/comexe/ComTdbHdfsScan.cpp | 33 +++++++++-- core/sql/comexe/ComTdbHdfsScan.h | 26 +++++++-- core/sql/executor/ExHdfsScan.cpp | 76 +++++++++++++++++++++++-- core/sql/executor/ExHdfsScan.h | 13 +++-- core/sql/exp/ExpLOBaccess.cpp | 90 +++++++++++++++++++++++++++--- core/sql/exp/ExpLOBaccess.h | 79 +++++++++++++++----------- core/sql/exp/ExpLOBenums.h | 3 + core/sql/exp/ExpLOBinterface.cpp | 46 ++++++++++++++- core/sql/exp/ExpLOBinterface.h | 13 +++++ core/sql/generator/GenRelScan.cpp | 47 +++++++++------- core/sql/optimizer/HDFSHook.cpp | 5 ++ core/sql/optimizer/HDFSHook.h | 4 ++ core/sql/regress/executor/EXPECTED020 | 4 ++ core/sql/sqlcomp/DefaultConstants.h | 4 ++ core/sql/sqlcomp/nadefaults.cpp | 2 + 17 files changed, 369 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f4728220/core/sql/bin/SqlciErrors.txt ---------------------------------------------------------------------- diff --git a/core/sql/bin/SqlciErrors.txt b/core/sql/bin/SqlciErrors.txt index cf532fe..4aa8cf1 100644 --- a/core/sql/bin/SqlciErrors.txt +++ b/core/sql/bin/SqlciErrors.txt @@ -1550,6 +1550,8 @@ $1~String1 
-------------------------------- 8432 22003 99999 BEGINNER MINOR LOGONLY A negative value cannot be converted to an unsigned numeric datatype.$0~string0 8433 22003 99999 BEGINNER MINOR LOGONLY Invalid $0~string0 character encountered in $1~string1. 8434 ZZZZZ 99999 BEGINNER MAJOR DBADMIN Invalid target column for LOB function. The column needs to be blob/clob type. +8435 ZZZZZ 99999 UUUUUUUU UUUUU UUUUUUU --- unused --- +8436 ZZZZZ 99999 BEGINNER MAJOR DBADMIN Mismatch detected between compiletime and runtime hive table definitions. 8440 ZZZZZ 99999 BEGINNER MAJOR DBADMIN The size of the history buffer is too small to execute one or more of the OLAP Windowed Functions in the query. 8441 ZZZZZ 99999 BEGINNER MAJOR DBADMIN one or more of the OLAP Windowed Functions in the query may require overflow which is not supported yet. 8442 ZZZZZ 99999 BEGINNER MAJOR DBADMIN Unable to access $0~string0 interface. Call to $1~string1 returned error $2~string2($0~int0). Error detail $1~int1. http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f4728220/core/sql/cli/SessionDefaults.cpp ---------------------------------------------------------------------- diff --git a/core/sql/cli/SessionDefaults.cpp b/core/sql/cli/SessionDefaults.cpp index 1ce2145..372943f 100644 --- a/core/sql/cli/SessionDefaults.cpp +++ b/core/sql/cli/SessionDefaults.cpp @@ -740,6 +740,7 @@ static const QueryString cqdInfo[] = {"unique_hash_joins"}, {"OFF"} , {"transform_to_sidetree_insert"}, {"OFF"} , {"METADATA_CACHE_SIZE"}, {"0"} +, {"QUERY_CACHE"}, {"0"} }; static const AQRInfo::AQRErrorMap aqrErrorMap[] = @@ -771,6 +772,8 @@ static const AQRInfo::AQRErrorMap aqrErrorMap[] = // parallel purgedata failed AQREntry( 8022, 0, 3, 60, 0, 0, "", 0, 1), + AQREntry( 8436, 0, 1, 0, 0, 1, "04", 0, 0), + // FS memory errors AQREntry( 8550, 30, 1, 60, 0, 0, "", 0, 0), AQREntry( 8550, 31, 1, 60, 0, 0, "", 0, 0), 
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f4728220/core/sql/comexe/ComTdbHdfsScan.cpp ---------------------------------------------------------------------- diff --git a/core/sql/comexe/ComTdbHdfsScan.cpp b/core/sql/comexe/ComTdbHdfsScan.cpp index 0f42446..a6aac21 100755 --- a/core/sql/comexe/ComTdbHdfsScan.cpp +++ b/core/sql/comexe/ComTdbHdfsScan.cpp @@ -65,9 +65,14 @@ ComTdbHdfsScan::ComTdbHdfsScan( Cardinality estimatedRowCount, Int32 numBuffers, UInt32 bufferSize, - char * errCountTable = NULL, - char * loggingLocation = NULL, - char * errCountId = NULL + char * errCountTable, + char * loggingLocation, + char * errCountId, + + char * hdfsFilesDir, + Int64 modTSforDir, + Lng32 numFilesInDir + ) : ComTdb( ComTdb::ex_HDFS_SCAN, eye_HDFS_SCAN, @@ -107,7 +112,10 @@ ComTdbHdfsScan::ComTdbHdfsScan( flags_(0), errCountTable_(errCountTable), loggingLocation_(loggingLocation), - errCountRowId_(errCountId) + errCountRowId_(errCountId), + hdfsFilesDir_(hdfsFilesDir), + modTSforDir_(modTSforDir), + numFilesInDir_(numFilesInDir) {}; ComTdbHdfsScan::~ComTdbHdfsScan() @@ -142,6 +150,9 @@ Long ComTdbHdfsScan::pack(void * space) errCountTable_.pack(space); loggingLocation_.pack(space); errCountRowId_.pack(space); + + hdfsFilesDir_.pack(space); + return ComTdb::pack(space); } @@ -173,6 +184,9 @@ Lng32 ComTdbHdfsScan::unpack(void * base, void * reallocator) if (errCountTable_.unpack(base)) return -1; if (loggingLocation_.unpack(base)) return -1; if (errCountRowId_.unpack(base)) return -1; + + if (hdfsFilesDir_.unpack(base)) return -1; + return ComTdb::unpack(base, reallocator); } @@ -419,6 +433,17 @@ void ComTdbHdfsScan::displayContents(Space * space,ULng32 flag) sizeof(short)); } } + + if (hdfsFilesDir_) + { + str_sprintf(buf, "hdfsDir: %s", hdfsFilesDir_); + space->allocateAndCopyToAlignedSpace(buf, str_len(buf), sizeof(short)); + + str_sprintf(buf, "modTSforDir_ = %Ld, numFilesInDir_ = %d", + modTSforDir_, numFilesInDir_); + 
space->allocateAndCopyToAlignedSpace(buf, str_len(buf), sizeof(short)); + } + } if(flag & 0x00000001) http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f4728220/core/sql/comexe/ComTdbHdfsScan.h ---------------------------------------------------------------------- diff --git a/core/sql/comexe/ComTdbHdfsScan.h b/core/sql/comexe/ComTdbHdfsScan.h index 0842c19..0b17947 100755 --- a/core/sql/comexe/ComTdbHdfsScan.h +++ b/core/sql/comexe/ComTdbHdfsScan.h @@ -131,8 +131,17 @@ class ComTdbHdfsScan : public ComTdb NABasicPtr loggingLocation_; // 168 - 175 NABasicPtr errCountRowId_; // 176 - 183 UInt32 hiveScanMode_; // 184 - 187 - char fillersComTdbHdfsScan1_[12]; // 188 - 199 + char fillersComTdbHdfsScan1_[4]; // 188 - 191 + + // next 3 params used to check if data under hdfsFileDir + // was modified after query was compiled. + NABasicPtr hdfsFilesDir_; // 192 - 199 + Int64 modTSforDir_; // 200 - 207 + Lng32 numFilesInDir_; // 208 - 211 + + char fillersComTdbHdfsScan2_[12]; // 212 - 223 + public: enum HDFSFileType { @@ -178,10 +187,17 @@ public: queue_index up, Cardinality estimatedRowCount, Int32 numBuffers, - UInt32 bufferSize - , char * errCountTable - , char * loggingLocation - , char * errCountId + UInt32 bufferSize, + + char * errCountTable = NULL, + char * loggingLocation = NULL, + char * errCountId = NULL, + + // next 3 params used to check if data under hdfsFileDir + // was modified after query was compiled. 
+ char * hdfsFilesDir = NULL, + Int64 modTSforDir = -1, + Lng32 numFilesInDir = -1 ); ~ComTdbHdfsScan(); http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f4728220/core/sql/executor/ExHdfsScan.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExHdfsScan.cpp b/core/sql/executor/ExHdfsScan.cpp index 1278c3a..dbb5e7c 100644 --- a/core/sql/executor/ExHdfsScan.cpp +++ b/core/sql/executor/ExHdfsScan.cpp @@ -115,6 +115,7 @@ ExHdfsScanTcb::ExHdfsScanTcb( , numBytesProcessedInRange_(0) , exception_(FALSE) , checkRangeDelimiter_(FALSE) + , dataModCheckDone_(FALSE) { Space * space = (glob ? glob->getSpace() : 0); CollHeap * heap = (glob ? glob->getDefaultHeap() : 0); @@ -394,7 +395,7 @@ ExWorkProcRetcode ExHdfsScanTcb::work() if (hdfsScanTdb().getHdfsFileInfoList()->isEmpty()) { - step_ = DONE; + step_ = CHECK_FOR_DATA_MOD_AND_DONE; break; } @@ -410,16 +411,76 @@ ExWorkProcRetcode ExHdfsScanTcb::work() hdfsScanBufMaxSize_ = hdfsScanTdb().hdfsBufSize_; + dataModCheckDone_ = FALSE; + if (numRanges_ > 0) - step_ = INIT_HDFS_CURSOR; + step_ = CHECK_FOR_DATA_MOD; else - step_ = DONE; + step_ = CHECK_FOR_DATA_MOD_AND_DONE; } break; + case CHECK_FOR_DATA_MOD: + case CHECK_FOR_DATA_MOD_AND_DONE: + { + char * dirPath = hdfsScanTdb().hdfsFilesDir_; + if (! 
dirPath) + dataModCheckDone_ = TRUE; + + if (NOT dataModCheckDone_) + { + Int64 modTS = hdfsScanTdb().modTSforDir_; + Lng32 numFilesInDir = hdfsScanTdb().numFilesInDir_; + + retcode = ExpLOBinterfaceDataModCheck + (lobGlob_, + dirPath, + hdfsScanTdb().hostName_, + hdfsScanTdb().port_, + modTS, + numFilesInDir); + + if (retcode < 0) + { + Lng32 cliError = 0; + + Lng32 intParam1 = -retcode; + ComDiagsArea * diagsArea = NULL; + ExRaiseSqlError(getHeap(), &diagsArea, + (ExeErrorCode)(EXE_ERROR_FROM_LOB_INTERFACE), + NULL, &intParam1, + &cliError, + NULL, + "HDFS", + (char*)"ExpLOBInterfaceDataModCheck", + getLobErrStr(intParam1)); + pentry_down->setDiagsArea(diagsArea); + step_ = HANDLE_ERROR_AND_DONE; + break; + } + + if (retcode == 1) // check failed + { + ComDiagsArea * diagsArea = NULL; + ExRaiseSqlError(getHeap(), &diagsArea, + (ExeErrorCode)(8436)); + pentry_down->setDiagsArea(diagsArea); + step_ = HANDLE_ERROR_AND_DONE; + break; + } + + dataModCheckDone_ = TRUE; + } + + if (step_ == CHECK_FOR_DATA_MOD_AND_DONE) + step_ = DONE; + else + step_ = INIT_HDFS_CURSOR; + } + break; + case INIT_HDFS_CURSOR: { - hdfo_ = (HdfsFileInfo*) hdfsScanTdb().getHdfsFileInfoList()->get(currRangeNum_); if ((hdfo_->getBytesToRead() == 0) && @@ -569,10 +630,11 @@ ExWorkProcRetcode ExHdfsScanTcb::work() trailingPrevRead_ = 0; firstBufOfFile_ = true; numBytesProcessedInRange_ = 0; + step_ = GET_HDFS_DATA; } break; - + case GET_HDFS_DATA: { Int64 bytesToRead = hdfsScanBufMaxSize_ - trailingPrevRead_; @@ -1228,8 +1290,10 @@ ExWorkProcRetcode ExHdfsScanTcb::work() workAtp_->getDiagsArea()->clear(); } break; + case HANDLE_ERROR_WITH_CLOSE: case HANDLE_ERROR: + case HANDLE_ERROR_AND_DONE: { if (qparent_.up->isFull()) return WORK_OK; @@ -1258,6 +1322,8 @@ ExWorkProcRetcode ExHdfsScanTcb::work() if (step_ == HANDLE_ERROR_WITH_CLOSE) step_ = CLOSE_HDFS_CURSOR; + else if (step_ == HANDLE_ERROR_AND_DONE) + step_ = DONE; else step_ = ERROR_CLOSE_FILE; break; 
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f4728220/core/sql/executor/ExHdfsScan.h ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExHdfsScan.h b/core/sql/executor/ExHdfsScan.h index 8cd4690..453596e 100644 --- a/core/sql/executor/ExHdfsScan.h +++ b/core/sql/executor/ExHdfsScan.h @@ -162,18 +162,21 @@ protected: NOT_STARTED , INIT_HDFS_CURSOR , OPEN_HDFS_CURSOR + , CHECK_FOR_DATA_MOD + , CHECK_FOR_DATA_MOD_AND_DONE , GET_HDFS_DATA , CLOSE_HDFS_CURSOR , PROCESS_HDFS_ROW , RETURN_ROW , REPOS_HDFS_DATA - ,CLOSE_FILE - ,ERROR_CLOSE_FILE - ,COLLECT_STATS + , CLOSE_FILE + , ERROR_CLOSE_FILE + , COLLECT_STATS , HANDLE_ERROR - ,HANDLE_EXCEPTION + , HANDLE_EXCEPTION , DONE , HANDLE_ERROR_WITH_CLOSE + , HANDLE_ERROR_AND_DONE } step_,nextStep_; ///////////////////////////////////////////////////// @@ -287,6 +290,8 @@ protected: NABoolean exception_; ComCondition * lastErrorCnd_; NABoolean checkRangeDelimiter_; + + NABoolean dataModCheckDone_; }; class ExOrcScanTcb : public ExHdfsScanTcb http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f4728220/core/sql/exp/ExpLOBaccess.cpp ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpLOBaccess.cpp b/core/sql/exp/ExpLOBaccess.cpp index 929db63..5c1d2fa 100644 --- a/core/sql/exp/ExpLOBaccess.cpp +++ b/core/sql/exp/ExpLOBaccess.cpp @@ -131,11 +131,11 @@ Ex_Lob_Error ExLob::initialize(char *lobFile, Ex_Lob_Mode mode, dir_ = string(dir); } - - snprintf(lobDataFile_, MAX_LOB_FILE_NAME_LEN, "%s/%s", dir_.c_str(), lobFile); + if (lobFile) + snprintf(lobDataFile_, MAX_LOB_FILE_NAME_LEN, "%s/%s", dir_.c_str(), lobFile); } - else + else if (lobFile) { snprintf(lobDataFile_, MAX_LOB_FILE_NAME_LEN, "%s", lobFile); @@ -153,7 +153,8 @@ Ex_Lob_Error ExLob::initialize(char *lobFile, Ex_Lob_Mode mode, hdfsServer_ = hdfsServer; hdfsPort_ = hdfsPort; - lobLocation_ = lobLocation; + if (lobLocation) + lobLocation_ = 
lobLocation; clock_gettime(CLOCK_MONOTONIC, &startTime); if (lobGlobals->getHdfsFs() == NULL) @@ -377,6 +378,62 @@ Ex_Lob_Error ExLob::writeDataSimple(char *data, Int64 size, LobsSubOper subOpera return LOB_OPER_OK; } + +Ex_Lob_Error ExLob::dataModCheck( + char * dirPath, + Int64 inputModTS, + Lng32 inputNumFilesInDir, + Lng32 &numFilesInDir) +{ + // find mod time of dir + hdfsFileInfo *fileInfos = hdfsGetPathInfo(fs_, dirPath); + if (fileInfos == NULL) + { + return LOB_DATA_FILE_NOT_FOUND_ERROR; + } + + Int64 currModTS = fileInfos[0].mLastMod; + hdfsFreeFileInfo(fileInfos, 1); + if ((inputModTS > 0) && + (currModTS > inputModTS)) + return LOB_DATA_MOD_CHECK_ERROR; + + // find number of files in dirPath. + Lng32 currNumFilesInDir = 0; + fileInfos = hdfsListDirectory(fs_, dirPath, &currNumFilesInDir); + if ((currNumFilesInDir > 0) && (fileInfos == NULL)) + { + return LOB_DATA_FILE_NOT_FOUND_ERROR; + } + + NABoolean failed = FALSE; + for (Lng32 i = 0; ((NOT failed) && (i < currNumFilesInDir)); i++) + { + hdfsFileInfo &fileInfo = fileInfos[i]; + if (fileInfo.mKind == kObjectKindDirectory) + { + if (dataModCheck(fileInfo.mName, inputModTS, + inputNumFilesInDir, numFilesInDir) == + LOB_DATA_MOD_CHECK_ERROR) + { + failed = TRUE; + } + } + else if (fileInfo.mKind == kObjectKindFile) + { + numFilesInDir++; + if (numFilesInDir > inputNumFilesInDir) + failed = TRUE; + } + } + + hdfsFreeFileInfo(fileInfos, currNumFilesInDir); + if (failed) + return LOB_DATA_MOD_CHECK_ERROR; + + return LOB_OPER_OK; +} + Ex_Lob_Error ExLob::emptyDirectory() { Ex_Lob_Error err; @@ -2040,8 +2097,8 @@ Ex_Lob_Error ExLobsOper ( LobsStorage storage, // storage type char *source, // source (memory addr, filename, foreign lob etc) Int64 sourceLen, // source len (memory len, foreign desc offset etc) - Int64 cursorBytes, - char *cursorId, + Int64 cursorBytes, + char *cursorId, LobsOper operation, // LOB operation LobsSubOper subOperation, // LOB sub operation Int64 waited, // waited or nowaited @@ 
-2315,7 +2372,6 @@ Ex_Lob_Error ExLobsOper ( lobDebugInfo("purgeLob failed ",err,__LINE__,lobGlobals->lobTrace_); break; - case Lob_Stats: err = lobPtr->readStats(source); lobPtr->initStats(); // because file may remain open across cursors @@ -2323,10 +2379,28 @@ Ex_Lob_Error ExLobsOper ( case Lob_Empty_Directory: lobPtr->initialize(fileName, EX_LOB_RW, - dir, storage, hdfsServer, hdfsPort, dir,bufferSize, replication, blockSize); + dir, storage, hdfsServer, hdfsPort, dir, bufferSize, replication, blockSize); err = lobPtr->emptyDirectory(); break; + case Lob_Data_Mod_Check: + { + lobPtr->initialize(NULL, EX_LOB_RW, + NULL, storage, hdfsServer, hdfsPort, NULL, + bufferSize, replication, blockSize); + + Int64 inputModTS = *(Int64*)blackBox; + Int32 inputNumFilesInDir = + *(Lng32*)&((char*)blackBox)[sizeof(inputModTS)]; + Int32 numFilesInDir = 0; + err = lobPtr->dataModCheck(dir, inputModTS, + inputNumFilesInDir, numFilesInDir); + if ((err == LOB_OPER_OK) && + (numFilesInDir != inputNumFilesInDir)) + err = LOB_DATA_MOD_CHECK_ERROR; + } + break; + case Lob_Cleanup: delete lobGlobals; break; http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f4728220/core/sql/exp/ExpLOBaccess.h ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpLOBaccess.h b/core/sql/exp/ExpLOBaccess.h index 452f769..138116c 100644 --- a/core/sql/exp/ExpLOBaccess.h +++ b/core/sql/exp/ExpLOBaccess.h @@ -436,41 +436,41 @@ class ExLob Ex_Lob_Error insertData(char *data, Int64 size, LobsSubOper so,Int64 headDescNum, Int64 &operLen, Int64 lobMaxSize, Int64 lobMaxChunkMemSize,char *handleIn,Int32 handleInLen, char *blackBox, Int32 blackBoxLen, char * handleOut, Int32 &handleOutLen, void *lobGlobals); Ex_Lob_Error append(char *data, Int64 size, LobsSubOper so, Int64 headDescNum, Int64 &operLen, Int64 lobMaxSize, Int64 lobMaxChunkMemLen,Int64 lobGCLimit, char *handleIn,Int32 handleInLen, char * handleOut, Int32 &handleOutLen, void *lobGlobals); 
Ex_Lob_Error update(char *data, Int64 size, LobsSubOper so,Int64 headDescNum, Int64 &operLen, Int64 lobMaxSize,Int64 lobMaxChunkMemLen,Int64 lobGCLimit,char *handleIn,Int32 handleInLen, char * handleOut, Int32 &handleOutLen, void *lobGlobals); - Ex_Lob_Error readSourceFile(char *srcfile, char *&fileData, Int32 &size, Int64 offset); - Ex_Lob_Error readHdfsSourceFile(char *srcfile, char *&fileData, Int32 &size, Int64 offset); - Ex_Lob_Error readLocalSourceFile(char *srcfile, char *&fileData, Int32 &size, Int64 offset); - Ex_Lob_Error readExternalSourceFile(char *srcfile, char *&fileData, Int32 &size, Int64 offset); - Ex_Lob_Error statSourceFile(char *srcfile, Int64 &sourceEOF); - Ex_Lob_Error delDesc(char *handleIn, Int32 handleInLen, Int64 transId); - Ex_Lob_Error purgeLob(); - Ex_Lob_Error closeFile(); - LobInputOutputFileType fileType(char *ioFileName); - Ex_Lob_Error closeCursor(char *handleIn, Int32 handleInLen); - Ex_Lob_Error closeDataCursorSimple(char *fileName, ExLobGlobals *lobGlobals); - - Ex_Lob_Error doSanityChecks(char *dir, LobsStorage storage, - Int32 handleInLen, Int32 handleOutLen, - Int32 blackBoxLen); + Ex_Lob_Error readSourceFile(char *srcfile, char *&fileData, Int32 &size, Int64 offset); + Ex_Lob_Error readHdfsSourceFile(char *srcfile, char *&fileData, Int32 &size, Int64 offset); + Ex_Lob_Error readLocalSourceFile(char *srcfile, char *&fileData, Int32 &size, Int64 offset); + Ex_Lob_Error readExternalSourceFile(char *srcfile, char *&fileData, Int32 &size, Int64 offset); + Ex_Lob_Error statSourceFile(char *srcfile, Int64 &sourceEOF); + Ex_Lob_Error delDesc(char *handleIn, Int32 handleInLen, Int64 transId); + Ex_Lob_Error purgeLob(); + Ex_Lob_Error closeFile(); + LobInputOutputFileType fileType(char *ioFileName); + Ex_Lob_Error closeCursor(char *handleIn, Int32 handleInLen); + Ex_Lob_Error closeDataCursorSimple(char *fileName, ExLobGlobals *lobGlobals); + + Ex_Lob_Error doSanityChecks(char *dir, LobsStorage storage, + Int32 handleInLen, Int32 
handleOutLen, + Int32 blackBoxLen); Ex_Lob_Error allocateDesc(unsigned int size, Int64 &descNum, Int64 &dataOffset,Int64 lobMaxSize,Int64 lobMaxChunkMemSize, char *handleIn, Int32 handleInLen,Int64 lobGCLimit, void *lobGlobals); - Ex_Lob_Error readStats(char *buffer); - Ex_Lob_Error initStats(); - - Ex_Lob_Error insertDesc(Int64 offset, Int64 size, char *handleIn, Int32 handleInLen, char *handleOut, Int32 &handleOutLen, char *blackBox, Int32 blackBoxLen,void *lobGlobals) ; - - Ex_Lob_Error lockDesc(); - Ex_Lob_Error unlockDesc(); - char *getDataFileName() { return lobDataFile_; } - - int getErrNo(); - + Ex_Lob_Error readStats(char *buffer); + Ex_Lob_Error initStats(); - Ex_Lob_Error getDesc(ExLobDesc &desc,char * handleIn, Int32 handleInLen, char *blackBox, Int32 *blackBoxLen, char * handleOut, Int32 &handleOutLen, Int64 transId); - - Ex_Lob_Error writeData(Int64 offset, char *data, Int32 size, Int64 &operLen); + Ex_Lob_Error insertDesc(Int64 offset, Int64 size, char *handleIn, Int32 handleInLen, char *handleOut, Int32 &handleOutLen, char *blackBox, Int32 blackBoxLen,void *lobGlobals) ; + + Ex_Lob_Error lockDesc(); + Ex_Lob_Error unlockDesc(); + char *getDataFileName() { return lobDataFile_; } + + int getErrNo(); + + + Ex_Lob_Error getDesc(ExLobDesc &desc,char * handleIn, Int32 handleInLen, char *blackBox, Int32 *blackBoxLen, char * handleOut, Int32 &handleOutLen, Int64 transId); + + Ex_Lob_Error writeData(Int64 offset, char *data, Int32 size, Int64 &operLen); Ex_Lob_Error readDataToMem(char *memAddr, Int64 offset, Int64 size, Int64 &operLen,char *handleIn, Int32 handleLenIn, NABoolean multipleChunks, Int64 transId); - + Ex_Lob_Error readDataToLocalFile(char *fileName, Int64 offset, Int64 size,Int64 &operLen,Int64 lobMaxChunkMemLen ,Int32 fileFlags,char *handleIn,Int32 handleInLen, NABoolean multipleChunks,Int64 transId); Ex_Lob_Error readDataToHdfsFile(char *fileName, Int64 offset, Int64 size, Int64 &operLen,Int64 lobMaxChunkMemLen, Int32 fileflags,char 
*handleIn,Int32 handleInLen, NABoolean multipleChunks,Int64 transId); Ex_Lob_Error readDataToExternalFile(char *tgtFileName, Int64 offset, Int64 size, Int64 &operLen, Int64 lobMaxChunkMemLen, Int32 fileflags,char *handleIn,Int32 handleInLen, NABoolean multipleChunks,Int64 transId); @@ -479,9 +479,22 @@ class ExLob Ex_Lob_Error restoreLobDataFile(); Ex_Lob_Error purgeBackupLobDataFile(); - Ex_Lob_Error emptyDirectory(); - ExLobStats *getStats() { return &stats_; } - NAHeap *getLobGlobalHeap() { return lobGlobalHeap_;} + // dirPath: path to needed directory (includes directory name) + // modTS is the latest timestamp on any file/dir under dirPath. + // numFilesInDir is the total number of files under dirPath. + // This method validates that current modTS is not greater than input modTS + // and current number of files in dirPath are the same as input numFilesInDir. + // If either condition is not true, then check fails. + // Return: LOB_OPER_OK, if passes. LOB_DATA_MOD_CHECK_ERROR, if fails. + Ex_Lob_Error dataModCheck( + char * dirPath, + Int64 modTS, + Lng32 inputNumFilesInDir, + Lng32 &numFilesInDir); + + Ex_Lob_Error emptyDirectory(); + ExLobStats *getStats() { return &stats_; } + NAHeap *getLobGlobalHeap() { return lobGlobalHeap_;} ExLobRequest *getRequest() { return &request_; } //The next 2 functions are not active at this point. They serve as an example http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f4728220/core/sql/exp/ExpLOBenums.h ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpLOBenums.h b/core/sql/exp/ExpLOBenums.h index 99e072d..12647be 100644 --- a/core/sql/exp/ExpLOBenums.h +++ b/core/sql/exp/ExpLOBenums.h @@ -92,6 +92,7 @@ typedef enum { LOB_INVALID_ERROR_VAL, LOB_MAX_LIMIT_ERROR = 560, LOB_TARGET_FILE_EXISTS_ERROR, + LOB_DATA_MOD_CHECK_ERROR, LOB_MAX_ERROR_NUM // keep this as the last element in enum list. 
} Ex_Lob_Error; @@ -159,6 +160,7 @@ static const char * const lobErrorEnumStr[] = "LOB_INVALID_ERROR_VAL", "LOB_MAX_LIMIT_ERROR", //560 "LOB_TGT_FILE_EXISTS_ERROR", + "LOB_DATA_MOD_CHECK_ERROR", "LOB_MAX_ERROR_NUM" // keep this as the last element in enum list. }; @@ -236,6 +238,7 @@ typedef enum { Lob_Print, // debugging purposes Lob_Empty_Directory, + Lob_Data_Mod_Check, Lob_Cleanup, // destroy everything under globals Lob_PerformGC, http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f4728220/core/sql/exp/ExpLOBinterface.cpp ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpLOBinterface.cpp b/core/sql/exp/ExpLOBinterface.cpp index 23a2083..a984635 100644 --- a/core/sql/exp/ExpLOBinterface.cpp +++ b/core/sql/exp/ExpLOBinterface.cpp @@ -220,7 +220,6 @@ Lng32 ExpLOBinterfaceCreate( bufferSize , replication, blockSize - ); if (err != LOB_OPER_OK) @@ -229,6 +228,51 @@ Lng32 ExpLOBinterfaceCreate( return 0; } +// Return: 1, if check fails. +// 0, if check passes. +// -LOB_*_ERROR, if error. 
+Lng32 ExpLOBinterfaceDataModCheck(void * lobGlob, + char * dirPath, + char * lobHdfsServer, + Lng32 lobHdfsPort, + Int64 modTS, + Lng32 numFilesInDir) +{ + Ex_Lob_Error err; + + Int64 dummyParam=0; + Int32 dummyParam2 = 0; + Ex_Lob_Error status; + Int64 cliError = -1; + + char dirInfoBuf[100]; + *(Int64*)dirInfoBuf = modTS; + *(Lng32*)&dirInfoBuf[sizeof(modTS)] = numFilesInDir; + Lng32 dirInfoBufLen = sizeof(modTS) + sizeof(numFilesInDir); + err = ExLobsOper((char*)"", + NULL, 0, + lobHdfsServer, lobHdfsPort, + NULL, dummyParam2, 0, dummyParam, + dummyParam, 0, dummyParam, status, cliError, + dirPath, (LobsStorage)Lob_HDFS_File, + NULL, 0, + 0,NULL, + Lob_Data_Mod_Check, + Lob_None, + 1, // waited op + lobGlob, + 0, + dirInfoBuf, dirInfoBufLen + ); + + if (err == LOB_DATA_MOD_CHECK_ERROR) + return 1; + else if (err != LOB_OPER_OK) + return -(short)err; + else + return 0; +} + Lng32 ExpLOBinterfaceEmptyDirectory( void * lobGlob, char * lobName, http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f4728220/core/sql/exp/ExpLOBinterface.h ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpLOBinterface.h b/core/sql/exp/ExpLOBinterface.h index 26f2ad4..a9b7597 100644 --- a/core/sql/exp/ExpLOBinterface.h +++ b/core/sql/exp/ExpLOBinterface.h @@ -291,6 +291,19 @@ Lng32 ExpLOBinterfacePerformGC(void *& lobGlob, char *lobName,void *descChunksAr Lng32 ExpLOBinterfaceRestoreLobDataFile(void *& lobGlob, char *hdfsServer, Int32 hdfsPort,char *lobLoc,char *lobName); Lng32 ExpLOBinterfacePurgeBackupLobDataFile(void *& lobGlob, char *hdfsServer, Int32 hdfsPort,char *lobLoc,char *lobName); +// dirPath: path to needed directory (includes directory name) +// modTS is the latest timestamp on any file/dir under dirPath. +// numFilesInDir is the total number of files under dirPath. 
+// This method validates that current modTS is not greater than input modTS +// and current number of files in dirPath are the same as input numFilesInDir. +// If either condition is not true, then check fails. +// Return: 1, if check fails. 0, if passes. -LOB_*_ERROR, if error. +Lng32 ExpLOBinterfaceDataModCheck(void * lobGlob, + char * dirPath, + char * lobHdfsServer, + Lng32 lobHdfsPort, + Int64 modTS, + Lng32 numFilesInDir); Lng32 ExpLOBinterfaceEmptyDirectory(void * lobGlob, char * lobName, http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f4728220/core/sql/generator/GenRelScan.cpp ---------------------------------------------------------------------- diff --git a/core/sql/generator/GenRelScan.cpp b/core/sql/generator/GenRelScan.cpp index 827ed90..a781815 100644 --- a/core/sql/generator/GenRelScan.cpp +++ b/core/sql/generator/GenRelScan.cpp @@ -360,29 +360,16 @@ short FileScan::genForTextAndSeq(Generator * generator, const NABoolean isSequenceFile = hTabStats->isSequenceFile(); HiveFileIterator hfi; - NABoolean firstFile = TRUE; hdfsPort = 0; hdfsHostName = NULL; - while (firstFile && getHiveSearchKey()->getNextFile(hfi)) - { - const HHDFSFileStats * hFileStats = hfi.getFileStats(); - if (firstFile) - { - // determine connection info (host and port) from the first file - NAString dummy, hostName; - NABoolean result; - result = ((HHDFSTableStats*)hTabStats)->splitLocation - (hFileStats->getFileName().data(), hostName, hdfsPort, dummy) ; - - GenAssert(result, "Invalid Hive directory name"); - - hdfsHostName = - space->AllocateAndCopyToAlignedSpace(hostName, 0); - - firstFile = FALSE; - } - } + // determine host and port from dir name + NAString dummy, hostName; + NABoolean result = ((HHDFSTableStats*)hTabStats)->splitLocation + (hTabStats->tableDir().data(), hostName, hdfsPort, dummy) ; + GenAssert(result, "Invalid Hive directory name"); + hdfsHostName = + space->AllocateAndCopyToAlignedSpace(hostName, 0); hdfsFileInfoList = new(space) Queue(space); 
hdfsFileRangeBeginList = new(space) Queue(space); @@ -1159,6 +1146,22 @@ if (hTabStats->isOrcFile()) char * tablename = space->AllocateAndCopyToAlignedSpace(GenGetQualifiedName(getIndexDesc()->getNAFileSet()->getFileSetName()), 0); + // info needed to validate hdfs file structs + // const HHDFSTableStats* hTabStats = + // getIndexDesc()->getNAFileSet()->getHHDFSTableStats(); + char * hdfsDir = NULL; + Int64 modTS = -1; + Lng32 numFilesInDir = -1; + if (CmpCommon::getDefault(HIVE_DATA_MOD_CHECK) == DF_ON) + { + hdfsDir = + space->allocateAndCopyToAlignedSpace(hTabStats->tableDir().data(), + hTabStats->tableDir().length(), + 0); + modTS = hTabStats->getModificationTS(); + numFilesInDir = hTabStats->getNumFiles(); + } + // create hdfsscan_tdb ComTdbHdfsScan *hdfsscan_tdb = new(space) ComTdbHdfsScan( @@ -1197,7 +1200,9 @@ if (hTabStats->isOrcFile()) buffersize, errCountTab, logLocation, - errCountRowId + errCountRowId, + + hdfsDir, modTS, numFilesInDir ); generator->initTdbFields(hdfsscan_tdb); http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f4728220/core/sql/optimizer/HDFSHook.cpp ---------------------------------------------------------------------- diff --git a/core/sql/optimizer/HDFSHook.cpp b/core/sql/optimizer/HDFSHook.cpp index 7a6e86d..90df234 100644 --- a/core/sql/optimizer/HDFSHook.cpp +++ b/core/sql/optimizer/HDFSHook.cpp @@ -559,6 +559,8 @@ void HHDFSListPartitionStats::populate(hdfsFS fs, } else { + dirInfo_ = *dirInfo; + // list all the files in this directory, they all belong // to this partition and either belong to a specific bucket // or to the default bucket @@ -1008,6 +1010,9 @@ void HHDFSTableStats::processDirectory(const NAString &dir, Int32 numOfBuckets, totalNumPartitions_++; // aggregate stats add(partStats); + + if (partStats->dirInfo()->mLastMod > modificationTS_) + modificationTS_ = partStats->dirInfo()->mLastMod; } } http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f4728220/core/sql/optimizer/HDFSHook.h 
---------------------------------------------------------------------- diff --git a/core/sql/optimizer/HDFSHook.h b/core/sql/optimizer/HDFSHook.h index cbe634c..1ab474c 100644 --- a/core/sql/optimizer/HDFSHook.h +++ b/core/sql/optimizer/HDFSHook.h @@ -223,6 +223,8 @@ public: Int32 getNumOfBuckets() const { return (defaultBucketIdx_ ? defaultBucketIdx_ : 1); } Int32 getLastValidBucketIndx() const { return defaultBucketIdx_; } + const hdfsFileInfo * dirInfo() const {return &dirInfo_; } + void populate(hdfsFS fs, const NAString &dir, Int32 numOfBuckets, HHDFSDiags &diags, NABoolean doEsTimation, char recordTerminator); @@ -246,6 +248,8 @@ private: NABoolean doEstimation_; char recordTerminator_; + hdfsFileInfo dirInfo_; + NAMemory *heap_; }; http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f4728220/core/sql/regress/executor/EXPECTED020 ---------------------------------------------------------------------- diff --git a/core/sql/regress/executor/EXPECTED020 b/core/sql/regress/executor/EXPECTED020 index 5d2f932..5edccba 100644 --- a/core/sql/regress/executor/EXPECTED020 +++ b/core/sql/regress/executor/EXPECTED020 @@ -105,6 +105,7 @@ A B 4023 0 1 0 1 4039 0 1 0 1 4419 0 1 0 4 + 8436 0 1 0 0 8550 30 1 60 0 8550 31 1 60 0 8550 33 1 60 0 @@ -172,6 +173,7 @@ A B 4023 0 1 0 1 4039 0 1 0 1 4419 0 1 0 4 + 8436 0 1 0 0 8550 30 1 60 0 8550 31 1 60 0 8550 33 1 60 0 @@ -238,6 +240,7 @@ A B 4023 0 1 0 1 4039 0 1 0 1 4419 0 1 0 4 + 8436 0 1 0 0 8550 30 1 60 0 8550 31 1 60 0 8550 33 1 60 0 @@ -305,6 +308,7 @@ A B 4023 0 1 0 1 4039 0 1 0 1 4419 0 1 0 4 + 8436 0 1 0 0 8550 30 1 60 0 8550 31 1 60 0 8550 33 1 60 0 http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f4728220/core/sql/sqlcomp/DefaultConstants.h ---------------------------------------------------------------------- diff --git a/core/sql/sqlcomp/DefaultConstants.h b/core/sql/sqlcomp/DefaultConstants.h index e8cf41d..a48f1c8 100644 --- a/core/sql/sqlcomp/DefaultConstants.h +++ 
b/core/sql/sqlcomp/DefaultConstants.h @@ -3817,6 +3817,10 @@ enum DefaultConstants // // 2 : todo HIVE_SCAN_SPECIAL_MODE, + // if set, data modification check is done at runtime before running + // a query. + HIVE_DATA_MOD_CHECK, + // This enum constant must be the LAST one in the list; it's a count, // not an Attribute (it's not IN DefaultDefaults; it's the SIZE of it)! __NUM_DEFAULT_ATTRIBUTES http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f4728220/core/sql/sqlcomp/nadefaults.cpp ---------------------------------------------------------------------- diff --git a/core/sql/sqlcomp/nadefaults.cpp b/core/sql/sqlcomp/nadefaults.cpp index 1eb05cf..8f5acbe 100644 --- a/core/sql/sqlcomp/nadefaults.cpp +++ b/core/sql/sqlcomp/nadefaults.cpp @@ -1957,6 +1957,8 @@ SDDkwd__(EXE_DIAGNOSTIC_EVENTS, "OFF"), DD_____(HIVE_CATALOG, ""), + DDkwd__(HIVE_DATA_MOD_CHECK, "ON"), + DDkwd__(HIVE_DEFAULT_CHARSET, (char *)SQLCHARSETSTRING_UTF8), DD_____(HIVE_DEFAULT_SCHEMA, "HIVE"), DD_____(HIVE_FILE_CHARSET, ""),
