Repository: trafodion
Updated Branches:
  refs/heads/master 9c59d7803 -> c8ffae3b4


[TRAFODION-3171] Refactor Hive sequence file reading to use the new 
implementation


Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/6165e6b1
Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/6165e6b1
Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/6165e6b1

Branch: refs/heads/master
Commit: 6165e6b1017521abed062c55931d4555b886387d
Parents: 95eaa31
Author: selvaganesang <[email protected]>
Authored: Wed Aug 1 20:08:46 2018 +0000
Committer: selvaganesang <[email protected]>
Committed: Wed Aug 1 20:08:46 2018 +0000

----------------------------------------------------------------------
 core/sql/executor/ExHdfsScan.cpp                |   3 +-
 core/sql/executor/HdfsClient_JNI.cpp            |  12 +-
 core/sql/executor/HdfsClient_JNI.h              |   6 +-
 .../main/java/org/trafodion/sql/HDFSClient.java | 116 ++++++++++++++++---
 .../main/java/org/trafodion/sql/HdfsScan.java   |  24 ++--
 5 files changed, 129 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafodion/blob/6165e6b1/core/sql/executor/ExHdfsScan.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHdfsScan.cpp b/core/sql/executor/ExHdfsScan.cpp
index b09cffd..d4cf717 100644
--- a/core/sql/executor/ExHdfsScan.cpp
+++ b/core/sql/executor/ExHdfsScan.cpp
@@ -130,8 +130,6 @@ ExHdfsScanTcb::ExHdfsScanTcb(
   Space * space = (glob ? glob->getSpace() : 0);
   CollHeap * heap = (glob ? glob->getDefaultHeap() : 0);
   useLibhdfsScan_ = hdfsScanTdb.getUseLibhdfsScan();
-  if (isSequenceFile())
-     useLibhdfsScan_ = TRUE;
   lobGlob_ = NULL;
   hdfsScanBufMaxSize_ = hdfsScanTdb.hdfsBufSize_;
   headRoom_ = (Int32)hdfsScanTdb.rangeTailIOSize_;
@@ -571,6 +569,7 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
              hdfsScan_ = HdfsScan::newInstance((NAHeap *)getHeap(), 
hdfsScanBuf_, hdfsScanBufMaxSize_, 
                             hdfsScanTdb().hdfsIoByteArraySizeInKB_, 
                             &hdfsFileInfoListAsArray_, beginRangeNum_, 
numRanges_, hdfsScanTdb().rangeTailIOSize_, 
+                            isSequenceFile(), hdfsScanTdb().recordDelimiter_, 
                             hdfsStats_, hdfsScanRetCode);
              if (hdfsScanRetCode != HDFS_SCAN_OK) {
                 setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, 
"SETUP_HDFS_SCAN", 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/6165e6b1/core/sql/executor/HdfsClient_JNI.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/HdfsClient_JNI.cpp 
b/core/sql/executor/HdfsClient_JNI.cpp
index 65c83cd..5ae2805 100644
--- a/core/sql/executor/HdfsClient_JNI.cpp
+++ b/core/sql/executor/HdfsClient_JNI.cpp
@@ -85,7 +85,7 @@ HDFS_Scan_RetCode HdfsScan::init()
     JavaMethods_[JM_CTOR      ].jm_name      = "<init>";
     JavaMethods_[JM_CTOR      ].jm_signature = "()V";
     JavaMethods_[JM_SET_SCAN_RANGES].jm_name      = "setScanRanges";
-    JavaMethods_[JM_SET_SCAN_RANGES].jm_signature = 
"(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;I[Ljava/lang/String;[J[J[I[S)V";
+    JavaMethods_[JM_SET_SCAN_RANGES].jm_signature = 
"(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;I[Ljava/lang/String;[J[J[I[SZB)V";
     JavaMethods_[JM_TRAF_HDFS_READ].jm_name      = "trafHdfsRead";
     JavaMethods_[JM_TRAF_HDFS_READ].jm_signature = "()[I";
     JavaMethods_[JM_STOP].jm_name      = "stop";
@@ -109,7 +109,7 @@ char* HdfsScan::getErrorText(HDFS_Scan_RetCode errEnum)
 
 /////////////////////////////////////////////////////////////////////////////
 HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF 
*hdfsScanBuf,  int scanBufSize, int hdfsIoByteArraySizeInKB,
-      HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 
numRanges, int rangeTailIOSize)
+      HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 
numRanges, int rangeTailIOSize, NABoolean sequenceFile, char recDelimiter)
 {
    QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::setScanRanges() called.");
 
@@ -216,12 +216,13 @@ HDFS_Scan_RetCode 
HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScan
        ex_assert(compressionMethod >= 0 && compressionMethod < 
ComCompressionInfo::SUPPORTED_COMPRESSIONS, "Illegal CompressionMethod Value");
        jenv_->SetShortArrayRegion(j_compress, rangeCount, 1, 
&compressionMethod);
    } 
