[TRAFODION-3065] Trafodion to support compressed Hive Text formatted tables
Compressed Hive text files are now supported via the new HDFS scan implementation that uses the HDFS Java APIs. When Hadoop is not configured with the compression codec a file requires, an error is raised.

[TRAFODION-2982] JNI HDFS interface should support varied sized large buffers for read/write

A new CQD, HDFS_IO_INTERIM_BYTEARRAY_SIZE_IN_KB, is introduced to chunk reads and writes when an interim Java byte array is involved.

Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/96cab4dd
Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/96cab4dd
Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/96cab4dd

Branch: refs/heads/master
Commit: 96cab4ddd086a59ebc0eab8ac4a93ee3cf315aac
Parents: f216cdb
Author: selvaganesang <[email protected]>
Authored: Wed May 9 00:36:04 2018 +0000
Committer: selvaganesang <[email protected]>
Committed: Wed May 9 18:51:31 2018 +0000

----------------------------------------------------------------------
 core/sql/comexe/ComTdbFastTransport.cpp          |   1 +
 core/sql/comexe/ComTdbFastTransport.h            |   7 +-
 core/sql/comexe/ComTdbHdfsScan.cpp               |   1 +
 core/sql/comexe/ComTdbHdfsScan.h                 |   5 +
 core/sql/executor/ExHdfsScan.cpp                 |  16 ++-
 core/sql/executor/HdfsClient_JNI.cpp             | 126 +++++++++++------
 core/sql/executor/HdfsClient_JNI.h               |   9 +-
 .../sql/executor/org_trafodion_sql_HDFSClient.h  |  31 ++++
 core/sql/exp/ExpLOBinterface.h                   |  10 ++
 core/sql/generator/GenFastTransport.cpp          |   4 +
 core/sql/generator/GenRelScan.cpp                |   5 +-
 core/sql/regress/hive/DIFF002.KNOWN              |  14 ++
 core/sql/regress/hive/FILTER002                  |  33 +++++
 core/sql/sqlcomp/DefaultConstants.h              |   5 +
 core/sql/sqlcomp/nadefaults.cpp                  |   1 +
 .../main/java/org/trafodion/sql/HDFSClient.java  | 140 +++++++++++++++----
 .../main/java/org/trafodion/sql/HdfsScan.java    |  34 ++++-
 17 files changed, 354 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/comexe/ComTdbFastTransport.cpp
----------------------------------------------------------------------
diff --git a/core/sql/comexe/ComTdbFastTransport.cpp b/core/sql/comexe/ComTdbFastTransport.cpp
index 49d830e..90f635f 100644
--- a/core/sql/comexe/ComTdbFastTransport.cpp
+++ b/core/sql/comexe/ComTdbFastTransport.cpp
@@ -99,6 +99,7 @@ ComTdbFastExtract::ComTdbFastExtract(
     hdfsReplication_(replication),
     ioTimeout_(ioTimeout),
     childDataRowLen_(childDataRowLen),
+    hdfsIoByteArraySize_(0),
     modTSforDir_(-1)
 {

http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/comexe/ComTdbFastTransport.h
----------------------------------------------------------------------
diff --git a/core/sql/comexe/ComTdbFastTransport.h b/core/sql/comexe/ComTdbFastTransport.h
index 37c01da..0666953 100644
--- a/core/sql/comexe/ComTdbFastTransport.h
+++ b/core/sql/comexe/ComTdbFastTransport.h
@@ -369,6 +369,10 @@ public:
   void setModTSforDir(Int64 v) { modTSforDir_ = v; }
   Int64 getModTSforDir() const { return modTSforDir_; }

+  void setHdfsIoByteArraySize(int size)
+  { hdfsIoByteArraySize_ = size; }
+  UInt16 getHdfsIoByteArraySize()
+  { return hdfsIoByteArraySize_; }
 protected:
   NABasicPtr targetName_;                        // 00 - 07
   NABasicPtr delimiter_;                         // 08 - 15
@@ -395,9 +399,10 @@ protected:
   UInt16 filler_;                                // 130 - 131
   UInt32 childDataRowLen_;                       // 132 - 135
   Int64 modTSforDir_;                            // 136 - 143
+  UInt16 hdfsIoByteArraySize_;                   // 144 - 147

   // Make sure class size is a multiple of 8
-  char fillerComTdbFastTransport_[8];            // 144 - 151
+  char fillerComTdbFastTransport_[4];
// 148 - 151 }; http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/comexe/ComTdbHdfsScan.cpp ---------------------------------------------------------------------- diff --git a/core/sql/comexe/ComTdbHdfsScan.cpp b/core/sql/comexe/ComTdbHdfsScan.cpp index a0bf5c1..f5e2907 100755 --- a/core/sql/comexe/ComTdbHdfsScan.cpp +++ b/core/sql/comexe/ComTdbHdfsScan.cpp @@ -121,6 +121,7 @@ ComTdbHdfsScan::ComTdbHdfsScan( hdfsRootDir_(hdfsRootDir), modTSforDir_(modTSforDir), numOfPartCols_(numOfPartCols), + hdfsIoByteArraySize_(0), hdfsDirsToCheck_(hdfsDirsToCheck) {}; http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/comexe/ComTdbHdfsScan.h ---------------------------------------------------------------------- diff --git a/core/sql/comexe/ComTdbHdfsScan.h b/core/sql/comexe/ComTdbHdfsScan.h index ea995fb..f9a0afd 100755 --- a/core/sql/comexe/ComTdbHdfsScan.h +++ b/core/sql/comexe/ComTdbHdfsScan.h @@ -136,6 +136,7 @@ class ComTdbHdfsScan : public ComTdb UInt16 origTuppIndex_; // 188 - 189 char fillersComTdbHdfsScan1_[2]; // 190 - 191 NABasicPtr nullFormat_; // 192 - 199 + UInt16 hdfsIoByteArraySize_; // 198 - 199 // next 4 params are used to check if data under hdfsFileDir // was modified after query was compiled. @@ -362,6 +363,10 @@ public: Queue * hdfsDirsToCheck() { return hdfsDirsToCheck_; } char *hdfsRootDir() { return hdfsRootDir_; } + void setHdfsIoByteArraySize(int size) + { hdfsIoByteArraySize_ = size; } + UInt16 getHdfsIoByteArraySize() + { return hdfsIoByteArraySize_; } }; inline ComTdb * ComTdbHdfsScan::getChildTdb() http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/executor/ExHdfsScan.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExHdfsScan.cpp b/core/sql/executor/ExHdfsScan.cpp index 97697f3..e5d73dc 100644 --- a/core/sql/executor/ExHdfsScan.cpp +++ b/core/sql/executor/ExHdfsScan.cpp @@ -129,7 +129,7 @@ ExHdfsScanTcb::ExHdfsScanTcb( Space * space = (glob ? glob->getSpace() : 0); CollHeap * heap = (glob ? glob->getDefaultHeap() : 0); useLibhdfsScan_ = hdfsScanTdb.getUseLibhdfsScan(); - if (isSequenceFile() || hdfsScanTdb.isCompressedFile()) + if (isSequenceFile()) useLibhdfsScan_ = TRUE; lobGlob_ = NULL; hdfsScanBufMaxSize_ = hdfsScanTdb.hdfsBufSize_; @@ -569,6 +569,7 @@ ExWorkProcRetcode ExHdfsScanTcb::work() break; } hdfsScan_ = HdfsScan::newInstance((NAHeap *)getHeap(), hdfsScanBuf_, hdfsScanBufMaxSize_, + hdfsScanTdb().hdfsIoByteArraySize_, &hdfsFileInfoListAsArray_, beginRangeNum_, numRanges_, hdfsScanTdb().rangeTailIOSize_, hdfsStats_, hdfsScanRetCode); if (hdfsScanRetCode != HDFS_SCAN_OK) { @@ -602,6 +603,11 @@ ExWorkProcRetcode ExHdfsScanTcb::work() break; } hdfo = hdfsFileInfoListAsArray_.at(retArray_[RANGE_NO]); + if (retArray_[BYTES_COMPLETED] == 0) { + ex_assert(headRoomCopied_ == 0, "Internal Error in HdfsScan"); + step_ = TRAF_HDFS_READ; + break; + } bufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED]; if (retArray_[RANGE_NO] != prevRangeNum_) { currRangeBytesRead_ = retArray_[BYTES_COMPLETED]; @@ -624,13 +630,9 @@ ExWorkProcRetcode ExHdfsScanTcb::work() extraBytesRead_ = currRangeBytesRead_ - hdfo->getBytesToRead(); else extraBytesRead_ = 0; + ex_assert(extraBytesRead_ >= 0, "Negative number of extraBytesRead"); // headRoom_ is the number of extra bytes to be read (rangeTailIOSize) // If the whole range fits in one buffer, it is needed to process rows till EOF for the last range alone. 
-/* - if (retArray_[IS_EOF] && (extraBytesRead_ < headRoom_) - && (retArray_[RANGE_NO] == (hdfsFileInfoListAsArray_.entries()-1))) - extraBytesRead_ = 0; -*/ if (numFiles_ <= 1) { if (retArray_[IS_EOF] && extraBytesRead_ < headRoom_ && (retArray_[RANGE_NO] == (hdfsFileInfoListAsArray_.entries()-1))) extraBytesRead_ = 0; @@ -2048,7 +2050,7 @@ void ExHdfsScanTcb::computeRangesAtRuntime() } else e->bytesToRead_ = (Int64) fileInfos[h].mSize; - + e->compressionMethod_ = 0; hdfsFileInfoListAsArray_.insertAt(h, e); } } http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/executor/HdfsClient_JNI.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/HdfsClient_JNI.cpp b/core/sql/executor/HdfsClient_JNI.cpp index 0622b50..5b8e850 100644 --- a/core/sql/executor/HdfsClient_JNI.cpp +++ b/core/sql/executor/HdfsClient_JNI.cpp @@ -26,6 +26,7 @@ #include "Context.h" #include "jni.h" #include "HdfsClient_JNI.h" +#include "org_trafodion_sql_HDFSClient.h" // =========================================================================== // ===== Class HdfsScan @@ -83,7 +84,7 @@ HDFS_Scan_RetCode HdfsScan::init() JavaMethods_[JM_CTOR ].jm_name = "<init>"; JavaMethods_[JM_CTOR ].jm_signature = "()V"; JavaMethods_[JM_SET_SCAN_RANGES].jm_name = "setScanRanges"; - JavaMethods_[JM_SET_SCAN_RANGES].jm_signature = "(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;[Ljava/lang/String;[J[J[I)V"; + JavaMethods_[JM_SET_SCAN_RANGES].jm_signature = "(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;S[Ljava/lang/String;[J[J[I[S)V"; JavaMethods_[JM_TRAF_HDFS_READ].jm_name = "trafHdfsRead"; JavaMethods_[JM_TRAF_HDFS_READ].jm_signature = "()[I"; JavaMethods_[JM_STOP].jm_name = "stop"; @@ -106,7 +107,7 @@ char* HdfsScan::getErrorText(HDFS_Scan_RetCode errEnum) } ///////////////////////////////////////////////////////////////////////////// -HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, +HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, short hdfsIoByteArraySize, HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize) { QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::setScanRanges() called."); @@ -138,10 +139,12 @@ HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScan jenv_->PopLocalFrame(NULL); return HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM; } + jshort j_hdfsIoByteArraySize = hdfsIoByteArraySize; jobjectArray j_filenames = NULL; jlongArray j_offsets = NULL; jlongArray j_lens = NULL; jintArray j_rangenums = NULL; + jshortArray j_compress = NULL; HdfsFileInfo *hdfo; jstring j_obj; @@ -184,7 +187,11 @@ HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScan return hdfsScanRetCode; } } - long len = hdfo->getBytesToRead()+rangeTailIOSize; + long len; + if (hdfo->getBytesToRead() > (LONG_MAX-rangeTailIOSize)) + len = LONG_MAX; + else + len = hdfo->getBytesToRead()+rangeTailIOSize; jenv_->SetLongArrayRegion(j_lens, rangeCount, 1, &len); if (j_rangenums == NULL) { @@ -196,12 +203,24 @@ HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScan } jint tdbRangeNum = i; jenv_->SetIntArrayRegion(j_rangenums, rangeCount, 1, &tdbRangeNum); + + if (j_compress == NULL) { + j_compress = jenv_->NewShortArray(numRanges); + if (jenv_->ExceptionCheck()) { + jenv_->PopLocalFrame(NULL); + return hdfsScanRetCode; + } + } + short compressionMethod = 
(short)hdfo->getCompressionMethod(); + //ex_assert(compressionMethod >= 0 && compressionMethod <= ComCompressionInfo::LZOP, "Illegal CompressionMethod Value"); + jenv_->SetShortArrayRegion(j_compress, rangeCount, 1, &compressionMethod); } if (hdfsStats_ != NULL) hdfsStats_->getHdfsTimer().start(); tsRecentJMFromJNI = JavaMethods_[JM_SET_SCAN_RANGES].jm_full_name; - jenv_->CallVoidMethod(javaObj_, JavaMethods_[JM_SET_SCAN_RANGES].methodID, j_buf1, j_buf2, j_filenames, j_offsets, j_lens, j_rangenums); + jenv_->CallVoidMethod(javaObj_, JavaMethods_[JM_SET_SCAN_RANGES].methodID, j_buf1, j_buf2, j_hdfsIoByteArraySize, + j_filenames, j_offsets, j_lens, j_rangenums, j_compress); if (hdfsStats_ != NULL) { hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop()); hdfsStats_->incHdfsCalls(); @@ -216,7 +235,7 @@ HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScan } HdfsScan *HdfsScan::newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, - HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize, + short hdfsIoByteArraySize, HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize, ExHdfsScanStats *hdfsStats, HDFS_Scan_RetCode &hdfsScanRetCode) { QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::newInstance() called."); @@ -228,7 +247,7 @@ HdfsScan *HdfsScan::newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfs if (hdfsScan != NULL) { hdfsScanRetCode = hdfsScan->init(); if (hdfsScanRetCode == HDFS_SCAN_OK) - hdfsScanRetCode = hdfsScan->setScanRanges(hdfsScanBuf, scanBufSize, + hdfsScanRetCode = hdfsScan->setScanRanges(hdfsScanBuf, scanBufSize, hdfsIoByteArraySize, hdfsFileInfoArray, beginRangeNum, numRanges, rangeTailIOSize); if (hdfsScanRetCode == HDFS_SCAN_OK) hdfsScan->setHdfsStats(hdfsStats); @@ -359,7 +378,7 @@ void HdfsClient::deleteHdfsFileInfo() hdfsFileInfo_ = NULL; } -HdfsClient *HdfsClient::newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HDFS_Client_RetCode &retCode) +HdfsClient *HdfsClient::newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HDFS_Client_RetCode &retCode, short hdfsIoByteArraySize) { QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::newInstance() called."); @@ -369,8 +388,10 @@ HdfsClient *HdfsClient::newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HD HdfsClient *hdfsClient = new (heap) HdfsClient(heap); if (hdfsClient != NULL) { retCode = hdfsClient->init(); - if (retCode == HDFS_CLIENT_OK) + if (retCode == HDFS_CLIENT_OK) { hdfsClient->setHdfsStats(hdfsStats); + hdfsClient->setIoByteArraySize(hdfsIoByteArraySize); + } else { NADELETE(hdfsClient, HdfsClient, heap); hdfsClient = NULL; @@ -574,41 +595,50 @@ Int32 HdfsClient::hdfsWrite(const char* data, Int64 len, HDFS_Client_RetCode &hd hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION; return 0; } - - //Write the requisite bytes into the file - jbyteArray jbArray = jenv_->NewByteArray( len); - if (!jbArray) { - GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HDFS_WRITE_PARAM)); - jenv_->PopLocalFrame(NULL); - hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_WRITE_PARAM; - return 0; - } - jenv_->SetByteArrayRegion(jbArray, 0, len, (const jbyte*)data); - - if (hdfsStats_ != NULL) - hdfsStats_->getHdfsTimer().start(); - - tsRecentJMFromJNI = JavaMethods_[JM_HDFS_WRITE].jm_full_name; - // Java method returns the cumulative bytes written - jint totalBytesWritten = jenv_->CallIntMethod(javaObj_, JavaMethods_[JM_HDFS_WRITE].methodID, jbArray); - - 
if (hdfsStats_ != NULL) { - hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop()); - hdfsStats_->incHdfsCalls(); - } - if (jenv_->ExceptionCheck()) + Int64 lenRemain = len; + Int64 writeLen; + Int64 chunkLen = (ioByteArraySize_ > 0 ? ioByteArraySize_ * 1024 : 0); + Int64 offset = 0; + do { - getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsWrite()"); - jenv_->PopLocalFrame(NULL); - hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION; - return 0; - } - + if ((chunkLen > 0) && (lenRemain > chunkLen)) + writeLen = chunkLen; + else + writeLen = lenRemain; + //Write the requisite bytes into the file + jbyteArray jbArray = jenv_->NewByteArray(writeLen); + if (!jbArray) { + GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HDFS_WRITE_PARAM)); + jenv_->PopLocalFrame(NULL); + hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_WRITE_PARAM; + return 0; + } + jenv_->SetByteArrayRegion(jbArray, 0, writeLen, (const jbyte*)(data+offset)); + + if (hdfsStats_ != NULL) + hdfsStats_->getHdfsTimer().start(); + + tsRecentJMFromJNI = JavaMethods_[JM_HDFS_WRITE].jm_full_name; + // Java method returns the cumulative bytes written + jint totalBytesWritten = jenv_->CallIntMethod(javaObj_, JavaMethods_[JM_HDFS_WRITE].methodID, jbArray); + + if (hdfsStats_ != NULL) { + hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop()); + hdfsStats_->incHdfsCalls(); + } + if (jenv_->ExceptionCheck()) + { + getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsWrite()"); + jenv_->PopLocalFrame(NULL); + hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION; + return 0; + } + lenRemain -= writeLen; + offset += writeLen; + } while (lenRemain > 0); jenv_->PopLocalFrame(NULL); hdfsClientRetcode = HDFS_CLIENT_OK; - Int32 bytesWritten = totalBytesWritten - totalBytesWritten_; - totalBytesWritten_ = totalBytesWritten; - return bytesWritten; + return len; } Int32 HdfsClient::hdfsRead(const char* data, Int64 len, HDFS_Client_RetCode &hdfsClientRetcode) @@ -1018,6 +1048,22 @@ jint JNICALL Java_org_trafodion_sql_HDFSClient_sendFileStatus return (jint) retcode; } +JNIEXPORT jint JNICALL Java_org_trafodion_sql_HDFSClient_copyToByteBuffer + (JNIEnv *jenv, jobject j_obj, jobject j_buf, jint offset, jbyteArray j_bufArray, jint copyLen) +{ + void *bufBacking; + + bufBacking = jenv->GetDirectBufferAddress(j_buf); + if (bufBacking == NULL) + return -1; + jlong capacity = jenv->GetDirectBufferCapacity(j_buf); + jbyte *byteBufferAddr = (jbyte *)bufBacking + offset; + if ((offset + copyLen) > capacity) + return -2; + jenv->GetByteArrayRegion(j_bufArray, 0, copyLen, byteBufferAddr); + return 0; +} + #ifdef __cplusplus } #endif http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/executor/HdfsClient_JNI.h ---------------------------------------------------------------------- diff --git a/core/sql/executor/HdfsClient_JNI.h b/core/sql/executor/HdfsClient_JNI.h index 6f68f4d..a85c590 100644 --- a/core/sql/executor/HdfsClient_JNI.h +++ b/core/sql/executor/HdfsClient_JNI.h @@ -66,11 +66,11 @@ public: // Get the error description. 
static char* getErrorText(HDFS_Scan_RetCode errEnum); - static HdfsScan *newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, + static HdfsScan *newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, short hdfsIoByteArraySize, HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize, ExHdfsScanStats *hdfsStats, HDFS_Scan_RetCode &hdfsScanRetCode); - HDFS_Scan_RetCode setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, + HDFS_Scan_RetCode setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, short hdfsIoByteArraySize, HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize); @@ -169,9 +169,11 @@ public: } ~HdfsClient(); - static HdfsClient *newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HDFS_Client_RetCode &retCode); + static HdfsClient *newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HDFS_Client_RetCode &retCode, short hdfsIoByteArraySize = 0); static HdfsClient *getInstance(); static void deleteInstance(); + void setIoByteArraySize(short size) + { ioByteArraySize_ = size; } // Get the error description. static char* getErrorText(HDFS_Client_RetCode errEnum); @@ -224,6 +226,7 @@ private: int numFiles_; char *path_; Int64 totalBytesWritten_; + short ioByteArraySize_; ExHdfsScanStats *hdfsStats_; static jclass javaClass_; static JavaMethodInit* JavaMethods_; http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/executor/org_trafodion_sql_HDFSClient.h ---------------------------------------------------------------------- diff --git a/core/sql/executor/org_trafodion_sql_HDFSClient.h b/core/sql/executor/org_trafodion_sql_HDFSClient.h new file mode 100644 index 0000000..6e3485e --- /dev/null +++ b/core/sql/executor/org_trafodion_sql_HDFSClient.h @@ -0,0 +1,31 @@ +/* DO NOT EDIT THIS FILE - it is machine generated */ +#include <jni.h> +/* Header for class org_trafodion_sql_HDFSClient */ + +#ifndef _Included_org_trafodion_sql_HDFSClient +#define _Included_org_trafodion_sql_HDFSClient +#ifdef __cplusplus +extern "C" { +#endif +#undef org_trafodion_sql_HDFSClient_UNCOMPRESSED +#define org_trafodion_sql_HDFSClient_UNCOMPRESSED 1L +/* + * Class: org_trafodion_sql_HDFSClient + * Method: copyToByteBuffer + * Signature: (Ljava/nio/ByteBuffer;I[BI)I + */ +JNIEXPORT jint JNICALL Java_org_trafodion_sql_HDFSClient_copyToByteBuffer + (JNIEnv *, jobject, jobject, jint, jbyteArray, jint); + +/* + * Class: org_trafodion_sql_HDFSClient + * Method: sendFileStatus + * Signature: (JIIZLjava/lang/String;JJSJLjava/lang/String;Ljava/lang/String;SJ)I + */ +JNIEXPORT jint JNICALL Java_org_trafodion_sql_HDFSClient_sendFileStatus + (JNIEnv *, jobject, jlong, jint, jint, jboolean, jstring, jlong, jlong, jshort, jlong, jstring, jstring, jshort, jlong); + +#ifdef __cplusplus +} +#endif +#endif http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/exp/ExpLOBinterface.h ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpLOBinterface.h b/core/sql/exp/ExpLOBinterface.h index 8194859..54435a3 100644 --- a/core/sql/exp/ExpLOBinterface.h +++ b/core/sql/exp/ExpLOBinterface.h @@ -29,6 +29,13 @@ class HdfsFileInfo { public: + HdfsFileInfo() { + entryNum_ = -1; + startOffset_ = -1; + bytesToRead_ = 0; + compressionMethod_ = 0; + flags_ = 0; + } char * fileName() { return fileName_; } // used for text/seq file access @@ -39,6 +46,8 @@ class HdfsFileInfo Int64 
getStartRow() { return startOffset_; } Int64 getNumRows() { return bytesToRead_; } + Int16 getCompressionMethod() const { return compressionMethod_; } + Lng32 getFlags() { return flags_; } void setFileIsLocal(NABoolean v) @@ -64,6 +73,7 @@ class HdfsFileInfo NABasicPtr fileName_; Int64 startOffset_; Int64 bytesToRead_; + Int16 compressionMethod_; }; typedef HdfsFileInfo* HdfsFileInfoPtr; http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/generator/GenFastTransport.cpp ---------------------------------------------------------------------- diff --git a/core/sql/generator/GenFastTransport.cpp b/core/sql/generator/GenFastTransport.cpp index 75c1e0e..5019485 100644 --- a/core/sql/generator/GenFastTransport.cpp +++ b/core/sql/generator/GenFastTransport.cpp @@ -476,8 +476,12 @@ static short ft_codegen(Generator *generator, replication ); + UInt16 hdfsIoByteArraySize = (UInt16) + CmpCommon::getDefaultNumeric(HDFS_IO_INTERIM_BYTEARRAY_SIZE_IN_KB); + tdb->setHdfsIoByteArraySize(hdfsIoByteArraySize); tdb->setSequenceFile(isSequenceFile); tdb->setHdfsCompressed(CmpCommon::getDefaultNumeric(TRAF_UNLOAD_HDFS_COMPRESS)!=0); + if ((hiveNAColArray) && (hiveInsertErrMode == 2)) http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/generator/GenRelScan.cpp ---------------------------------------------------------------------- diff --git a/core/sql/generator/GenRelScan.cpp b/core/sql/generator/GenRelScan.cpp index 827b94c..0a133b2 100644 --- a/core/sql/generator/GenRelScan.cpp +++ b/core/sql/generator/GenRelScan.cpp @@ -1234,7 +1234,8 @@ if (hTabStats->isOrcFile()) if (hdfsBufSizeTesting) hdfsBufSize = hdfsBufSizeTesting; } - + UInt16 hdfsIoByteArraySize = (UInt16) + CmpCommon::getDefaultNumeric(HDFS_IO_INTERIM_BYTEARRAY_SIZE_IN_KB); UInt32 rangeTailIOSize = (UInt32) CmpCommon::getDefaultNumeric(HDFS_IO_RANGE_TAIL); if (rangeTailIOSize == 0) @@ -1362,7 +1363,7 @@ if (hTabStats->isOrcFile()) hdfsRootDir, modTS, numOfPartLevels, hdfsDirsToCheck ); - + hdfsscan_tdb->setHdfsIoByteArraySize(hdfsIoByteArraySize); generator->initTdbFields(hdfsscan_tdb); hdfsscan_tdb->setUseCursorMulti(useCursorMulti); http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/regress/hive/DIFF002.KNOWN ---------------------------------------------------------------------- diff --git a/core/sql/regress/hive/DIFF002.KNOWN b/core/sql/regress/hive/DIFF002.KNOWN new file mode 100644 index 0000000..24c6ed7 --- /dev/null +++ b/core/sql/regress/hive/DIFF002.KNOWN @@ -0,0 +1,14 @@ +359,360c359,362 +< (EXPR) +< ---------- +--- +> *** ERROR[8447] An error occurred during hdfs access. Error Detail: SETUP_HDFS_SCAN java.io.IOException: LZOP compression codec is not configured in Hadoop +> stackTraceRemoved +> stackTraceRemoved +> stackTraceRemoved +362,364c364 +< 73049 +< +< --- 1 row(s) selected. +--- +> --- 0 row(s) selected. http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/regress/hive/FILTER002 ---------------------------------------------------------------------- diff --git a/core/sql/regress/hive/FILTER002 b/core/sql/regress/hive/FILTER002 new file mode 100755 index 0000000..83f8fbf --- /dev/null +++ b/core/sql/regress/hive/FILTER002 @@ -0,0 +1,33 @@ +#! /bin/sh +# @@@ START COPYRIGHT @@@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# @@@ END COPYRIGHT @@@ + + +fil=$1 +if [ "$fil" = "" ]; then + echo "Usage: $0 filename" + exit 1 +fi + +fil=$1 +sed " +s/org.trafodion.sql.*/stackTraceRemoved/g +" $fil http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/sqlcomp/DefaultConstants.h ---------------------------------------------------------------------- diff --git a/core/sql/sqlcomp/DefaultConstants.h b/core/sql/sqlcomp/DefaultConstants.h index 0096424..b2b2bb9 100644 --- a/core/sql/sqlcomp/DefaultConstants.h +++ b/core/sql/sqlcomp/DefaultConstants.h @@ -3319,6 +3319,11 @@ enum DefaultConstants // This enum constant must be the LAST one in the list; it's a count, // not an Attribute (it's not IN DefaultDefaults; it's the SIZE of it)! + // Size of byte[] in java when direct byteBuffer can't be used + // Used to read compressed hdfs text files and to write + // both compressed and uncompressed hdfs files + HDFS_IO_INTERIM_BYTEARRAY_SIZE_IN_KB, + __NUM_DEFAULT_ATTRIBUTES }; http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/sqlcomp/nadefaults.cpp ---------------------------------------------------------------------- diff --git a/core/sql/sqlcomp/nadefaults.cpp b/core/sql/sqlcomp/nadefaults.cpp index 1c47c54..7e5cfa1 100644 --- a/core/sql/sqlcomp/nadefaults.cpp +++ b/core/sql/sqlcomp/nadefaults.cpp @@ -1496,6 +1496,7 @@ SDDkwd__(EXE_DIAGNOSTIC_EVENTS, "OFF"), DDui1__(HDFS_IO_BUFFERSIZE, "65536"), DDui___(HDFS_IO_BUFFERSIZE_BYTES, "0"), + DDui___(HDFS_IO_INTERIM_BYTEARRAY_SIZE_IN_KB, "1024"), // The value 0 denotes RangeTail = max record length of table. 
DDui___(HDFS_IO_RANGE_TAIL, "0"), DDkwd__(HDFS_PREFETCH, "ON"), http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---------------------------------------------------------------------- diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java index 0346bef..5ffcd03 100644 --- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java +++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.io.FileNotFoundException; import java.io.EOFException; import java.io.OutputStream; +import java.io.InputStream; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -38,6 +39,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.io.compress.CompressionInputStream; import java.io.EOFException; import java.util.concurrent.Callable; import java.util.concurrent.Future; @@ -53,31 +55,47 @@ import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.io.compress.CompressionCodecFactory; // // To read a range in a Hdfs file, use the constructor -// public HDFSClient(int bufNo, int rangeNo, String filename, ByteBuffer buffer, long position, int length) throws IOException +// public HDFSClient(int bufNo, int rangeNo, String filename, ByteBuffer buffer, long position, int length, CompressionInputStream inStream) // // For instance methods like hdfsListDirectory use the constructor // public HDFSClient() // // For all static methods use -// HdfsClient::<static_method_name> +// HDFSClient::<static_method_name> // public class HDFSClient { + // Keep the constants and string array below in sync with + // enum CompressionMethod at sql/comexe/ComCompressionInfo.h + static final short UNKNOWN_COMPRESSION = 0; + static final short UNCOMPRESSED = 1; + static final short LZOP = 5; + static final String COMPRESSION_TYPE[] = { + "UNKNOWN_COMPRESSION", // unable to determine compression method + "UNCOMPRESSED", // file is not compressed + "LZO_DEFLATE", // using LZO deflate compression + "DEFLATE", // using DEFLATE compression + "GZIP", // using GZIP compression + "LZOP"}; // using LZOP compression static Logger logger_ = Logger.getLogger(HDFSClient.class.getName()); private static Configuration config_ = null; private static ExecutorService executorService_ = null; private static FileSystem defaultFs_ = null; + private static CompressionCodecFactory codecFactory_ = null; private FileSystem fs_ = null; private int bufNo_; private int rangeNo_; - private FSDataInputStream fsdis_; + private FSDataInputStream fsdis_; + CompressionInputStream inStream_; private OutputStream outStream_; private String filename_; private ByteBuffer buf_; + private byte[] bufArray_; private int bufLen_; private int bufOffset_ = 0; private long pos_ = 0; @@ -89,7 +107,10 @@ public class HDFSClient private int isEOF_ = 0; private int totalBytesWritten_ = 0; private Path filepath_ = null; - private boolean compression_; + boolean compressed_ = false; + private CompressionCodec codec_ = null; + private short compressionType_; + private short ioByteArraySize_; static { String confFile = 
System.getProperty("trafodion.log4j.configFile"); System.setProperty("trafodion.root", System.getenv("TRAF_HOME")); @@ -105,6 +126,7 @@ public class HDFSClient catch (IOException ioe) { throw new RuntimeException("Exception in HDFSClient static block", ioe); } + codecFactory_ = new CompressionCodecFactory(config_); System.loadLibrary("executor"); } @@ -125,6 +147,9 @@ public class HDFSClient { int bytesRead; int totalBytesRead = 0; + if (compressed_) { + bufArray_ = new byte[ioByteArraySize_ * 1024]; + } else if (! buf_.hasArray()) { try { fsdis_.seek(pos_); @@ -135,10 +160,14 @@ public class HDFSClient } do { - if (buf_.hasArray()) - bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, lenRemain_); - else - bytesRead = fsdis_.read(buf_); + if (compressed_) { + bytesRead = compressedFileRead(lenRemain_); + } else { + if (buf_.hasArray()) + bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, lenRemain_); + else + bytesRead = fsdis_.read(buf_); + } if (bytesRead == -1) { isEOF_ = 1; break; @@ -151,10 +180,38 @@ public class HDFSClient bufOffset_ += bytesRead; pos_ += bytesRead; lenRemain_ -= bytesRead; - } while (lenRemain_ > 0); + } while (lenRemain_ > 0); return new Integer(totalBytesRead); } - } + } + + int compressedFileRead(int readLenRemain) throws IOException + { + int totalReadLen = 0; + int readLen; + int offset = 0; + int retcode; + + int lenRemain = ((readLenRemain > bufArray_.length) ? bufArray_.length : readLenRemain); + do + { + readLen = inStream_.read(bufArray_, offset, lenRemain); + if (readLen == -1 || readLen == 0) + break; + totalReadLen += readLen; + offset += readLen; + lenRemain -= readLen; + } while (lenRemain > 0); + if (totalReadLen > 0) { + if ((retcode = copyToByteBuffer(buf_, bufOffset_, bufArray_, totalReadLen)) != 0) + throw new IOException("Failure to copy to the DirectByteBuffer in the native layer with error code " + retcode); + } + else + totalReadLen = -1; + return totalReadLen; + } + + native int copyToByteBuffer(ByteBuffer buf, int bufOffset, byte[] bufArray, int copyLen); public HDFSClient() { @@ -166,14 +223,30 @@ public class HDFSClient // The passed in length can never be more than the size of the buffer // If the range has a length more than the buffer length, the range is chunked // in HdfsScan - public HDFSClient(int bufNo, int rangeNo, String filename, ByteBuffer buffer, long position, int length) throws IOException + public HDFSClient(int bufNo, short ioByteArraySize, int rangeNo, String filename, ByteBuffer buffer, long position, + int length, short compressionType, CompressionInputStream inStream) throws IOException { bufNo_ = bufNo; rangeNo_ = rangeNo; filename_ = filename; + ioByteArraySize_ = ioByteArraySize; filepath_ = new Path(filename_); fs_ = FileSystem.get(filepath_.toUri(),config_); - fsdis_ = fs_.open(filepath_); + compressionType_ = compressionType; + inStream_ = inStream; + codec_ = codecFactory_.getCodec(filepath_); + if (codec_ != null) { + compressed_ = true; + if (inStream_ == null) + inStream_ = codec_.createInputStream(fs_.open(filepath_)); + } + else { + if ((compressionType_ != UNCOMPRESSED) && (compressionType_ != UNKNOWN_COMPRESSION)) + throw new IOException(COMPRESSION_TYPE[compressionType_] + " compression codec is not configured in Hadoop"); + if (filename_.endsWith(".lzo")) + throw new IOException(COMPRESSION_TYPE[LZOP] + " compression codec is not configured in Hadoop"); + fsdis_ = fs_.open(filepath_); + } blockSize_ = (int)fs_.getDefaultBlockSize(filepath_); buf_ = buffer; bufOffset_ = 0; @@ -201,7 +274,8 @@ 
public class HDFSClient int bytesRead; retObject = (Integer)future_.get(); bytesRead = retObject.intValue(); - fsdis_.close(); + if (! compressed_) + fsdis_.close(); fsdis_ = null; return bytesRead; } @@ -226,7 +300,7 @@ public class HDFSClient filepath_ = new Path(fname + ".gz"); fs_ = FileSystem.get(filepath_.toUri(),config_); - compression_ = compress; + compressed_ = compress; fsdis_ = null; FSDataOutputStream fsOut; if (overwrite) @@ -237,7 +311,7 @@ public class HDFSClient else fsOut = fs_.create(filepath_); - if (compression_) { + if (compressed_) { GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance( GzipCodec.class, config_); Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec); outStream_= gzipCodec.createOutputStream(fsOut, gzipCompressor); @@ -256,7 +330,7 @@ public class HDFSClient else filepath_ = new Path(fname + ".gz"); fs_ = FileSystem.get(filepath_.toUri(),config_); - compression_ = compress; + compressed_ = compress; outStream_ = null; fsdis_ = null; return true; @@ -274,7 +348,7 @@ public class HDFSClient else fsOut = fs_.create(filepath_); - if (compression_) { + if (compressed_) { GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance( GzipCodec.class, config_); Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec); outStream_= gzipCodec.createOutputStream(fsOut, gzipCompressor); @@ -285,21 +359,23 @@ public class HDFSClient logger_.debug("HDFSClient.hdfsWrite() - output stream created" ); } outStream_.write(buff); - if (outStream_ instanceof FSDataOutputStream) - totalBytesWritten_ = ((FSDataOutputStream)outStream_).size(); - else - totalBytesWritten_ += buff.length; if (logger_.isDebugEnabled()) - logger_.debug("HDFSClient.hdfsWrite() - bytes written " + totalBytesWritten_ ); - return totalBytesWritten_; + logger_.debug("HDFSClient.hdfsWrite() - bytes written " + buff.length); + return buff.length; } int hdfsRead(ByteBuffer buffer) throws IOException { if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsRead() - started" ); - if (fsdis_ == null) { - fsdis_ = fs_.open(filepath_); + if (fsdis_ == null && inStream_ == null ) { + codec_ = codecFactory_.getCodec(filepath_); + if (codec_ != null) { + compressed_ = true; + inStream_ = codec_.createInputStream(fs_.open(filepath_)); + } + else + fsdis_ = fs_.open(filepath_); pos_ = 0; } int lenRemain; @@ -307,6 +383,8 @@ public class HDFSClient int totalBytesRead = 0; int bufLen; int bufOffset = 0; + if (compressed_ && bufArray_ != null) + bufArray_ = new byte[ioByteArraySize_ * 1024]; if (buffer.hasArray()) bufLen = buffer.array().length; else @@ -314,10 +392,14 @@ public class HDFSClient lenRemain = bufLen; do { - if (buffer.hasArray()) - bytesRead = fsdis_.read(pos_, buffer.array(), bufOffset, lenRemain); - else - bytesRead = fsdis_.read(buffer); + if (compressed_) { + bytesRead = compressedFileRead(lenRemain); + } else { + if (buffer.hasArray()) + bytesRead = fsdis_.read(pos_, buffer.array(), bufOffset, lenRemain); + else + bytesRead = fsdis_.read(buffer); + } if (bytesRead == -1 || bytesRead == 0) break; totalBytesRead += bytesRead; http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java ---------------------------------------------------------------------- diff --git a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java index 99f021d..b438fb2 100644 --- a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java +++ 
b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.io.compress.CompressionInputStream; import java.net.URI; // This class implements an efficient mechanism to read hdfs files @@ -72,6 +73,9 @@ public class HdfsScan private long currRangeLenRemain_; private int lastBufCompleted_ = -1; private boolean scanCompleted_; + private CompressionInputStream currInStream_; + private short ioByteArraySize_; + // Structure to hold the Scan ranges for this HdfsScan instance // @@ -82,13 +86,15 @@ public class HdfsScan long pos_; long len_; int tdbRangeNum_; + short compressionType_; - HdfsScanRange(String filename, long pos, long len, int tdbRangeNum) + HdfsScanRange(String filename, long pos, long len, int tdbRangeNum, short compressionType) { filename_ = filename; pos_ = pos; len_ = len; tdbRangeNum_ = tdbRangeNum; + compressionType_ = compressionType; } } @@ -103,7 +109,8 @@ public class HdfsScan { } - public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, String filename[], long pos[], long len[], int rangeNum[]) throws IOException + public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, short ioByteArraySize, String filename[], long pos[], + long len[], int rangeNum[], short compressionType[]) throws IOException { // Two buffers to hold the data read buf_ = new ByteBuffer[2]; @@ -111,7 +118,7 @@ public class HdfsScan buf_[0] = buf1; buf_[1] = buf2; - + ioByteArraySize_ = ioByteArraySize; for (int i = 0; i < 2 ; i++) { if (buf_[i].hasArray()) bufLen_[i] = buf_[i].array().length; @@ -121,12 +128,13 @@ public class HdfsScan hdfsClient_ = new HDFSClient[2]; hdfsScanRanges_ = new HdfsScanRange[filename.length]; for (int i = 0; i < filename.length; i++) { - hdfsScanRanges_[i] = new HdfsScanRange(filename[i], pos[i], len[i], rangeNum[i]); + hdfsScanRanges_[i] = new HdfsScanRange(filename[i], pos[i], len[i], rangeNum[i], compressionType[i]); } if (hdfsScanRanges_.length > 0) { currRange_ = 0; currRangePos_ = hdfsScanRanges_[currRange_].pos_; currRangeLenRemain_ = hdfsScanRanges_[currRange_].len_; + currInStream_ = null; scheduleHdfsScanRange(0, 0); } scanCompleted_ = false; @@ -146,6 +154,9 @@ public class HdfsScan currRange_++; currRangePos_ = hdfsScanRanges_[currRange_].pos_; currRangeLenRemain_ = hdfsScanRanges_[currRange_].len_; + if (currInStream_ != null) + currInStream_.close(); + currInStream_ = null; } } if (currRangeLenRemain_ > bufLen_[bufNo]) @@ -155,7 +166,11 @@ public class HdfsScan if (! 
scanCompleted_) { if (logger_.isDebugEnabled()) logger_.debug(" CurrentRange " + hdfsScanRanges_[currRange_].tdbRangeNum_ + " LenRemain " + currRangeLenRemain_ + " BufNo " + bufNo); - hdfsClient_[bufNo] = new HDFSClient(bufNo, hdfsScanRanges_[currRange_].tdbRangeNum_, hdfsScanRanges_[currRange_].filename_, buf_[bufNo], currRangePos_, readLength); + hdfsClient_[bufNo] = new HDFSClient(bufNo, ioByteArraySize_, hdfsScanRanges_[currRange_].tdbRangeNum_, + hdfsScanRanges_[currRange_].filename_, + buf_[bufNo], currRangePos_, readLength, + hdfsScanRanges_[currRange_].compressionType_, currInStream_); + } } @@ -188,6 +203,7 @@ public class HdfsScan bufNo = 0; rangeNo = hdfsClient_[0].getRangeNo(); isEOF = hdfsClient_[0].isEOF(); + currInStream_ = hdfsClient_[0].inStream_; break; case 0: // Wait for the read to complete in buffer 1 @@ -195,6 +211,7 @@ public class HdfsScan bufNo = 1; rangeNo = hdfsClient_[1].getRangeNo(); isEOF = hdfsClient_[1].isEOF(); + currInStream_ = hdfsClient_[1].inStream_; break; default: bufNo = -1; @@ -218,6 +235,9 @@ public class HdfsScan currRangePos_ = hdfsScanRanges_[currRange_].pos_; currRangeLenRemain_ = hdfsScanRanges_[currRange_].len_; bytesRead = 0; + if (currInStream_ != null) + currInStream_.close(); + currInStream_ = null; } } switch (lastBufCompleted_) @@ -278,6 +298,7 @@ public class HdfsScan long pos[] = new long[file_status.length * split]; long len[] = new long[file_status.length * split]; int range[] = new int[file_status.length * split]; + short compress[] = new short[file_status.length * split]; for (int i = 0 ; i < file_status.length * split; i++) { Path filePath = file_status[i].getPath(); long fileLen = file_status[i].getLen(); @@ -293,13 +314,14 @@ public class HdfsScan range[i] = i; if (j == (split-1)) len[i] = fileLen - (splitLen *(j)); + compress[i] = 1; // Uncompressed System.out.println ("Range " + i + " Pos " + pos[i] + " Length " + len[i]); i++; } } long time1 = System.currentTimeMillis(); HdfsScan hdfsScan = new HdfsScan(); - hdfsScan.setScanRanges(buf1, buf2, fileName, pos, len, range); + hdfsScan.setScanRanges(buf1, buf2, (short)512, fileName, pos, len, range, compress); int[] retArray; int bytesCompleted; ByteBuffer buf;
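
For readers not familiar with the Hadoop compression classes this change builds on, below is a minimal, self-contained Java sketch of the read pattern that HDFSClient.java now follows: resolve the codec from the file name via CompressionCodecFactory, wrap the file in a decompressing input stream when a codec is found, and stage the decompressed bytes through an interim byte array whose size is governed by HDFS_IO_INTERIM_BYTEARRAY_SIZE_IN_KB (default 1024 KB per nadefaults.cpp). The class and method names (CompressedTextReadSketch, readAll) are illustrative only and are not part of this commit.

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;

    // Illustrative sketch only; not part of the commit.
    public class CompressedTextReadSketch
    {
       // Read an HDFS text file that may be compressed, staging the data
       // through an interim byte array of ioByteArraySizeInKB kilobytes,
       // similar in spirit to HDFSClient.compressedFileRead() in this patch.
       public static long readAll(String fileName, short ioByteArraySizeInKB) throws IOException
       {
          Configuration conf = new Configuration();
          Path path = new Path(fileName);
          FileSystem fs = FileSystem.get(path.toUri(), conf);
          CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
          CompressionCodec codec = codecFactory.getCodec(path);   // null => not compressed
          InputStream in;
          if (codec != null)
             in = codec.createInputStream(fs.open(path));         // decompressing stream
          else
             in = fs.open(path);
          byte[] interim = new byte[ioByteArraySizeInKB * 1024];
          long total = 0;
          int n;
          // Decompressed data cannot be read directly into a direct ByteBuffer,
          // so it is drained through the interim array chunk by chunk; a real
          // caller would copy each chunk out before the next read.
          while ((n = in.read(interim, 0, interim.length)) > 0)
             total += n;
          in.close();
          return total;
       }
    }

On the write path, HdfsClient::hdfsWrite() applies the same CQD: a value of 0 disables chunking and the whole buffer is passed to Java in a single call, while a non-zero value bounds each SetByteArrayRegion/write round trip to that many kilobytes.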
