Repository: trafodion Updated Branches: refs/heads/master 9c59d7803 -> c8ffae3b4
[TRAFODION-3171] Refactor Hive sequence file reading to use the new implementation Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/6165e6b1 Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/6165e6b1 Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/6165e6b1 Branch: refs/heads/master Commit: 6165e6b1017521abed062c55931d4555b886387d Parents: 95eaa31 Author: selvaganesang <[email protected]> Authored: Wed Aug 1 20:08:46 2018 +0000 Committer: selvaganesang <[email protected]> Committed: Wed Aug 1 20:08:46 2018 +0000 ---------------------------------------------------------------------- core/sql/executor/ExHdfsScan.cpp | 3 +- core/sql/executor/HdfsClient_JNI.cpp | 12 +- core/sql/executor/HdfsClient_JNI.h | 6 +- .../main/java/org/trafodion/sql/HDFSClient.java | 116 ++++++++++++++++--- .../main/java/org/trafodion/sql/HdfsScan.java | 24 ++-- 5 files changed, 129 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafodion/blob/6165e6b1/core/sql/executor/ExHdfsScan.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExHdfsScan.cpp b/core/sql/executor/ExHdfsScan.cpp index b09cffd..d4cf717 100644 --- a/core/sql/executor/ExHdfsScan.cpp +++ b/core/sql/executor/ExHdfsScan.cpp @@ -130,8 +130,6 @@ ExHdfsScanTcb::ExHdfsScanTcb( Space * space = (glob ? glob->getSpace() : 0); CollHeap * heap = (glob ? 
glob->getDefaultHeap() : 0); useLibhdfsScan_ = hdfsScanTdb.getUseLibhdfsScan(); - if (isSequenceFile()) - useLibhdfsScan_ = TRUE; lobGlob_ = NULL; hdfsScanBufMaxSize_ = hdfsScanTdb.hdfsBufSize_; headRoom_ = (Int32)hdfsScanTdb.rangeTailIOSize_; @@ -571,6 +569,7 @@ ExWorkProcRetcode ExHdfsScanTcb::work() hdfsScan_ = HdfsScan::newInstance((NAHeap *)getHeap(), hdfsScanBuf_, hdfsScanBufMaxSize_, hdfsScanTdb().hdfsIoByteArraySizeInKB_, &hdfsFileInfoListAsArray_, beginRangeNum_, numRanges_, hdfsScanTdb().rangeTailIOSize_, + isSequenceFile(), hdfsScanTdb().recordDelimiter_, hdfsStats_, hdfsScanRetCode); if (hdfsScanRetCode != HDFS_SCAN_OK) { setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, "SETUP_HDFS_SCAN", http://git-wip-us.apache.org/repos/asf/trafodion/blob/6165e6b1/core/sql/executor/HdfsClient_JNI.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/HdfsClient_JNI.cpp b/core/sql/executor/HdfsClient_JNI.cpp index 65c83cd..5ae2805 100644 --- a/core/sql/executor/HdfsClient_JNI.cpp +++ b/core/sql/executor/HdfsClient_JNI.cpp @@ -85,7 +85,7 @@ HDFS_Scan_RetCode HdfsScan::init() JavaMethods_[JM_CTOR ].jm_name = "<init>"; JavaMethods_[JM_CTOR ].jm_signature = "()V"; JavaMethods_[JM_SET_SCAN_RANGES].jm_name = "setScanRanges"; - JavaMethods_[JM_SET_SCAN_RANGES].jm_signature = "(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;I[Ljava/lang/String;[J[J[I[S)V"; + JavaMethods_[JM_SET_SCAN_RANGES].jm_signature = "(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;I[Ljava/lang/String;[J[J[I[SZB)V"; JavaMethods_[JM_TRAF_HDFS_READ].jm_name = "trafHdfsRead"; JavaMethods_[JM_TRAF_HDFS_READ].jm_signature = "()[I"; JavaMethods_[JM_STOP].jm_name = "stop"; @@ -109,7 +109,7 @@ char* HdfsScan::getErrorText(HDFS_Scan_RetCode errEnum) ///////////////////////////////////////////////////////////////////////////// HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, int hdfsIoByteArraySizeInKB, - 
HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize) + HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize, NABoolean sequenceFile, char recDelimiter) { QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::setScanRanges() called."); @@ -216,12 +216,13 @@ HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScan ex_assert(compressionMethod >= 0 && compressionMethod < ComCompressionInfo::SUPPORTED_COMPRESSIONS, "Illegal CompressionMethod Value"); jenv_->SetShortArrayRegion(j_compress, rangeCount, 1, &compressionMethod); } - + jboolean j_sequenceFile = sequenceFile; + jbyte j_recDelimiter = (BYTE)recDelimiter; if (hdfsStats_ != NULL) hdfsStats_->getHdfsTimer().start(); tsRecentJMFromJNI = JavaMethods_[JM_SET_SCAN_RANGES].jm_full_name; jenv_->CallVoidMethod(javaObj_, JavaMethods_[JM_SET_SCAN_RANGES].methodID, j_buf1, j_buf2, j_hdfsIoByteArraySizeInKB, - j_filenames, j_offsets, j_lens, j_rangenums, j_compress); + j_filenames, j_offsets, j_lens, j_rangenums, j_compress, j_sequenceFile, j_recDelimiter); if (hdfsStats_ != NULL) { hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop()); hdfsStats_->incHdfsCalls(); @@ -237,6 +238,7 @@ HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScan HdfsScan *HdfsScan::newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, int hdfsIoByteArraySizeInKB, HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize, + NABoolean sequenceFile, char recDelimiter, ExHdfsScanStats *hdfsStats, HDFS_Scan_RetCode &hdfsScanRetCode) { QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::newInstance() called."); @@ -249,7 +251,7 @@ HdfsScan *HdfsScan::newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfs hdfsScanRetCode = hdfsScan->init(); if (hdfsScanRetCode == HDFS_SCAN_OK) hdfsScanRetCode = hdfsScan->setScanRanges(hdfsScanBuf, scanBufSize, 
hdfsIoByteArraySizeInKB, - hdfsFileInfoArray, beginRangeNum, numRanges, rangeTailIOSize); + hdfsFileInfoArray, beginRangeNum, numRanges, rangeTailIOSize, sequenceFile, recDelimiter); if (hdfsScanRetCode == HDFS_SCAN_OK) hdfsScan->setHdfsStats(hdfsStats); else { http://git-wip-us.apache.org/repos/asf/trafodion/blob/6165e6b1/core/sql/executor/HdfsClient_JNI.h ---------------------------------------------------------------------- diff --git a/core/sql/executor/HdfsClient_JNI.h b/core/sql/executor/HdfsClient_JNI.h index 3d12633..c3e6518 100644 --- a/core/sql/executor/HdfsClient_JNI.h +++ b/core/sql/executor/HdfsClient_JNI.h @@ -66,12 +66,12 @@ public: static char* getErrorText(HDFS_Scan_RetCode errEnum); static HdfsScan *newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, int hdfsIoByteArraySizeInKB, - HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize, - ExHdfsScanStats *hdfsStats, HDFS_Scan_RetCode &hdfsScanRetCode); + HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize, NABoolean sequenceFile, + char recDelimiter, ExHdfsScanStats *hdfsStats, HDFS_Scan_RetCode &hdfsScanRetCode); HDFS_Scan_RetCode setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, int hdfsIoByteArraySizeInKB, HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, - int rangeTailIOSize); + int rangeTailIOSize, NABoolean sequenceFile, char recDelimiter); HDFS_Scan_RetCode trafHdfsRead(int retArray[], short arrayLen); http://git-wip-us.apache.org/repos/asf/trafodion/blob/6165e6b1/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---------------------------------------------------------------------- diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java index d4a697f..8a14b61 100644 --- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java +++ 
b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java @@ -40,6 +40,11 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.io.compress.CompressionInputStream; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Writable; + import java.io.EOFException; import java.util.concurrent.Callable; import java.util.concurrent.Future; @@ -113,6 +118,12 @@ public class HDFSClient private CompressionCodec codec_ = null; private short compressionType_; private int ioByteArraySizeInKB_; + + private boolean sequenceFile_; + private byte recDelimiter_; + private Writable key_; + private Writable value_; + private SequenceFile.Reader reader_; static { String confFile = System.getProperty("trafodion.log4j.configFile"); System.setProperty("trafodion.root", System.getenv("TRAF_HOME")); @@ -159,7 +170,9 @@ public class HDFSClient { int bytesRead; int totalBytesRead = 0; - if (compressed_) { + if (sequenceFile_) { + reader_.sync(pos_); + } else if (compressed_) { bufArray_ = new byte[ioByteArraySizeInKB_ * 1024]; } else { @@ -185,7 +198,10 @@ public class HDFSClient { if (compressed_) { bytesRead = compressedFileRead(lenRemain_); - } else { + } else if (sequenceFile_) { + bytesRead = sequenceFileRead(lenRemain_); + } + else { if (buf_.hasArray()) bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, lenRemain_); else @@ -240,7 +256,48 @@ public class HDFSClient return totalReadLen; } - native int copyToByteBuffer(ByteBuffer buf, int bufOffset, byte[] bufArray, int copyLen); + /* Trafodion adds record delimiter '\n' while copying it + to buffer backing up the ByteBuffer */ + + int sequenceFileRead(int readLenRemain) throws IOException + { + boolean eof = false; + byte[] byteArray; + int readLen; + int totalReadLen = 0; + long tempPos; + int lenRemain = readLenRemain; + + while (!eof 
&& lenRemain > 0) { + tempPos = reader_.getPosition(); + try { + eof = reader_.next(key_, value_); + } + catch (java.io.EOFException e) + { + eof = true; + break; + } + byteArray = ((Text)value_).getBytes(); + readLen = ((Text)value_).getLength(); + if (readLen <= lenRemain) { + + buf_.put(byteArray, 0, readLen); + buf_.put(recDelimiter_); + lenRemain_ -= (readLen+1); + totalReadLen += (readLen+1); + } else { + // Reset the position because the row can't be copied to buffer + reader_.sync(tempPos); + break; + } + } + if (totalReadLen == 0) + totalReadLen = -1; + return totalReadLen; + } + + native int copyToByteBuffer(ByteBuffer buf, int bufOffset, byte[] bufArray, int copyLen); public HDFSClient() { @@ -253,7 +310,7 @@ public class HDFSClient // If the range has a length more than the buffer length, the range is chunked // in HdfsScan public HDFSClient(int bufNo, int ioByteArraySizeInKB, int rangeNo, String filename, ByteBuffer buffer, long position, - int length, short compressionType, CompressionInputStream inStream) throws IOException + int length, short compressionType, boolean sequenceFile, byte recDelimiter, CompressionInputStream inStream) throws IOException { bufNo_ = bufNo; rangeNo_ = rangeNo; @@ -263,18 +320,24 @@ public class HDFSClient fs_ = FileSystem.get(filepath_.toUri(),config_); compressionType_ = compressionType; inStream_ = inStream; - codec_ = codecFactory_.getCodec(filepath_); - if (codec_ != null) { - compressed_ = true; - if (inStream_ == null) - inStream_ = codec_.createInputStream(fs_.open(filepath_)); - } + sequenceFile_ = sequenceFile; + recDelimiter_ = recDelimiter; + if (sequenceFile_) + fsdis_ = fs_.open(filepath_); else { - if ((compressionType_ != UNCOMPRESSED) && (compressionType_ != UNKNOWN_COMPRESSION)) - throw new IOException(COMPRESSION_TYPE[compressionType_] + " compression codec is not configured in Hadoop"); - if (filename_.endsWith(".lzo")) - throw new IOException(COMPRESSION_TYPE[LZOP] + " compression codec is not 
configured in Hadoop"); + fsdis_ = fs_.open(filepath_); + } } blockSize_ = (int)fs_.getDefaultBlockSize(filepath_); buf_ = buffer; @@ -289,10 +352,33 @@ public class HDFSClient } lenRemain_ = (len_ > bufLen_) ? bufLen_ : len_; if (lenRemain_ != 0) { + if (sequenceFile_) + initSequenceFileRead(); future_ = executorService_.submit(new HDFSRead()); } } + /* Trafodion supports Sequence files with keys written via the BytesWritable or Text class + and values written via the Text class. However, the key is completely ignored + while reading the rows. The columns in the value are delimited by the column delimiter 001 (octal). + */ + + public void initSequenceFileRead() throws IOException + { + SequenceFile.Reader.Option seqPos = SequenceFile.Reader.start(pos_); + SequenceFile.Reader.Option seqLen = SequenceFile.Reader.length(lenRemain_); + SequenceFile.Reader.Option seqInputStream = SequenceFile.Reader.stream(fsdis_); + reader_ = new SequenceFile.Reader(config_, seqPos, seqLen, seqInputStream); + String keyClass = reader_.getKeyClassName(); + String valueClass = reader_.getValueClassName(); + if (! 
valueClass.equals("org.apache.hadoop.io.Text")) + throw new IOException("Sequence File with the value class of type " + valueClass + " is not supported"); + if (!(keyClass.equals("org.apache.hadoop.io.Text") || keyClass.equals("org.apache.hadoop.io.BytesWritable"))) + throw new IOException("Sequence File with the key class of type " + keyClass + " is not supported"); + key_ = (Writable) ReflectionUtils.newInstance(reader_.getKeyClass(), config_); + value_ = (Writable) ReflectionUtils.newInstance(reader_.getValueClass(), config_); + } + // This method waits for the read to complete. Read can complete due to one of the following // a) buffer is full // b) EOF is reached http://git-wip-us.apache.org/repos/asf/trafodion/blob/6165e6b1/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java ---------------------------------------------------------------------- diff --git a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java index 48d5768..3ef1be7 100644 --- a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java +++ b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java @@ -75,7 +75,8 @@ public class HdfsScan private boolean scanCompleted_; private CompressionInputStream currInStream_; private int ioByteArraySizeInKB_; - + private boolean sequenceFile_; + private byte recDelimiter_; // Structure to hold the Scan ranges for this HdfsScan instance // @@ -109,7 +110,7 @@ public class HdfsScan } public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, int ioByteArraySizeInKB, String filename[], long pos[], - long len[], int rangeNum[], short compressionType[]) throws IOException + long len[], int rangeNum[], short compressionType[], boolean sequenceFile, byte recDelimiter) throws IOException { // Two buffers to hold the data read buf_ = new ByteBuffer[2]; @@ -129,6 +130,9 @@ public class HdfsScan for (int i = 0; i < filename.length; i++) { hdfsScanRanges_[i] = new HdfsScanRange(filename[i], pos[i], len[i], 
rangeNum[i], compressionType[i]); } + sequenceFile_ = sequenceFile; + recDelimiter_ = recDelimiter; + scanCompleted_ = false; if (hdfsScanRanges_.length > 0) { currRange_ = 0; currRangePos_ = hdfsScanRanges_[currRange_].pos_; @@ -136,7 +140,6 @@ public class HdfsScan currInStream_ = null; scheduleHdfsScanRange(0, 0); } - scanCompleted_ = false; } public void scheduleHdfsScanRange(int bufNo, int bytesCompleted) throws IOException @@ -168,7 +171,7 @@ public class HdfsScan hdfsClient_[bufNo] = new HDFSClient(bufNo, ioByteArraySizeInKB_, hdfsScanRanges_[currRange_].tdbRangeNum_, hdfsScanRanges_[currRange_].filename_, buf_[bufNo], currRangePos_, readLength, - hdfsScanRanges_[currRange_].compressionType_, currInStream_); + hdfsScanRanges_[currRange_].compressionType_, sequenceFile_, recDelimiter_, currInStream_); } } @@ -287,6 +290,7 @@ public class HdfsScan Table table = hiveMeta.getTable(tableName); StorageDescriptor sd = table.getSd(); String location = sd.getLocation(); + String inputFormat = sd.getInputFormat(); URI uri = new URI(location); Path path = new Path(uri); FileSystem fs = FileSystem.get(config); @@ -305,7 +309,7 @@ public class HdfsScan fileName[i] = filePath.toString(); System.out.println (" fileName " + fileName[i] + " Length " + fileLen); long splitPos = 0; - for (int j = 0 ; j < split ; j++) + for (int j = 0 ; j < split; j++) { fileName[i] = filePath.toString(); pos[i] = splitPos + (splitLen * j); @@ -315,12 +319,18 @@ public class HdfsScan len[i] = fileLen - (splitLen *(j)); compress[i] = 1; // Uncompressed System.out.println ("Range " + i + " Pos " + pos[i] + " Length " + len[i]); - i++; + if (j != (split-1)) + i++; } } long time1 = System.currentTimeMillis(); + System.out.println("Input format " + inputFormat); + boolean sequenceFile = false; + if (inputFormat.indexOf(new String("SequenceFileInputFormat")) != -1) + sequenceFile = true; HdfsScan hdfsScan = new HdfsScan(); - hdfsScan.setScanRanges(buf1, buf2, (short)512, fileName, pos, len, range, 
compress); + byte b = '\n'; + hdfsScan.setScanRanges(buf1, buf2, (short)512, fileName, pos, len, range, compress, sequenceFile, b); int[] retArray; int bytesCompleted; ByteBuffer buf;