-
+   jboolean j_sequenceFile = sequenceFile;
+   jbyte j_recDelimiter = (BYTE)recDelimiter;
    if (hdfsStats_ != NULL)
        hdfsStats_->getHdfsTimer().start();
    tsRecentJMFromJNI = JavaMethods_[JM_SET_SCAN_RANGES].jm_full_name;
    jenv_->CallVoidMethod(javaObj_, JavaMethods_[JM_SET_SCAN_RANGES].methodID, 
j_buf1, j_buf2, j_hdfsIoByteArraySizeInKB, 
-                      j_filenames, j_offsets, j_lens, j_rangenums, j_compress);
+                      j_filenames, j_offsets, j_lens, j_rangenums, j_compress, 
j_sequenceFile, j_recDelimiter);
    if (hdfsStats_ != NULL) {
       hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop());
       hdfsStats_->incHdfsCalls();
@@ -237,6 +238,7 @@ HDFS_Scan_RetCode 
HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScan
 
 HdfsScan *HdfsScan::newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF 
*hdfsScanBuf,  int scanBufSize,
       int hdfsIoByteArraySizeInKB, HdfsFileInfoArray *hdfsFileInfoArray, Int32 
beginRangeNum, Int32 numRanges, int rangeTailIOSize, 
+      NABoolean sequenceFile, char recDelimiter,  
       ExHdfsScanStats *hdfsStats, HDFS_Scan_RetCode &hdfsScanRetCode)
 {
    QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::newInstance() called.");
@@ -249,7 +251,7 @@ HdfsScan *HdfsScan::newInstance(NAHeap *heap, 
ExHdfsScanTcb::HDFS_SCAN_BUF *hdfs
        hdfsScanRetCode = hdfsScan->init();
        if (hdfsScanRetCode == HDFS_SCAN_OK) 
           hdfsScanRetCode = hdfsScan->setScanRanges(hdfsScanBuf, scanBufSize, 
hdfsIoByteArraySizeInKB,  
-                    hdfsFileInfoArray, beginRangeNum, numRanges, 
rangeTailIOSize); 
+                    hdfsFileInfoArray, beginRangeNum, numRanges, 
rangeTailIOSize, sequenceFile, recDelimiter); 
        if (hdfsScanRetCode == HDFS_SCAN_OK) 
           hdfsScan->setHdfsStats(hdfsStats);
        else {

http://git-wip-us.apache.org/repos/asf/trafodion/blob/6165e6b1/core/sql/executor/HdfsClient_JNI.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/HdfsClient_JNI.h 
b/core/sql/executor/HdfsClient_JNI.h
index 3d12633..c3e6518 100644
--- a/core/sql/executor/HdfsClient_JNI.h
+++ b/core/sql/executor/HdfsClient_JNI.h
@@ -66,12 +66,12 @@ public:
   static char* getErrorText(HDFS_Scan_RetCode errEnum);
 
   static HdfsScan *newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF 
*hdfsScanBuf, int scanBufSize, int hdfsIoByteArraySizeInKB, 
-            HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 
numRanges, int rangeTailIOSize,
-            ExHdfsScanStats *hdfsStats, HDFS_Scan_RetCode &hdfsScanRetCode);
+            HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 
numRanges, int rangeTailIOSize, NABoolean sequenceFile,
+            char recDelimiter, ExHdfsScanStats *hdfsStats, HDFS_Scan_RetCode 
&hdfsScanRetCode);
 
   HDFS_Scan_RetCode setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, 
