[TRAFODION-3110] LOB: Extract lobtofile() to a hdfs file returns 8442 error
Fixes as per the review comments for the commit ba00576e1f47a9f0e0b3f344da8742aeecbc3ce. Also took care of the truncate option. Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/82ef6c1c Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/82ef6c1c Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/82ef6c1c Branch: refs/heads/master Commit: 82ef6c1c9487404b5a2e95993e30ee7fefd1fcdc Parents: 39e8791 Author: selvaganesang <selva.govindara...@esgyn.com> Authored: Thu Aug 23 17:49:03 2018 +0000 Committer: selvaganesang <selva.govindara...@esgyn.com> Committed: Thu Aug 23 17:49:03 2018 +0000 ---------------------------------------------------------------------- core/sql/exp/ExpLOBaccess.cpp | 122 +++++++++++++++++++++---------------- 1 file changed, 69 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafodion/blob/82ef6c1c/core/sql/exp/ExpLOBaccess.cpp ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpLOBaccess.cpp b/core/sql/exp/ExpLOBaccess.cpp index e3c086a..ab592f4 100644 --- a/core/sql/exp/ExpLOBaccess.cpp +++ b/core/sql/exp/ExpLOBaccess.cpp @@ -2462,7 +2462,7 @@ Ex_Lob_Error ExLob::readDataToHdfsFile(char *tgtFileName, Int64 offset, Int64 s writeOperLen = 0; HdfsClient *tgtHdfsClient; HDFS_Client_RetCode hdfsClientRetcode; - NABoolean overwrite = TRUE; + NABoolean overwrite = FALSE; NABoolean append = FALSE; Int64 remainLen = size; Int64 pos = offset; @@ -2471,41 +2471,17 @@ Ex_Lob_Error ExLob::readDataToHdfsFile(char *tgtFileName, Int64 offset, Int64 s // open and write to the target file int openFlags = O_WRONLY; if (! 
useLibHdfs_) { - if (((LobTgtFileFlags)fileflags == Lob_Error_Or_Create) || - ((LobTgtFileFlags)fileflags == Lob_Truncate_Or_Error)) - overwrite = FALSE; - if ((LobTgtFileFlags)fileflags == Lob_Append_Or_Error) + if (((LobTgtFileFlags)fileflags == Lob_Truncate_Or_Error) || + ((LobTgtFileFlags)fileflags == Lob_Truncate_Or_Create)) + overwrite = TRUE; + if (((LobTgtFileFlags)fileflags == Lob_Append_Or_Error) || + ((LobTgtFileFlags)fileflags == Lob_Append_Or_Create)) append = TRUE; tgtHdfsClient = HdfsClient::newInstance(getLobGlobalHeap(), NULL, hdfsClientRetcode); ex_assert(hdfsClientRetcode == HDFS_CLIENT_OK, "Internal error: HdfsClient::newInstance returned an error"); if (tgtHdfsClient->hdfsCreate(tgtFileName, overwrite, append, FALSE) != HDFS_CLIENT_OK) return LOB_TARGET_FILE_OPEN_ERROR; - Int32 bytesRead; - Int32 bytesWritten; - while (remainLen > 0) - { - if (remainLen > lobMaxChunkMemLen) - readLen = lobMaxChunkMemLen; - else - readLen = remainLen; - if (lobData == NULL) - lobData = new (lobGlobalHeap_) char[readLen]; - bytesRead = hdfsClient_->hdfsRead(pos, lobData, readLen, hdfsClientRetcode); - if (hdfsClientRetcode == HDFS_CLIENT_OK) - bytesWritten = tgtHdfsClient->hdfsWrite(lobData, bytesRead, hdfsClientRetcode, lobMaxChunkMemLen); - if (hdfsClientRetcode == HDFS_CLIENT_OK) { - pos += bytesRead; - remainLen -= bytesRead; - writeOperLen += bytesWritten; - } else { - NADELETEBASIC(lobData, lobGlobalHeap_); - HdfsClient::deleteInstance(tgtHdfsClient); - return LOB_DATA_READ_ERROR; - } - } - HdfsClient::deleteInstance(tgtHdfsClient); - return LOB_OPER_OK; - } + } else { if ((LobTgtFileFlags)fileflags == Lob_Append_Or_Error ) openFlags |= O_APPEND; //hdfsFile fdTgtFile = hdfsOpenFile(fs_,tgtFileName, openFlags, 0,0,0); @@ -2537,13 +2513,42 @@ Ex_Lob_Error ExLob::readDataToHdfsFile(char *tgtFileName, Int64 offset, Int64 s return LOB_TARGET_FILE_OPEN_ERROR; } } + } + if (!multipleChunks) { + if (! 
useLibHdfs_) { + Int32 bytesRead; + Int32 bytesWritten; + while (remainLen > 0) + { + if (remainLen > lobMaxChunkMemLen) + readLen = lobMaxChunkMemLen; + else + readLen = remainLen; + if (lobData == NULL) + lobData = new (lobGlobalHeap_) char[readLen]; + bytesRead = hdfsClient_->hdfsRead(pos, lobData, readLen, hdfsClientRetcode); + if (hdfsClientRetcode == HDFS_CLIENT_OK) + bytesWritten = tgtHdfsClient->hdfsWrite(lobData, bytesRead, hdfsClientRetcode, lobMaxChunkMemLen); + if (hdfsClientRetcode == HDFS_CLIENT_OK) { + pos += bytesRead; + remainLen -= bytesRead; + writeOperLen += bytesWritten; + } else { + NADELETEBASIC(lobData, lobGlobalHeap_); + HdfsClient::deleteInstance(tgtHdfsClient); + return LOB_DATA_READ_ERROR; + } + } + HdfsClient::deleteInstance(tgtHdfsClient); + return LOB_OPER_OK; + } // !multipleChunk && !useLibHdfs + else { + if ((srcLen < lobMaxChunkMemLen)) + { + lobDebugInfo("Reading in single chunk",0,__LINE__,lobTrace_); + lobData = (char *) (getLobGlobalHeap())->allocateMemory(srcLen); - if ((srcLen < lobMaxChunkMemLen) && (multipleChunks ==FALSE)) // simple single I/O case - { - lobDebugInfo("Reading in single chunk",0,__LINE__,lobTrace_); - lobData = (char *) (getLobGlobalHeap())->allocateMemory(srcLen); - - if (lobData == NULL) + if (lobData == NULL) { return LOB_SOURCE_DATA_ALLOC_ERROR; } @@ -2567,23 +2572,22 @@ Ex_Lob_Error ExLob::readDataToHdfsFile(char *tgtFileName, Int64 offset, Int64 s } getLobGlobalHeap()->deallocateMemory(lobData); } - else - {// multiple chunks to read + } // !multipleChunk && useLibHdfs + } // !multipleChunk + else {// multiple chunks to read lobDebugInfo("Reading in multiple chunks into local file",0,__LINE__,lobTrace_); err = openCursor(handleIn, handleInLen, transId); if (err != LOB_OPER_OK) return err; + chunkSize = MINOF(srcLen, lobMaxChunkMemLen); + lobData = (char *) (getLobGlobalHeap())->allocateMemory(chunkSize); + if (lobData == NULL) + return LOB_SOURCE_DATA_ALLOC_ERROR; while ( srcLen > 0) { chunkSize = 
MINOF(srcLen, lobMaxChunkMemLen); - lobData = (char *) (getLobGlobalHeap())->allocateMemory(chunkSize); - if (lobData == NULL) - { - getLobGlobalHeap()->deallocateMemory(lobData); - return LOB_SOURCE_DATA_ALLOC_ERROR; - } //handle reading the multiple chunks like a cursor err = readCursor(lobData,chunkSize, handleIn, handleInLen, operLen, transId); @@ -2594,6 +2598,15 @@ Ex_Lob_Error ExLob::readDataToHdfsFile(char *tgtFileName, Int64 offset, Int64 s getLobGlobalHeap()->deallocateMemory(lobData); return LOB_DATA_READ_ERROR; } + if (!useLibHdfs_) { + writeOperLen += tgtHdfsClient->hdfsWrite(lobData, chunkSize, hdfsClientRetcode, lobMaxChunkMemLen); + if (hdfsClientRetcode != HDFS_CLIENT_OK) { + NADELETEBASIC(lobData, lobGlobalHeap_); + HdfsClient::deleteInstance(tgtHdfsClient); + return LOB_TARGET_FILE_WRITE_ERROR; + } + } + else { writeOperLen += hdfsWrite(fs_,fdTgtFile,lobData, chunkSize); if (writeOperLen <= 0) { @@ -2605,18 +2618,21 @@ Ex_Lob_Error ExLob::readDataToHdfsFile(char *tgtFileName, Int64 offset, Int64 s getLobGlobalHeap()->deallocateMemory(lobData); return LOB_DATA_FLUSH_ERROR; } - getLobGlobalHeap()->deallocateMemory(lobData); + } srcLen -= chunkSize; - } closeCursor(handleIn, handleInLen,transId); - } - hdfsCloseFile(fs_, fdTgtFile); - fdTgtFile=NULL; - hdfsCloseFile(fs_,fdData_); - fdData_=NULL; - + } + getLobGlobalHeap()->deallocateMemory(lobData); + if (! useLibHdfs_) { + HdfsClient::deleteInstance(tgtHdfsClient); + } else { + hdfsCloseFile(fs_, fdTgtFile); + fdTgtFile=NULL; + hdfsCloseFile(fs_,fdData_); + fdData_=NULL; + } return LOB_OPER_OK; }