Repository: trafodion Updated Branches: refs/heads/master dc1a619ae -> 4fdc74241
Support to provide a locking mechanism for LOB insert/update operations Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/b041abd9 Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/b041abd9 Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/b041abd9 Branch: refs/heads/master Commit: b041abd941bb1c8aabd78eccbae73695ea1cf8f3 Parents: 2477de3 Author: Sandhya Sundaresan <[email protected]> Authored: Wed May 30 18:55:46 2018 +0000 Committer: Sandhya Sundaresan <[email protected]> Committed: Wed May 30 18:55:46 2018 +0000 ---------------------------------------------------------------------- core/sql/bin/SqlciErrors.txt | 1 + core/sql/cli/Cli.cpp | 15 ++++ core/sql/cli/Cli.h | 8 +- core/sql/cli/CliExtern.cpp | 122 ++++++++++++++++++++++++++++++ core/sql/cli/Context.cpp | 53 +++++++++++++ core/sql/cli/Context.h | 6 +- core/sql/cli/SessionDefaults.cpp | 1 + core/sql/comexe/ComTdbExeUtil.h | 9 ++- core/sql/executor/ExExeUtil.h | 1 + core/sql/executor/ExExeUtilLoad.cpp | 70 +++++++++++++++-- core/sql/exp/ExpErrorEnums.h | 1 + core/sql/exp/ExpLOB.cpp | 113 +++++++++++++++++++++++---- core/sql/exp/ExpLOB.h | 15 +++- core/sql/exp/ExpLOBenums.h | 2 +- core/sql/exp/ExpLOBexternal.h | 4 +- core/sql/generator/GenItemFunc.cpp | 10 ++- core/sql/generator/GenPreCode.cpp | 4 +- core/sql/generator/GenRelExeUtil.cpp | 6 ++ core/sql/parser/sqlparser.y | 4 +- core/sql/runtimestats/SqlStats.cpp | 22 +++++- core/sql/runtimestats/SqlStats.h | 3 + core/sql/runtimestats/rts_msg.cpp | 39 ++++++++++ core/sql/runtimestats/rts_msg.h | 40 +++++++++- core/sql/runtimestats/sscpipc.cpp | 53 +++++++++++++ core/sql/runtimestats/sscpipc.h | 1 + core/sql/runtimestats/ssmpipc.cpp | 52 +++++++++++++ core/sql/runtimestats/ssmpipc.h | 5 +- core/sql/sqlcomp/DefaultConstants.h | 1 + core/sql/sqlcomp/nadefaults.cpp | 5 ++ 29 files changed, 628 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/bin/SqlciErrors.txt ---------------------------------------------------------------------- diff --git a/core/sql/bin/SqlciErrors.txt b/core/sql/bin/SqlciErrors.txt index 8146be4..3050094 100644 --- a/core/sql/bin/SqlciErrors.txt +++ b/core/sql/bin/SqlciErrors.txt @@ -1597,6 +1597,7 @@ $1~String1 -------------------------------- 8555 ZZZZZ 99999 ADVANCED CRTCL DIALOUT An internal error occurred in the SQL executor in the disk process. 8556 ZZZZZ 99999 BEGINNER MAJOR DBADMIN An error occurred while accessing HBase table $0~string0. $1~string1 8557 ZZZZZ 99999 BEGINNER MAJOR DBADMIN The file name passed to externaltolob exceeds 256 bytes. +8558 ZZZZZ 99999 BEGINNER MAJOR DBADMIN The LOB operation is prevented due to conflicting access $0~string0. 8570 ZZZZZ 99999 ADVANCED MAJOR DBADMIN SQL could not allocate sufficient memory to build query. 8571 ZZZZZ 99999 ADVANCED MAJOR DBADMIN SQL could not allocate sufficient memory to execute query. 8572 ZZZZZ 99999 ADVANCED CRTCL DIALOUT The statement has incurred a fatal error and must be deallocated. http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/cli/Cli.cpp ---------------------------------------------------------------------- diff --git a/core/sql/cli/Cli.cpp b/core/sql/cli/Cli.cpp index a06aadb..a942bca 100644 --- a/core/sql/cli/Cli.cpp +++ b/core/sql/cli/Cli.cpp @@ -8088,6 +8088,21 @@ Lng32 SQLCLI_GetSecInvalidKeys(CliGlobals *cliGlobals, return retcode; } +Lng32 SQLCLI_SetLobLock(CliGlobals *cliGlobals, + /* IN */ char *lobLockId + ) +{ + return cliGlobals->currContext()->setLobLock(lobLockId); +} +Lng32 SQLCLI_CheckLobLock(CliGlobals *cliGlobals, + /* IN */ char *lobLockId, + /*OUT */ NABoolean *found + ) +{ + Int32 retcode = 0; + retcode = cliGlobals->currContext()->checkLobLock(lobLockId, found); + return retcode; +} Lng32 SQLCLI_GetStatistics2(CliGlobals *cliGlobals, /* IN */ short statsReqType, /* IN */ char *statsReqStr, http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/cli/Cli.h ---------------------------------------------------------------------- diff --git a/core/sql/cli/Cli.h b/core/sql/cli/Cli.h index bc8ae16..a8a6b9a 100644 --- a/core/sql/cli/Cli.h +++ b/core/sql/cli/Cli.h @@ -763,7 +763,13 @@ Lng32 SQLCLI_GetSecInvalidKeys(CliGlobals *cliGlobals, /* IN/OUT */ Int32 *returnedNumSiKeys, /* IN/OUT */ Int64 *maxTimestamp); - +Lng32 SQLCLI_SetLobLock(CliGlobals *cliGlobals, + /* IN */ char * lobLockId + ); +Lng32 SQLCLI_CheckLobLock(CliGlobals *cliGlobals, + /* IN */ char *lobLockId, + /*OUT */ NABoolean *found + ); Lng32 SQLCLI_GetStatistics2(CliGlobals *cliGlobals, /* IN */ short statsReqType, /* IN */ char *statsReqStr, http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/cli/CliExtern.cpp ---------------------------------------------------------------------- diff --git a/core/sql/cli/CliExtern.cpp b/core/sql/cli/CliExtern.cpp index 38a766c..69e4717 100644 --- a/core/sql/cli/CliExtern.cpp +++ b/core/sql/cli/CliExtern.cpp @@ -87,6 +87,7 @@ #include "Context.h" #include <unistd.h> #include "QRLogger.h" +#include "ExpLOBenums.h" extern char ** environ; @@ -6122,6 +6123,7 @@ Lng32 SQL_EXEC_SetSecInvalidKeys( return retcode; } + Lng32 SQL_EXEC_GetSecInvalidKeys( /* IN */ Int64 prevTimestamp, /* IN/OUT */ SQL_QIKEY siKeys[], @@ -6165,6 +6167,126 @@ Lng32 SQL_EXEC_GetSecInvalidKeys( return retcode; } +Lng32 SQL_EXEC_SetLobLock(/* IN */ char *llid) +{ + + Lng32 retcode = 0; + if (!llid || strlen(llid) == 0 ) + return retcode; + CLISemaphore *tmpSemaphore = NULL; + ContextCli *threadContext; + CLI_NONPRIV_PROLOGUE(retcode); + try + { + tmpSemaphore = getCliSemaphore(threadContext); + tmpSemaphore->get(); + threadContext->incrNumOfCliCalls(); + char llidAdd[LOB_LOCK_ID_SIZE+1]; + // Prepend a '+' to indicate we are setting a new lock in the + // shared segement + llidAdd[0] = '+'; + memcpy(&llidAdd[1],llid,LOB_LOCK_ID_SIZE); + retcode = SQLCLI_SetLobLock(GetCliGlobals(), + (char *)llidAdd); + } + catch(...) + { + retcode = -CLI_INTERNAL_ERROR; +#if defined(_THROW_EXCEPTIONS) + if (cliWillThrow()) + { + threadContext->decrNumOfCliCalls(); + tmpSemaphore->release(); + throw; + } +#endif + } + threadContext->decrNumOfCliCalls(); + tmpSemaphore->release(); + + + RecordError(NULL, retcode); + return retcode; +} + +Lng32 SQL_EXEC_ReleaseLobLock(/* IN */ char *llid) +{ + Lng32 retcode = 0; + if (!llid || strlen(llid) ==0 ) + return retcode; + CLISemaphore *tmpSemaphore = NULL; + ContextCli *threadContext; + CLI_NONPRIV_PROLOGUE(retcode); + try + { + tmpSemaphore = getCliSemaphore(threadContext); + tmpSemaphore->get(); + threadContext->incrNumOfCliCalls(); + char llidDel[LOB_LOCK_ID_SIZE+1]; + // Prepend a '-' to indicate we are removing this lock from the + // shared segement + llidDel[0] = '-'; + memcpy(&llidDel[1],llid,LOB_LOCK_ID_SIZE); + retcode = SQLCLI_SetLobLock(GetCliGlobals(), + (char *)llidDel); + } + catch(...) + { + retcode = -CLI_INTERNAL_ERROR; +#if defined(_THROW_EXCEPTIONS) + if (cliWillThrow()) + { + threadContext->decrNumOfCliCalls(); + tmpSemaphore->release(); + throw; + } +#endif + } + threadContext->decrNumOfCliCalls(); + tmpSemaphore->release(); + + + RecordError(NULL, retcode); + return retcode; +} +Lng32 SQL_EXEC_CheckLobLock(/* IN */ char * llid, /* IN */ NABoolean *found) +{ + Lng32 retcode=0; + if (!llid || (strlen(llid)==0)) + { + *found = FALSE; + return retcode; + } + CLISemaphore *tmpSemaphore = NULL; + ContextCli *threadContext; + CLI_NONPRIV_PROLOGUE(retcode); + try + { + tmpSemaphore = getCliSemaphore(threadContext); + tmpSemaphore->get(); + threadContext->incrNumOfCliCalls(); + retcode = SQLCLI_CheckLobLock(GetCliGlobals(), + llid, found); + } + catch(...) + { + retcode = -CLI_INTERNAL_ERROR; +#if defined(_THROW_EXCEPTIONS) + if (cliWillThrow()) + { + threadContext->decrNumOfCliCalls(); + tmpSemaphore->release(); + throw; + } +#endif + } + threadContext->decrNumOfCliCalls(); + tmpSemaphore->release(); + + + RecordError(NULL, retcode); + return retcode; +} Lng32 SQL_EXEC_GetStatistics2( /* IN */ short statsReqType, /* IN */ char *statsReqStr, http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/cli/Context.cpp ---------------------------------------------------------------------- diff --git a/core/sql/cli/Context.cpp b/core/sql/cli/Context.cpp index 7aee780..ac09a9c 100644 --- a/core/sql/cli/Context.cpp +++ b/core/sql/cli/Context.cpp @@ -3163,6 +3163,59 @@ Lng32 ContextCli::setSecInvalidKeys( } +Int32 ContextCli::checkLobLock(char *inLobLockId, NABoolean *found) +{ + Int32 retcode = 0; + *found = FALSE; + CliGlobals *cliGlobals = getCliGlobals(); + StatsGlobals *statsGlobals = GetCliGlobals()->getStatsGlobals(); + if (cliGlobals->getStatsGlobals() == NULL) + { + (diagsArea_) << DgSqlCode(-EXE_RTS_NOT_STARTED); + return diagsArea_.mainSQLCODE(); + } + statsGlobals->checkLobLock(cliGlobals,inLobLockId); + if (inLobLockId != NULL) + *found = TRUE; + return retcode; +} +Lng32 ContextCli::setLobLock( + /* IN */ char *lobLockId // objID+column number + ) +{ + CliGlobals *cliGlobals = getCliGlobals(); + if (cliGlobals->getStatsGlobals() == NULL) + { + (diagsArea_) << DgSqlCode(-EXE_RTS_NOT_STARTED); + return diagsArea_.mainSQLCODE(); + } + ComDiagsArea *tempDiagsArea = &diagsArea_; + tempDiagsArea->clear(); + + IpcServer *ssmpServer = ssmpManager_->getSsmpServer(exHeap(), + cliGlobals->myNodeName(), + cliGlobals->myCpu(), tempDiagsArea); + if (ssmpServer == NULL) + return diagsArea_.mainSQLCODE(); + + SsmpClientMsgStream *ssmpMsgStream = new (cliGlobals->getIpcHeap()) + SsmpClientMsgStream((NAHeap *)cliGlobals->getIpcHeap(), + ssmpManager_, tempDiagsArea); + ssmpMsgStream->addRecipient(ssmpServer->getControlConnection()); + LobLockRequest *llMsg = + new (cliGlobals->getIpcHeap()) LobLockRequest( + cliGlobals->getIpcHeap(), + lobLockId); + *ssmpMsgStream << *llMsg; + // Call send with no timeout. + ssmpMsgStream->send(); + // I/O is now complete. + llMsg->decrRefCount(); + cliGlobals->getEnvironment()->deleteCompletedMessages(); + ssmpManager_->cleanupDeletedSsmpServers(); + return diagsArea_.mainSQLCODE(); + +} ExStatisticsArea *ContextCli::getMergedStats( /* IN */ short statsReqType, /* IN */ char *statsReqStr, http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/cli/Context.h ---------------------------------------------------------------------- diff --git a/core/sql/cli/Context.h b/core/sql/cli/Context.h index d64ac09..c000714 100644 --- a/core/sql/cli/Context.h +++ b/core/sql/cli/Context.h @@ -1006,7 +1006,11 @@ public: Lng32 setSecInvalidKeys( /* IN */ Int32 numSiKeys, /* IN */ SQL_QIKEY siKeys[]); - + Int32 checkLobLock(char* inLobLockId, NABoolean *found); + + Lng32 setLobLock( + /* IN */ char *lobLockId// objID+column number + ); Lng32 holdAndSetCQD(const char * defaultName, const char * defaultValue); Lng32 restoreCQD(const char * defaultName); http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/cli/SessionDefaults.cpp ---------------------------------------------------------------------- diff --git a/core/sql/cli/SessionDefaults.cpp b/core/sql/cli/SessionDefaults.cpp index 966ce77..1e656bd 100644 --- a/core/sql/cli/SessionDefaults.cpp +++ b/core/sql/cli/SessionDefaults.cpp @@ -792,6 +792,7 @@ static const AQRInfo::AQRErrorMap aqrErrorMap[] = // locked row timeout AQREntry( 8550, 73, 2, 0, 0, 0, "", 0, 0), AQREntry( 8550, 78, 1, 60, 0, 0, "", 0, 0), + AQREntry( 8558, 0 , 2, 30, 0, 0, "", 0, 0), AQREntry( 8551, 12, 1, 60, 0, 0, "", 0, 0), http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/comexe/ComTdbExeUtil.h ---------------------------------------------------------------------- diff --git a/core/sql/comexe/ComTdbExeUtil.h b/core/sql/comexe/ComTdbExeUtil.h index 4716460..5ae7084 100644 --- a/core/sql/comexe/ComTdbExeUtil.h +++ b/core/sql/comexe/ComTdbExeUtil.h @@ -2921,6 +2921,7 @@ private: APPEND_OR_CREATE = 0x0080, RETRIEVE_HDFSFILENAME= 0x0100, RETRIEVE_OFFSET=0x0200 + }; @@ -3023,6 +3024,11 @@ public: void setReplace(NABoolean v) {(v ? flags_ |= REPLACE_ : flags_ &= ~REPLACE_); }; NABoolean isReplace() { return (flags_ & REPLACE_) != 0; }; + + void setLobLocking(NABoolean v) + {(v ? flags_ |= LOB_LOCKING_ : flags_ &= ~LOB_LOCKING_); }; + NABoolean lobLocking() { return (flags_ & LOB_LOCKING_) != 0; }; + void setUpdateSize(Int64 upd_size){ updateSize_ = upd_size;}; Int64 updateSize() { return updateSize_;} void setTotalBufSize(Int64 bufSize) { totalBufSize_ = bufSize;}; @@ -3041,7 +3047,8 @@ private: ERROR_IF_EXISTS_ = 0x0001, TRUNCATE_ = 0x0002, APPEND_ = 0x0004, - REPLACE_=0x0008 + REPLACE_=0x0008, + LOB_LOCKING_=0x0010 }; NABasicPtr handle_; Int32 handleLen_; http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/executor/ExExeUtil.h ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExExeUtil.h b/core/sql/executor/ExExeUtil.h index a3f197d..aec3ccb 100644 --- a/core/sql/executor/ExExeUtil.h +++ b/core/sql/executor/ExExeUtil.h @@ -3083,6 +3083,7 @@ public: ExLobStats lobStats_; char statusString_[200]; fstream indata_; + char lobLockId_[12]; ExLobGlobals *exLobGlobals_; }; // ----------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/executor/ExExeUtilLoad.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExExeUtilLoad.cpp b/core/sql/executor/ExExeUtilLoad.cpp index bcd52f2..e562430 100644 --- a/core/sql/executor/ExExeUtilLoad.cpp +++ b/core/sql/executor/ExExeUtilLoad.cpp @@ -3325,7 +3325,7 @@ ExExeUtilLobUpdateTcb::ExExeUtilLobUpdateTcb lobHandleLen_ = 2050; lobHandle_[0] = '\0'; exLobGlobals_=NULL; - + memset(lobLockId_,'\0',LOB_LOCK_ID_SIZE); ExpLOBinterfaceInit(exLobGlobals_,currContext->exHeap(),currContext,TRUE, lobTdb().getLobHdfsServer(), lobTdb().getLobHdfsPort()); @@ -3349,7 +3349,6 @@ short ExExeUtilLobUpdateTcb::work() { Lng32 cliRC = 0; Lng32 retcode = 0; - // if no parent request, return if (qparent_.down->isEmpty()) return WORK_OK; @@ -3459,6 +3458,25 @@ short ExExeUtilLobUpdateTcb::work() char outLobHandle[LOB_HANDLE_LEN]; Int32 outHandleLen; Int64 requestTag = 0; + if (lobTdb().lobLocking()) + { + ExpLOBoper::genLobLockId(uid,lobNum,lobLockId_); + NABoolean found = FALSE; + retcode = SQL_EXEC_CheckLobLock(lobLockId_ , &found); + if (! retcode && !found) + { + retcode = SQL_EXEC_SetLobLock(lobLockId_); + } + else if (found) + { + memset(lobLockId_,'\0',LOB_LOCK_ID_SIZE); + ExRaiseSqlError(getHeap(), &diagsArea_, + (ExeErrorCode)(EXE_LOB_CONCURRENT_ACCESS_ERROR)); + + step_=HANDLE_ERROR_; + break; + } + } retcode = ExpLOBInterfaceUpdate(lobGlobs, lobTdb().getLobHdfsServer(), lobTdb().getLobHdfsPort(), @@ -3521,6 +3539,24 @@ short ExExeUtilLobUpdateTcb::work() lobDataLen_ = lobTdb().totalBufSize_; strcpy(lobLoc_, lobTdb().getLobLocation()); + if (lobTdb().lobLocking()) + { + ExpLOBoper::genLobLockId(uid,lobNum,lobLockId_);; + NABoolean found = FALSE; + retcode = SQL_EXEC_CheckLobLock(lobLockId_, &found); + if (! retcode && !found) + { + retcode = SQL_EXEC_SetLobLock(lobLockId_); + } + else if (found) + { + memset(lobLockId_,'\0',LOB_LOCK_ID_SIZE); + ExRaiseSqlError(getHeap(), &diagsArea_, + (ExeErrorCode)(EXE_LOB_CONCURRENT_ACCESS_ERROR)); + step_=HANDLE_ERROR_; + break; + } + } char outLobHandle[LOB_HANDLE_LEN]; Int32 outHandleLen; Int64 requestTag = 0; @@ -3586,8 +3622,25 @@ short ExExeUtilLobUpdateTcb::work() lobDataLen_ = lobTdb().totalBufSize_; strcpy(lobLoc_, lobTdb().getLobLocation()); - - + + if (lobTdb().lobLocking()) + { + ExpLOBoper::genLobLockId(uid,lobNum,lobLockId_);; + NABoolean found = FALSE; + retcode = SQL_EXEC_CheckLobLock(lobLockId_, &found); + if (! retcode && !found) + { + retcode = SQL_EXEC_SetLobLock(lobLockId_); + } + else if (found) + { + memset(lobLockId_,'\0',LOB_LOCK_ID_SIZE); + ExRaiseSqlError(getHeap(), &diagsArea_, + (ExeErrorCode)(EXE_LOB_CONCURRENT_ACCESS_ERROR)); + step_=HANDLE_ERROR_; + break; + } + } char outLobHandle[LOB_HANDLE_LEN]; Int32 outHandleLen; Int64 requestTag = 0; @@ -3653,8 +3706,13 @@ short ExExeUtilLobUpdateTcb::work() case HANDLE_ERROR_: { retcode = handleError(); + if (retcode == 1) - return WORK_OK; + { + if (lobLockId_[0] && lobTdb().lobLocking()) + retcode = SQL_EXEC_ReleaseLobLock(lobLockId_); + return WORK_OK; + } step_ = DONE_; } @@ -3662,6 +3720,8 @@ short ExExeUtilLobUpdateTcb::work() case DONE_: { retcode = handleDone(); + if(lobLockId_[0] && lobTdb().lobLocking()) + retcode = SQL_EXEC_ReleaseLobLock(lobLockId_); if (retcode == 1) return WORK_OK; http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/exp/ExpErrorEnums.h ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpErrorEnums.h b/core/sql/exp/ExpErrorEnums.h index 8227cb8..2068c75 100644 --- a/core/sql/exp/ExpErrorEnums.h +++ b/core/sql/exp/ExpErrorEnums.h @@ -175,6 +175,7 @@ enum ExeErrorCode EXE_ERROR_STREAM_OVERFLOW = 8553, EXE_EID_INTERNAL_ERROR = 8555, EXE_HBASE_ACCESS_ERROR = 8556, + EXE_LOB_CONCURRENT_ACCESS_ERROR = 8558, EXE_LAST_ERROR_FROM_FS_DP2 = 8569, // --------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/exp/ExpLOB.cpp ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpLOB.cpp b/core/sql/exp/ExpLOB.cpp index 5d3bf05..2ef8ae4 100644 --- a/core/sql/exp/ExpLOB.cpp +++ b/core/sql/exp/ExpLOB.cpp @@ -278,7 +278,6 @@ Lng32 ExpLOBoper::dropLOB(ExLobGlobals * exLobGlob, ContextCli *currContext, Lng32 rc = 0; - // Call ExeLOBinterface to create the LOB // Call ExeLOBinterface to drop the LOB rc = ExpLOBinterfaceDrop(exLobGlob,hdfsServer, hdfsPort, lobName, lobLoc); @@ -442,6 +441,19 @@ Lng32 ExpLOBoper::extractFromLOBhandle(Int16 *flags, return 0; } +// 12 byte lock identifier uniquely identifies the LOB file that is being +// locked. +// <object UID + lob number > Each LOB column has a unique lob number and +// each column has a unique data file. +void ExpLOBoper::genLobLockId(Int64 objid, Int32 lobNum, char *llid) +{ + memset(llid,'\0',LOB_LOCK_ID_SIZE); + if (objid != -1 && lobNum != -1) + { + memcpy(llid,&objid,sizeof(Int64)) ; + memcpy(&(llid[sizeof(Int64)]),&lobNum,sizeof(Int32)); + } +} // creates LOB handle in string format. void ExpLOBoper::createLOBhandleString(Int16 flags, @@ -664,6 +676,7 @@ void ExpLOBinsert::displayContents(Space * space, const char * displayStr, space->allocateAndCopyToAlignedSpace(buf, str_len(buf), sizeof(short)); } + ex_expr::exp_return_type ExpLOBiud::insertDesc(char *op_data[], CollHeap*h, ComDiagsArea** diagsArea) @@ -1010,18 +1023,45 @@ ex_expr::exp_return_type ExpLOBinsert::eval(char *op_data[], { ex_expr::exp_return_type err; - + Int32 retcode = 0; + char llid[LOB_LOCK_ID_SIZE]; + if (lobLocking()) + { + ExpLOBoper::genLobLockId(objectUID_,lobNum(),llid); + NABoolean found = FALSE; + retcode = SQL_EXEC_CheckLobLock(llid, &found); + if (! retcode && !found) + { + retcode = SQL_EXEC_SetLobLock(llid); + } + else + { + ExRaiseSqlError(h, diagsArea, + (ExeErrorCode)(EXE_LOB_CONCURRENT_ACCESS_ERROR)); + + return ex_expr::EXPR_ERROR; + } + } err = insertDesc(op_data, h, diagsArea); if (err == ex_expr::EXPR_ERROR) - return err; + { + if (lobLocking()) + retcode = SQL_EXEC_ReleaseLobLock(llid); + return err; + } if(fromEmpty()) - return err; + { + if (lobLocking()) + retcode = SQL_EXEC_ReleaseLobLock(llid); + return err; + } char * handle = op_data[0]; Lng32 handleLen = getOperand(0)->getLength(); err = insertData(handleLen, handle, op_data, h, diagsArea); - + if (lobLocking()) + retcode = SQL_EXEC_ReleaseLobLock(llid); return err; } @@ -1183,7 +1223,7 @@ ex_expr::exp_return_type ExpLOBupdate::eval(char *op_data[], CollHeap*h, ComDiagsArea** diagsArea) { - Lng32 rc; + Lng32 rc, retcode = 0; Lng32 lobOperStatus = checkLobOperStatus(); if (lobOperStatus == DO_NOTHING_) @@ -1230,6 +1270,24 @@ ex_expr::exp_return_type ExpLOBupdate::eval(char *op_data[], lobHandle); //op_data[2]); if (sDescSyskey == -1) //updating empty lob { + + char llid[LOB_LOCK_ID_SIZE]; + if (lobLocking()) + { + ExpLOBoper::genLobLockId(objectUID_,lobNum(),llid);; + NABoolean found = FALSE; + retcode = SQL_EXEC_CheckLobLock(llid, &found); + if (! retcode && !found) + { + retcode = SQL_EXEC_SetLobLock(llid); + } + else if (found) + { + ExRaiseSqlError(h, diagsArea, + (ExeErrorCode)(EXE_LOB_CONCURRENT_ACCESS_ERROR)); + return ex_expr::EXPR_ERROR; + } + } ex_expr::exp_return_type err = insertDesc(op_data, h, diagsArea); if (err == ex_expr::EXPR_ERROR) return err; @@ -1237,7 +1295,8 @@ ex_expr::exp_return_type ExpLOBupdate::eval(char *op_data[], char * handle = op_data[0]; handleLen = getOperand(0)->getLength(); err = insertData(handleLen, handle, op_data, h, diagsArea); - + if (lobLocking()) + retcode = SQL_EXEC_ReleaseLobLock(llid); return err; } @@ -1320,6 +1379,27 @@ ex_expr::exp_return_type ExpLOBupdate::eval(char *op_data[], lobLen = 0; so = Lob_Memory; } + + char llid[LOB_LOCK_ID_SIZE]; + if (lobLocking()) + { + ExpLOBoper::genLobLockId(objectUID_,lobNum(),llid);; + NABoolean found = FALSE; + retcode = SQL_EXEC_CheckLobLock(llid, &found); + if (! retcode && !found) + { + retcode = SQL_EXEC_SetLobLock(llid); + } + else if (found) + { + Int32 lobError = LOB_DATA_FILE_LOCK_ERROR; + ExRaiseSqlError(h, diagsArea, + (ExeErrorCode)(8558), NULL,(Int32 *)&lobError, + NULL, NULL, (char*)"ExpLOBInterfaceInsert", + getLobErrStr(LOB_DATA_FILE_LOCK_ERROR),NULL); + return ex_expr::EXPR_ERROR; + } + } if (isAppend() && !fromEmpty()) { rc = ExpLOBInterfaceUpdateAppend @@ -1366,8 +1446,9 @@ ex_expr::exp_return_type ExpLOBupdate::eval(char *op_data[], fromDescKey, fromDescTS, lobMaxSize, getLobMaxChunkMemSize(),getLobGCLimit()); } - - if (rc < 0) + if (lobLocking()) + retcode = SQL_EXEC_ReleaseLobLock(llid); + if (rc < 0) { Lng32 intParam1 = -rc; ExRaiseSqlError(h, diagsArea, @@ -1375,15 +1456,15 @@ ex_expr::exp_return_type ExpLOBupdate::eval(char *op_data[], &cliError, NULL, (char*)"ExpLOBInterfaceUpdate", (char*)"ExpLOBInterfaceUpdate",getLobErrStr(intParam1)); return ex_expr::EXPR_ERROR; - } + } - // update lob handle with the returned values - str_cpy_all(result, lobHandle, handleLen); - // str_cpy_all(result, op_data[2], handleLen); - // ExpLOBoper::updLOBhandle(sDescSyskey, 0, result); - getOperand(0)->setVarLength(handleLen, op_data[-MAX_OPERANDS]); + // update lob handle with the returned values + str_cpy_all(result, lobHandle, handleLen); + // str_cpy_all(result, op_data[2], handleLen); + // ExpLOBoper::updLOBhandle(sDescSyskey, 0, result); + getOperand(0)->setVarLength(handleLen, op_data[-MAX_OPERANDS]); - return ex_expr::EXPR_OK; + return ex_expr::EXPR_OK; } http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/exp/ExpLOB.h ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpLOB.h b/core/sql/exp/ExpLOB.h index 9c39876..231fa20 100644 --- a/core/sql/exp/ExpLOB.h +++ b/core/sql/exp/ExpLOB.h @@ -137,6 +137,7 @@ public: Int64 uid, Lng32 lobNum); static Lng32 initLOBglobal(ExLobGlobals *& lobGlob, NAHeap *heap, ContextCli *currContext,char *server, Int32 port ); + static void genLobLockId(Int64 objUid,Int32 lobNum, char *llid); // Extracts values from the LOB handle stored at ptr static Lng32 extractFromLOBhandle(Int16 *flags, @@ -392,7 +393,16 @@ class ExpLOBiud : public ExpLOBoper { { (v) ? liudFlags_ |= FROM_EXTERNAL: liudFlags_ &= ~FROM_EXTERNAL; }; - + + NABoolean lobLocking() + { + return ((liudFlags_ & LOB_LOCKING) != 0); + }; + + inline void setLobLocking(NABoolean v) + { + (v) ? liudFlags_ |= LOB_LOCKING: liudFlags_ &= ~LOB_LOCKING; + }; protected: Int64 objectUID_; @@ -407,7 +417,8 @@ class ExpLOBiud : public ExpLOBoper { FROM_EXTERNAL = 0x0020, FROM_BUFFER = 0x0040, FROM_EMPTY = 0x0080, - FROM_LOB_EXTERNAL = 0x0100 + FROM_LOB_EXTERNAL = 0x0100, + LOB_LOCKING = 0x0200 }; Lng32 liudFlags_; http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/exp/ExpLOBenums.h ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpLOBenums.h b/core/sql/exp/ExpLOBenums.h index 9c14daf..9729cbc 100644 --- a/core/sql/exp/ExpLOBenums.h +++ b/core/sql/exp/ExpLOBenums.h @@ -32,7 +32,7 @@ #define MAX_BLACK_BOX_LEN 2048 #define LOB_DESC_HEADER_KEY 1 #define NUM_WORKER_THREADS 2 - +#define LOB_LOCK_ID_SIZE 12 // 2 threads at most, one to read and the other to pick up next read from preOpen http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/exp/ExpLOBexternal.h ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpLOBexternal.h b/core/sql/exp/ExpLOBexternal.h index 657ef12..201036a 100644 --- a/core/sql/exp/ExpLOBexternal.h +++ b/core/sql/exp/ExpLOBexternal.h @@ -171,7 +171,9 @@ Lng32 SQL_EXEC_LOBddlInterface /*IN*/ Int64 lobMaxSize, /*IN*/ NABoolean lobTrace ); - +Lng32 SQL_EXEC_SetLobLock(/* IN */ char *llid); +Lng32 SQL_EXEC_ReleaseLobLock(/* IN */ char *llid); +Lng32 SQL_EXEC_CheckLobLock(/* IN */ char *llid, /* IN */ Int32 *found); /*************************************************************************** Called by loader to load or extract buffers of data. http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/generator/GenItemFunc.cpp ---------------------------------------------------------------------- diff --git a/core/sql/generator/GenItemFunc.cpp b/core/sql/generator/GenItemFunc.cpp index 6121110..60a8f19 100644 --- a/core/sql/generator/GenItemFunc.cpp +++ b/core/sql/generator/GenItemFunc.cpp @@ -2673,6 +2673,11 @@ short LOBinsert::codeGen(Generator * generator) else if(obj_ == LOBoper::EMPTY_LOB_) li->setFromEmpty(TRUE); + if (CmpCommon::getDefault(LOB_LOCKING) == DF_ON) + li->setLobLocking(TRUE); + else + li->setLobLocking(FALSE); + li->lobNum() = lobNum(); li->setLobStorageType(lobStorageType()); li->setLobStorageLocation((char*)lobStorageLocation().data()); @@ -2748,7 +2753,10 @@ short LOBupdate::codeGen(Generator * generator) lu->setFromBuffer(TRUE); else if(obj_ == LOBoper::EMPTY_LOB_) lu->setFromEmpty(TRUE); - + if (CmpCommon::getDefault(LOB_LOCKING) == DF_ON) + lu->setLobLocking(TRUE); + else + lu->setLobLocking(FALSE); lu->lobNum() = lobNum(); lu->setLobStorageType(lobStorageType()); lu->setLobStorageLocation((char*)lobStorageLocation().data()); http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/generator/GenPreCode.cpp ---------------------------------------------------------------------- diff --git a/core/sql/generator/GenPreCode.cpp b/core/sql/generator/GenPreCode.cpp index 9832c44..4d65701 100644 --- a/core/sql/generator/GenPreCode.cpp +++ b/core/sql/generator/GenPreCode.cpp @@ -4520,7 +4520,9 @@ RelExpr * GenericUpdate::preCodeGen(Generator * generator, { oltOptInfo().setOltOpt(FALSE); generator->oltOptInfo()->setOltOpt(FALSE); - generator->setAqrEnabled(FALSE); + //enabling AQR to take care of the lock conflict error 8558 that + // should be retried. + // generator->setAqrEnabled(FALSE); generator->setUpdAbortOnError(TRUE); generator->setUpdSavepointOnError(FALSE); } http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/generator/GenRelExeUtil.cpp ---------------------------------------------------------------------- diff --git a/core/sql/generator/GenRelExeUtil.cpp b/core/sql/generator/GenRelExeUtil.cpp index 570c3e5..013b48a 100644 --- a/core/sql/generator/GenRelExeUtil.cpp +++ b/core/sql/generator/GenRelExeUtil.cpp @@ -4336,6 +4336,8 @@ short ExeUtilLobUpdate::codeGen(Generator * generator) const char* f = ActiveSchemaDB()->getDefaults(). getValue(LOB_STORAGE_FILE_DIR); + + char *lobLoc = space->allocateAlignedSpace(strlen(f) + 1); strcpy(lobLoc, f); ComTdbExeUtilLobUpdate * exe_util_lobupdate_tdb = new(space) @@ -4383,6 +4385,10 @@ short ExeUtilLobUpdate::codeGen(Generator * generator) exe_util_lobupdate_tdb->setAppend(TRUE); else exe_util_lobupdate_tdb->setAppend(FALSE); + if((ActiveSchemaDB()->getDefaults()).getToken(LOB_LOCKING) == DF_ON) + exe_util_lobupdate_tdb->setLobLocking(TRUE); + else + exe_util_lobupdate_tdb->setLobLocking(FALSE); generator->initTdbFields(exe_util_lobupdate_tdb); http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/parser/sqlparser.y ---------------------------------------------------------------------- diff --git a/core/sql/parser/sqlparser.y b/core/sql/parser/sqlparser.y index b84546d..0f52e33 100755 --- a/core/sql/parser/sqlparser.y +++ b/core/sql/parser/sqlparser.y @@ -11728,7 +11728,7 @@ blob_optional_left_len_right: '(' NUMERIC_LITERAL_EXACT_NO_SCALE optional_lob_un if (CmpCommon::getDefault(TRAF_BLOB_AS_VARCHAR) == DF_ON) { - $$ = (Int64)CmpCommon::getDefault(TRAF_MAX_CHARACTER_COL_LENGTH ); + $$ = (Int64)CmpCommon::getDefaultNumeric(TRAF_MAX_CHARACTER_COL_LENGTH ); } else { @@ -11764,7 +11764,7 @@ clob_optional_left_len_right: '(' NUMERIC_LITERAL_EXACT_NO_SCALE optional_lob_un if (CmpCommon::getDefault(TRAF_CLOB_AS_VARCHAR) == DF_ON) { - $$ = (Int64)CmpCommon::getDefault(TRAF_MAX_CHARACTER_COL_LENGTH ); + $$ = (Int64)CmpCommon::getDefaultNumeric(TRAF_MAX_CHARACTER_COL_LENGTH ); } else { http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/runtimestats/SqlStats.cpp ---------------------------------------------------------------------- diff --git a/core/sql/runtimestats/SqlStats.cpp b/core/sql/runtimestats/SqlStats.cpp index 0b64e02..aec2999 100644 --- a/core/sql/runtimestats/SqlStats.cpp +++ b/core/sql/runtimestats/SqlStats.cpp @@ -73,6 +73,7 @@ StatsGlobals::StatsGlobals(void *baseAddr, short envType, Lng32 maxSegSize) , maxPid_(0) , pidToCheck_(0) , ssmpDumpedTimestamp_(0) + , lobLocks_(NULL) { statsHeap_.setSharedMemory(); //Phandle wrapper in porting layer @@ -112,6 +113,7 @@ void StatsGlobals::init() stmtStatsList_ = new (&statsHeap_) SyncHashQueue(&statsHeap_, 512); rmsStats_ = new (&statsHeap_) ExRMSStats(&statsHeap_); recentSikeys_ = new (&statsHeap_) SyncHashQueue(&statsHeap_, 512); + lobLocks_ = new (&statsHeap_) SyncHashQueue(&statsHeap_, 512); rmsStats_->setCpu(cpu_); rmsStats_->setRmsVersion(version_); rmsStats_->setRmsEnvType(rtsEnvType_); @@ -1082,7 +1084,25 @@ Lng32 StatsGlobals::updateStats(ComDiagsArea &diags, SQLQUERY_ID *query_id, void diags << DgSqlCode(-CLI_INTERNAL_ERROR); return retcode; } - +Int32 StatsGlobals::checkLobLock(CliGlobals *cliGlobals, char *&lobLockId) +{ + int error = getStatsSemaphore(cliGlobals->getSemId(), cliGlobals->myPin()); + if ((lobLocks_ ==NULL) || lobLocks_->isEmpty()) + { + lobLockId = NULL; + releaseStatsSemaphore(cliGlobals->getSemId(), cliGlobals->myPin()); + return 0; + } + lobLocks_->position(lobLockId,LOB_LOCK_ID_SIZE); + //Look in the current chain for a match + while (lobLocks_->getCurr() != NULL && memcmp(lobLockId, (char *)(lobLocks_->getCurr()),LOB_LOCK_ID_SIZE) !=0 ) + lobLocks_->getNext(); + if (lobLocks_->getCurr() == NULL) + lobLockId = NULL; + + releaseStatsSemaphore(cliGlobals->getSemId(), cliGlobals->myPin()); + return 0; +} Lng32 StatsGlobals::getSecInvalidKeys( CliGlobals * cliGlobals, Int64 lastCallTimestamp, http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/runtimestats/SqlStats.h ---------------------------------------------------------------------- diff --git a/core/sql/runtimestats/SqlStats.h b/core/sql/runtimestats/SqlStats.h index 0377c3d..5e23b0c 100644 --- a/core/sql/runtimestats/SqlStats.h +++ b/core/sql/runtimestats/SqlStats.h @@ -473,6 +473,7 @@ public: SQL_QIKEY [], Int32 maxNumSiKeys, Int32 *returnedNumSiKeys); + Int32 checkLobLock(CliGlobals *cliGlobals,char *&lobLockId); void mergeNewSikeys(Int32 numSikeys, SQL_QIKEY sikeys[]); @@ -489,6 +490,7 @@ public: SB_Phandle_Type *getSsmpProcHandle() { return &ssmpProcHandle_; } SB_Phandle_Type *getSscpProcHandle() { return &sscpProcHandle_; } SyncHashQueue *getRecentSikeys() { return recentSikeys_; } + SyncHashQueue *getLobLocks() { return lobLocks_;} void setSsmpProcSemId(Long semId) { ssmpProcSemId_ = semId; } Long &getSsmpProcSemId() { return ssmpProcSemId_; } void setSscpProcSemId(Long semId) { sscpProcSemId_ = semId; } @@ -531,6 +533,7 @@ private: pid_t maxPid_; Int64 ssmpDumpedTimestamp_; MemoryMonitor *memMonitor_; + SyncHashQueue *lobLocks_; }; StatsGlobals * shareStatsSegment(Int32 &shmid, NABoolean checkForSSMP = TRUE); short getMasterCpu(char *uniqueStmtId, Lng32 uniqueStmtIdLen, char *nodeName, short maxLen, short &cpu); http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/runtimestats/rts_msg.cpp ---------------------------------------------------------------------- diff --git a/core/sql/runtimestats/rts_msg.cpp b/core/sql/runtimestats/rts_msg.cpp index 69146cf..65b8497 100644 --- a/core/sql/runtimestats/rts_msg.cpp +++ b/core/sql/runtimestats/rts_msg.cpp @@ -883,3 +883,42 @@ void SecInvalidKeyRequest::unpackObj(IpcMessageObjType objType, sikPtr_ = NULL; } +LobLockRequest::LobLockRequest(NAMemory *heap, + char *lobLockId + ) : + RtsMessageObj(LOB_LOCK_REQ, + CurrLobLockVersionNumber, heap) +{ + memcpy(lobLockId_,lobLockId,LOB_LOCK_ID_SIZE+1); +} + +LobLockRequest::~LobLockRequest() +{ + memset(lobLockId_,0,LOB_LOCK_ID_SIZE+1); +} + +IpcMessageObjSize LobLockRequest::packedLength() +{ + IpcMessageObjSize result = baseClassPackedLength(); + result += sizeof(lobLockId_); + return result; +} + +IpcMessageObjSize LobLockRequest::packObjIntoMessage( + IpcMessageBufferPtr buffer) +{ + IpcMessageObjSize result = packBaseClassIntoMessage(buffer); + result += packStrIntoBuffer(buffer, lobLockId_,LOB_LOCK_ID_SIZE+1); + return result; +} + +void LobLockRequest::unpackObj(IpcMessageObjType objType, + IpcMessageObjVersion objVersion, + NABoolean sameEndianness, + IpcMessageObjSize objSize, + IpcConstMessageBufferPtr buffer) +{ + unpackBaseClass(buffer); + unpackStrFromBuffer(buffer,lobLockId_,LOB_LOCK_ID_SIZE+1); +} + http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/runtimestats/rts_msg.h ---------------------------------------------------------------------- diff --git a/core/sql/runtimestats/rts_msg.h b/core/sql/runtimestats/rts_msg.h index 7f833c0..af11074 100644 --- a/core/sql/runtimestats/rts_msg.h +++ b/core/sql/runtimestats/rts_msg.h @@ -32,7 +32,7 @@ #include "ComSmallDefs.h" #include "ComCextdecs.h" #include "Int64.h" - +#include "ExpLOBenums.h" #include <stdio.h> #include "sqlcli.h" @@ -73,7 +73,7 @@ const Int32 CurrSuspendQueryReplyVersionNumber = 100; const Int32 CurrActivateQueryReqVersionNumber = 100; const Int32 CurrActivateQueryReplyVersionNumber = 100; const Int32 CurrSecurityInvalidKeyVersionNumber = 100; - +const Int32 CurrLobLockVersionNumber=100; // // An enumeration of all IPC objects for RTS Servers. // Includes both message objects and stream objects. @@ -107,12 +107,13 @@ enum RtsMessageObjType CANCEL_QUERY_KILL_SERVERS_REQ, // 9019 CANCEL_QUERY_KILL_SERVERS_REPLY, // 9020 SECURITY_INVALID_KEY_REQ, // 9021 - + LOB_LOCK_REQ, // 9022 // Object Types RTS_QUERY_ID = IPC_MSG_RTS_FIRST + 500, // 9500 RTS_EXPLAIN_FRAG, // 9501 - RTS_DIAGNOSTICS_AREA = IPC_SQL_DIAG_AREA + RTS_DIAGNOSTICS_AREA = IPC_SQL_DIAG_AREA, + }; typedef Int64 RtsHandle; @@ -1194,5 +1195,36 @@ private: }; + +// This message is sent from the CLI's ContextCli::setLobLock +// to MXSSMP. It is also sent from MXSSMP to MXSSCP. +class LobLockRequest: public RtsMessageObj +{ +public: + LobLockRequest(NAMemory *heap) + : RtsMessageObj(LOB_LOCK_REQ, + CurrLobLockVersionNumber, heap) + { + memset(lobLockId_,0, sizeof(lobLockId_)); + } + + LobLockRequest(NAMemory *heap, + char *lobId ); + + virtual ~LobLockRequest(); + + IpcMessageObjSize packedLength(); + IpcMessageObjSize packObjIntoMessage(IpcMessageBufferPtr buffer); + void unpackObj(IpcMessageObjType objType, + IpcMessageObjVersion objVersion, + NABoolean sameEndianness, + IpcMessageObjSize objSize, + IpcConstMessageBufferPtr buffer); + + char *getLobLockId() {return lobLockId_; } + +private: + char lobLockId_[LOB_LOCK_ID_SIZE+1];//allow for the lock as well as a '+' or '-' +}; #endif // _RTS_EXE_IPC_H_ http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/runtimestats/sscpipc.cpp ---------------------------------------------------------------------- diff --git a/core/sql/runtimestats/sscpipc.cpp b/core/sql/runtimestats/sscpipc.cpp index b382384..ccb8dbb 100755 --- a/core/sql/runtimestats/sscpipc.cpp +++ b/core/sql/runtimestats/sscpipc.cpp @@ -215,6 +215,9 @@ void SscpNewIncomingConnectionStream::actOnReceive(IpcConnection *connection) case SECURITY_INVALID_KEY_REQ: processSecInvReq(); break; + case LOB_LOCK_REQ: + processLobLockReq(); + break; default: ex_assert(FALSE,"Invalid request for first client message"); } @@ -907,3 +910,53 @@ void SscpNewIncomingConnectionStream::processSecInvReq() request->decrRefCount(); } +void SscpNewIncomingConnectionStream::processLobLockReq() +{ + IpcMessageObjVersion msgVer = getNextObjVersion(); + + ex_assert(msgVer <= currRtsStatsReqVersionNumber, "Up-rev message received."); + NAHeap *statsHeap = NULL; + LobLockRequest *request = new(getHeap()) + LobLockRequest(getHeap()); + + *this >> *request; + ex_assert( !moreObjects(), "unknown object follows LobLockRequest."); + SscpGlobals *sscpGlobals = getSscpGlobals(); + StatsGlobals *statsGlobals = sscpGlobals->getStatsGlobals(); + int error = statsGlobals->getStatsSemaphore(sscpGlobals->getSemId(), + sscpGlobals->myPin()); + statsHeap = statsGlobals->getStatsHeap(); + char *ll = new (statsHeap) char [LOB_LOCK_ID_SIZE]; + memcpy(ll,request->getLobLockId(),LOB_LOCK_ID_SIZE+1); + SyncHashQueue *lobLockList = statsGlobals->getLobLocks(); + if (ll[0] == '+') // If it's a positive value, we are supposed to insert it. + lobLockList->insert(&ll[1],LOB_LOCK_ID_SIZE,&ll[1]); + else if (ll[0] =='-') + { + //negative value means we need to remove/release it from the list + lobLockList->position((char *)&ll[1], LOB_LOCK_ID_SIZE); + while (lobLockList->getCurr() && + memcmp(lobLockList->getCurr(), &ll[1],LOB_LOCK_ID_SIZE)!= 0) + lobLockList->getNext(); + + lobLockList->remove((char *)&ll[1], LOB_LOCK_ID_SIZE,lobLockList->getCurr()); + } + else + ex_assert(FALSE,"invalid lob lock id in LobLockRequest"); + + + statsGlobals->releaseStatsSemaphore(sscpGlobals->getSemId(), + sscpGlobals->myPin()); + clearAllObjects(); + setType(IPC_MSG_SSCP_REPLY); + setVersion(CurrSscpReplyMessageVersion); + + RmsGenericReply *reply = new(getHeap()) + RmsGenericReply(getHeap()); + + *this << *reply; + + send(FALSE); + reply->decrRefCount(); + request->decrRefCount(); +} http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/runtimestats/sscpipc.h ---------------------------------------------------------------------- diff --git a/core/sql/runtimestats/sscpipc.h b/core/sql/runtimestats/sscpipc.h index 3fcc04e..dea7623 100644 --- a/core/sql/runtimestats/sscpipc.h +++ b/core/sql/runtimestats/sscpipc.h @@ -110,6 +110,7 @@ public: void processKillServersReq(); void suspendActivateSchedulers(); void processSecInvReq(); + void processLobLockReq(); private: SscpGlobals *sscpGlobals_; http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/runtimestats/ssmpipc.cpp ---------------------------------------------------------------------- diff --git a/core/sql/runtimestats/ssmpipc.cpp b/core/sql/runtimestats/ssmpipc.cpp index 9831d6f..504ef76 100755 --- a/core/sql/runtimestats/ssmpipc.cpp +++ b/core/sql/runtimestats/ssmpipc.cpp @@ -1187,6 +1187,9 @@ void SsmpNewIncomingConnectionStream::actOnReceive(IpcConnection *connection) case SECURITY_INVALID_KEY_REQ: actOnSecInvalidKeyReq(connection); break; + case LOB_LOCK_REQ: + actOnLobLockReq(connection); + break; default: ex_assert(FALSE,"Invalid request from client"); } @@ -1618,7 +1621,32 @@ void SsmpNewIncomingConnectionStream::actOnActivateQueryReq( "expected an RTS_QUERY_ID following a SuspendQueryRequest"); } +void SsmpNewIncomingConnectionStream::actOnLobLockReq( + IpcConnection *connection) +{ + IpcMessageObjVersion msgVer = getNextObjVersion(); + ex_assert(msgVer <= CurrLobLockVersionNumber, + "Up-rev message received."); + LobLockRequest *llReq= new (getHeap()) LobLockRequest(getHeap()); + *this >> *llReq; + setHandle(llReq->getHandle()); + ex_assert(!moreObjects(),"Unexpected objects following LobLockRequest"); + clearAllObjects(); +// Forward request to all mxsscps. + ssmpGlobals_->allocateServers(); + SscpClientMsgStream *sscpMsgStream = new (heap_) + SscpClientMsgStream(heap_, getIpcEnv(), ssmpGlobals_, this); + sscpMsgStream->setUsedToSendLLMsgs(); + ssmpGlobals_->addRecipients(sscpMsgStream); + sscpMsgStream->clearAllObjects(); + *sscpMsgStream << *llReq; + llReq->decrRefCount(); + sscpMsgStream->send(FALSE); + + // Reply to client when the msgs to mxsscp have all completed. The reply + // is made from the sscpMsgStream's callback. +} void SsmpNewIncomingConnectionStream::actOnSecInvalidKeyReq( IpcConnection *connection) { @@ -2322,6 +2350,11 @@ void SscpClientMsgStream::actOnReceiveAllComplete() replySik(); break; } + case LL: + { + replyLL(); + break; + } default: { ex_assert(FALSE, "Unknown completionProcessing_ flag."); @@ -2353,6 +2386,25 @@ void SscpClientMsgStream::replySik() reply->decrRefCount(); } +void SscpClientMsgStream::replyLL() +{ + RmsGenericReply *reply = new(getHeap()) + RmsGenericReply(getHeap()); + + *ssmpStream_ << *reply; + + if (ssmpStream_->getSscpDiagsArea()) + { + // Pass errors from communication w/ SSCPs back to the + // client. + *ssmpStream_ << *(ssmpStream_->getSscpDiagsArea()); + ssmpStream_->clearSscpDiagsArea(); + } + + ssmpStream_->send(FALSE); + reply->decrRefCount(); +} + void SscpClientMsgStream::sendMergedStats() { StmtStats *stmtStats; http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/runtimestats/ssmpipc.h ---------------------------------------------------------------------- diff --git a/core/sql/runtimestats/ssmpipc.h b/core/sql/runtimestats/ssmpipc.h index d3e5a9b..7171e9b 100644 --- a/core/sql/runtimestats/ssmpipc.h +++ b/core/sql/runtimestats/ssmpipc.h @@ -238,6 +238,7 @@ public: void actOnSuspendQueryReq(IpcConnection *connection); void actOnActivateQueryReq(IpcConnection *connection); void actOnSecInvalidKeyReq(IpcConnection *connection); + void actOnLobLockReq(IpcConnection *connection); void getProcessStats(short reqType, short subReqType, pid_t pid); @@ -345,7 +346,9 @@ public: inline short getDetailLevel() { return detailLevel_; } inline void setUsedToSendCbMsgs() { completionProcessing_ = CB; } inline void setUsedToSendSikMsgs() { completionProcessing_ = SIK; } + inline void setUsedToSendLLMsgs() { completionProcessing_ = LL; } void replySik(); + void replyLL(); inline short getSubReqType() { return subReqType_; } inline void setSubReqType(short subReqType) { subReqType_ = subReqType; } private: @@ -361,7 +364,7 @@ private: short numSqlProcs_; short numCpus_; StmtStats *stmtStats_; - enum { STATS, CB, SIK } completionProcessing_; + enum { STATS, CB, SIK,LL} completionProcessing_; short detailLevel_; short subReqType_; }; http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/sqlcomp/DefaultConstants.h ---------------------------------------------------------------------- diff --git a/core/sql/sqlcomp/DefaultConstants.h b/core/sql/sqlcomp/DefaultConstants.h index 0096424..1a9304e 100644 --- a/core/sql/sqlcomp/DefaultConstants.h +++ b/core/sql/sqlcomp/DefaultConstants.h @@ -2670,6 +2670,7 @@ enum DefaultConstants LOB_GC_LIMIT_SIZE, LOB_INPUT_LIMIT_FOR_BATCH, + LOB_LOCKING, // Should the DISK POOL be turned on when replicating the DDL using COPY DDL REPLICATE_DISK_POOL, http://git-wip-us.apache.org/repos/asf/trafodion/blob/b041abd9/core/sql/sqlcomp/nadefaults.cpp ---------------------------------------------------------------------- diff --git a/core/sql/sqlcomp/nadefaults.cpp b/core/sql/sqlcomp/nadefaults.cpp index 1c47c54..3622db1 100644 --- a/core/sql/sqlcomp/nadefaults.cpp +++ b/core/sql/sqlcomp/nadefaults.cpp @@ -1753,6 +1753,11 @@ SDDkwd__(ISO_MAPPING, (char *)SQLCHARSETSTRING_ISO88591), DD_____(LOB_HDFS_SERVER, "default"), // For JDBC/ODBC batch operations, LOB size limited to 4K bytes DDint__(LOB_INPUT_LIMIT_FOR_BATCH, "16384"), + // Control the locking via RMS shared lock. This ensures the CLI and HDFS + // operations for any LOB UID are done under a lock so concurrent operations + // wont conflict and cause incosistent data. For non concurrent applications, + // we can turn this off as a performance enhancement. + DDkwd__(LOB_LOCKING, "ON"), // Size of memoryin Megabytes used to perform I/O to lob data file // default size is 128MB . Change to adjust memory usage. DDint__(LOB_MAX_CHUNK_MEM_SIZE, "128"),