int scanBufSize, int hdfsIoByteArraySizeInKB, 
             HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 
numRanges, 
-            int rangeTailIOSize);
+            int rangeTailIOSize, NABoolean sequenceFile, char recDelimiter);
 
   HDFS_Scan_RetCode trafHdfsRead(int retArray[], short arrayLen);
 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/6165e6b1/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java 
b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
index d4a697f..8a14b61 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
@@ -40,6 +40,11 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+
 import java.io.EOFException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
@@ -113,6 +118,12 @@ public class HDFSClient
    private CompressionCodec codec_ = null;
    private short compressionType_;
    private int ioByteArraySizeInKB_;
+
+   private boolean sequenceFile_;
+   private byte recDelimiter_;
+   private Writable key_;
+   private Writable value_;
+   private SequenceFile.Reader reader_;
    static {
       String confFile = System.getProperty("trafodion.log4j.configFile");
       System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
@@ -159,7 +170,9 @@ public class HDFSClient
       {
          int bytesRead;
          int totalBytesRead = 0;
-         if (compressed_) {
+         if (sequenceFile_) {
+            reader_.sync(pos_);
+         } else if (compressed_) {
             bufArray_ = new byte[ioByteArraySizeInKB_ * 1024];
          } 
          else  {
@@ -185,7 +198,10 @@ public class HDFSClient
          {
             if (compressed_) {
                bytesRead = compressedFileRead(lenRemain_);
-            } else {
+            } else if (sequenceFile_) {
+               bytesRead = sequenceFileRead(lenRemain_);
+            }
+            else {
                if (buf_.hasArray())
                   bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, 
lenRemain_);
                else 
@@ -240,7 +256,48 @@ public class HDFSClient
          return totalReadLen; 
     } 
 
-    native int copyToByteBuffer(ByteBuffer buf, int bufOffset, byte[] 
bufArray, int copyLen);
+    /* Trafodion adds the record delimiter '\n' while copying each row
+       into the buffer backing the ByteBuffer */
+
+    int sequenceFileRead(int readLenRemain) throws IOException 
+    {
+       boolean eof = false;
+       byte[] byteArray;
+       int readLen;
+       int totalReadLen = 0;
+       long tempPos;
+       int lenRemain = readLenRemain;
+
+       while (!eof && lenRemain > 0) {
+          tempPos = reader_.getPosition();
+          try {
+            eof = reader_.next(key_, value_);
+          }
+          catch (java.io.EOFException e)
+          {
+              eof = true;
+              break;
+          }
+          byteArray = ((Text)value_).getBytes();
+          readLen = ((Text)value_).getLength();
+          if (readLen <= lenRemain) {
+                            
+              buf_.put(byteArray, 0, readLen);
+              buf_.put(recDelimiter_);
+              lenRemain_ -= (readLen+1);
+              totalReadLen += (readLen+1);
+          } else {
+              // Reset the position because the row can't be copied to buffer
+              reader_.sync(tempPos); 
+              break;       
+          }   
+       }
+       if (totalReadLen == 0)
+          totalReadLen = -1;  
+       return totalReadLen;
+    } 
+
+   native int copyToByteBuffer(ByteBuffer buf, int bufOffset, byte[] bufArray, 
int copyLen);
        
    public HDFSClient() 
    {
@@ -253,7 +310,7 @@ public class HDFSClient
    // If the range has a length more than the buffer length, the range is 
chunked
    // in HdfsScan
    public HDFSClient(int bufNo, int ioByteArraySizeInKB, int rangeNo, String 
filename, ByteBuffer buffer, long position, 
-                int length, short compressionType, CompressionInputStream 
inStream) throws IOException
+                int length, short compressionType, boolean sequenceFile, byte 
recDelimiter, CompressionInputStream inStream) throws IOException
    {
       bufNo_ = bufNo; 
       rangeNo_ = rangeNo;
@@ -263,18 +320,24 @@ public class HDFSClient
       fs_ = FileSystem.get(filepath_.toUri(),config_);
       compressionType_ = compressionType;
       inStream_ = inStream;
-      codec_ = codecFactory_.getCodec(filepath_);
-      if (codec_ != null) {
-        compressed_ = true;
-        if (inStream_ == null)
-           inStream_ = codec_.createInputStream(fs_.open(filepath_));
-      }
+      sequenceFile_ = sequenceFile;
+      recDelimiter_ = recDelimiter;
+      if (sequenceFile_)
+         fsdis_ = fs_.open(filepath_);
       else {
-        if ((compressionType_ != UNCOMPRESSED) && (compressionType_ != 
UNKNOWN_COMPRESSION))
-           throw new IOException(COMPRESSION_TYPE[compressionType_] + " 
compression codec is not configured in Hadoop");
-        if (filename_.endsWith(".lzo"))
-           throw new IOException(COMPRESSION_TYPE[LZOP] + " compression codec 
is not configured in Hadoop");
-        fsdis_ = fs_.open(filepath_);
+         codec_ = codecFactory_.getCodec(filepath_);
+         if (codec_ != null) {
+            compressed_ = true;
+            if (inStream_ == null)
+               inStream_ = codec_.createInputStream(fs_.open(filepath_));
+         }
+         else {
+           if ((compressionType_ != UNCOMPRESSED) && (compressionType_ != 
UNKNOWN_COMPRESSION))
+              throw new IOException(COMPRESSION_TYPE[compressionType_] + " 
compression codec is not configured in Hadoop");
+           if (filename_.endsWith(".lzo"))
+              throw new IOException(COMPRESSION_TYPE[LZOP] + " compression 
codec is not configured in Hadoop");
+           fsdis_ = fs_.open(filepath_);
+         }
       }
       blockSize_ = (int)fs_.getDefaultBlockSize(filepath_);
       buf_  = buffer;
@@ -289,10 +352,33 @@ public class HDFSClient
       }
       lenRemain_ = (len_ > bufLen_) ? bufLen_ : len_;
       if (lenRemain_ != 0) {
+         if (sequenceFile_) 
+            initSequenceFileRead();
          future_ = executorService_.submit(new HDFSRead());
       }
    }
 
+   /* Trafodion supports Sequence files with keys written via the BytesWritable or 
Text class
+      and values written via the Text class. However, the key is completely ignored
+      while reading the rows. The columns in the value are delimited by the column 
delimiter 001 (octal).
+   */ 
+
+   public void initSequenceFileRead() throws IOException
+   {
+      SequenceFile.Reader.Option seqPos = SequenceFile.Reader.start(pos_);
+      SequenceFile.Reader.Option seqLen = 
SequenceFile.Reader.length(lenRemain_);
+      SequenceFile.Reader.Option seqInputStream = 
SequenceFile.Reader.stream(fsdis_);
+      reader_ = new SequenceFile.Reader(config_, seqPos, seqLen, 
seqInputStream);
+      String keyClass = reader_.getKeyClassName(); 
+      String valueClass = reader_.getValueClassName();
+      if (! valueClass.equals("org.apache.hadoop.io.Text")) 
+         throw new IOException("Sequence File with the value class of type " + 
valueClass +  " is not supported");
+      if (!(keyClass.equals("org.apache.hadoop.io.Text") || 
keyClass.equals("org.apache.hadoop.io.BytesWritable")))
+         throw new IOException("Sequence File with the key class of type " + 
keyClass + " is not supported");
+      key_ = (Writable) ReflectionUtils.newInstance(reader_.getKeyClass(), 
config_);
+      value_ = (Writable) ReflectionUtils.newInstance(reader_.getValueClass(), 
config_);
+   }
+
   //  This method waits for the read to complete. Read can complete due to one 
of the following
   //  a) buffer is full
   //  b) EOF is reached

http://git-wip-us.apache.org/repos/asf/trafodion/blob/6165e6b1/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java 
b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
index 48d5768..3ef1be7 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
@@ -75,7 +75,8 @@ public class HdfsScan
    private boolean scanCompleted_;
    private CompressionInputStream currInStream_;
    private int ioByteArraySizeInKB_;
- 
+   private boolean sequenceFile_; 
+   private byte recDelimiter_;
  
    // Structure to hold the Scan ranges for this HdfsScan instance
    //
@@ -109,7 +110,7 @@ public class HdfsScan
    }
 
    public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, int 
ioByteArraySizeInKB, String filename[], long pos[], 
-            long len[], int rangeNum[], short compressionType[]) throws 
IOException
+            long len[], int rangeNum[], short compressionType[], boolean 
sequenceFile, byte recDelimiter) throws IOException
    {
       // Two buffers to hold the data read
       buf_ = new ByteBuffer[2];
@@ -129,6 +130,9 @@ public class HdfsScan
       for (int i = 0; i < filename.length; i++) {
          hdfsScanRanges_[i] = new HdfsScanRange(filename[i], pos[i], len[i], 
rangeNum[i], compressionType[i]);
       }
+      sequenceFile_ = sequenceFile;
+      recDelimiter_ = recDelimiter;
+      scanCompleted_ = false;
       if (hdfsScanRanges_.length > 0) {
          currRange_ = 0;
          currRangePos_ = hdfsScanRanges_[currRange_].pos_;
@@ -136,7 +140,6 @@ public class HdfsScan
          currInStream_ = null;
          scheduleHdfsScanRange(0, 0);
       }
-      scanCompleted_ = false;
    }
 
    public void scheduleHdfsScanRange(int bufNo, int bytesCompleted) throws 
IOException
@@ -168,7 +171,7 @@ public class HdfsScan
          hdfsClient_[bufNo] = new HDFSClient(bufNo, ioByteArraySizeInKB_, 
hdfsScanRanges_[currRange_].tdbRangeNum_, 
                        hdfsScanRanges_[currRange_].filename_, 
                         buf_[bufNo], currRangePos_, readLength, 
-                        hdfsScanRanges_[currRange_].compressionType_, 
currInStream_);
+                        hdfsScanRanges_[currRange_].compressionType_, 
sequenceFile_, recDelimiter_, currInStream_);
                         
       }
    } 
@@ -287,6 +290,7 @@ public class HdfsScan
       Table table = hiveMeta.getTable(tableName);
       StorageDescriptor sd = table.getSd();
       String location = sd.getLocation();
+      String inputFormat = sd.getInputFormat();
       URI uri = new URI(location);
       Path path = new Path(uri);
       FileSystem fs = FileSystem.get(config);       
@@ -305,7 +309,7 @@ public class HdfsScan
          fileName[i] = filePath.toString();
          System.out.println (" fileName " + fileName[i] + " Length " + 
fileLen); 
          long splitPos = 0;
-         for (int j = 0 ; j < split ; j++)
+         for (int j = 0 ; j < split; j++)
          { 
             fileName[i] = filePath.toString();
             pos[i] = splitPos + (splitLen * j);
@@ -315,12 +319,18 @@ public class HdfsScan
                len[i] = fileLen - (splitLen *(j));
             compress[i] = 1; // Uncompressed
             System.out.println ("Range " + i + " Pos " + pos[i] + " Length " + 
len[i]); 
-            i++;
+            if (j != (split-1))
+               i++;
          }
       }
       long time1 = System.currentTimeMillis();
+      System.out.println("Input format " + inputFormat); 
+      boolean sequenceFile = false;
+      if (inputFormat.indexOf(new String("SequenceFileInputFormat")) != -1) 
+          sequenceFile = true;
       HdfsScan hdfsScan = new HdfsScan();
-      hdfsScan.setScanRanges(buf1, buf2, (short)512, fileName, pos, len, 
range, compress);
+      byte b = '\n';
+      hdfsScan.setScanRanges(buf1, buf2, (short)512, fileName, pos, len, 
range, compress, sequenceFile, b);
       int[] retArray;
       int bytesCompleted;
       ByteBuffer buf;

Reply via email to