Repository: trafodion
Updated Branches:
  refs/heads/master 5e8bfc70d -> 7fba1c662


[TRAFODION-3171] Refactor Hive sequence file reading to use the new 
implementation

Ensured that splitting of a non-compressed sequence file works. In case of a
compressed sequence file, an exception is thrown when the file is split.


Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/1c8c81d7
Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/1c8c81d7
Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/1c8c81d7

Branch: refs/heads/master
Commit: 1c8c81d708af0520e61b8c6a20ff69135a6b8f5c
Parents: 39d7110
Author: selvaganesang <[email protected]>
Authored: Mon Aug 13 23:24:47 2018 +0000
Committer: selvaganesang <[email protected]>
Committed: Mon Aug 13 23:24:47 2018 +0000

----------------------------------------------------------------------
 .../main/java/org/trafodion/sql/HDFSClient.java | 45 ++++++++++++++------
 1 file changed, 32 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafodion/blob/1c8c81d7/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java 
b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
index 1995851..6b7f051 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
@@ -124,6 +124,8 @@ public class HDFSClient
    private Writable key_;
    private Writable value_;
    private SequenceFile.Reader reader_;
+   private SequenceFile.CompressionType seqCompressionType_;
+
    static {
       String confFile = System.getProperty("trafodion.log4j.configFile");
       System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
@@ -171,7 +173,8 @@ public class HDFSClient
          int bytesRead;
          int totalBytesRead = 0;
          if (sequenceFile_) {
-            reader_.sync(pos_);
+            // do nothing
+            ; 
          } else if (compressed_) {
             bufArray_ = new byte[ioByteArraySizeInKB_ * 1024];
          } 
@@ -196,11 +199,10 @@ public class HDFSClient
          }
          do
          {
-            if (compressed_) {
-               bytesRead = compressedFileRead(lenRemain_);
-            } else if (sequenceFile_) {
+            if (sequenceFile_) 
                bytesRead = sequenceFileRead(lenRemain_);
-            }
+            else if (compressed_) 
+               bytesRead = compressedFileRead(lenRemain_);
             else {
                if (buf_.hasArray())
                   bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, 
lenRemain_);
@@ -269,8 +271,8 @@ public class HDFSClient
        int lenRemain = readLenRemain;
 
        while (!eof && lenRemain > 0) {
-          tempPos = reader_.getPosition();
           try {
+            tempPos = reader_.getPosition();
             eof = reader_.next(key_, value_);
           }
           catch (java.io.EOFException e)
@@ -288,7 +290,11 @@ public class HDFSClient
               totalReadLen += (readLen+1);
           } else {
               // Reset the position because the row can't be copied to buffer
-              reader_.sync(tempPos); 
+              try {
+                reader_.sync(tempPos); 
+              }
+              catch (java.io.EOFException e1)
+              {}
               break;       
           }   
        }
@@ -323,8 +329,10 @@ public class HDFSClient
       inStream_ = inStream;
       sequenceFile_ = sequenceFile;
       recDelimiter_ = recDelimiter;
-      if (sequenceFile_)
-         fsdis_ = fs_.open(filepath_);
+      if (sequenceFile_) {
+         fsdis_ = null;
+         inStream_ = null;
+      } 
       else {
          codec_ = codecFactory_.getCodec(filepath_);
          if (codec_ != null) {
@@ -366,10 +374,21 @@ public class HDFSClient
 
    public void initSequenceFileRead() throws IOException, EOFException
    {
-      SequenceFile.Reader.Option seqPos = SequenceFile.Reader.start(pos_);
       SequenceFile.Reader.Option seqLen = 
SequenceFile.Reader.length(lenRemain_);
-      SequenceFile.Reader.Option seqInputStream = 
SequenceFile.Reader.stream(fsdis_);
-      reader_ = new SequenceFile.Reader(config_, seqPos, seqLen, 
seqInputStream);
+      SequenceFile.Reader.Option seqFileName = 
SequenceFile.Reader.file(filepath_);
+      reader_ = new SequenceFile.Reader(config_, seqLen, seqFileName);
+      seqCompressionType_ = reader_.getCompressionType();
+      if (seqCompressionType_ == SequenceFile.CompressionType.NONE)
+         compressed_ = false;
+      else
+         compressed_ = true;
+      if (compressed_ && pos_ != 0)
+         throw new IOException("Spliting of compressed sequence file is not 
supported");
+      try {
+        reader_.sync(pos_);
+      } catch (EOFException e)
+      {}
+      
       String keyClass = reader_.getKeyClassName(); 
       String valueClass = reader_.getValueClassName();
       if (! valueClass.equals("org.apache.hadoop.io.Text")) 
@@ -390,7 +409,7 @@ public class HDFSClient
       int bytesRead;
       retObject = (Integer)future_.get();
       bytesRead = retObject.intValue();
-      if (! compressed_)
+      if (! compressed_ && fsdis_ != null)
          fsdis_.close();
       fsdis_ = null;
       return bytesRead;

Reply via email to