Repository: trafodion Updated Branches: refs/heads/master 5e8bfc70d -> 7fba1c662
[TRAFODION-3171] Refactor Hive sequence file reading to use the new implementation Ensured that split of non-compressed sequence file works. In case of compressed sequence file, an exception is thrown when the file is split. Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/1c8c81d7 Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/1c8c81d7 Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/1c8c81d7 Branch: refs/heads/master Commit: 1c8c81d708af0520e61b8c6a20ff69135a6b8f5c Parents: 39d7110 Author: selvaganesang <[email protected]> Authored: Mon Aug 13 23:24:47 2018 +0000 Committer: selvaganesang <[email protected]> Committed: Mon Aug 13 23:24:47 2018 +0000 ---------------------------------------------------------------------- .../main/java/org/trafodion/sql/HDFSClient.java | 45 ++++++++++++++------ 1 file changed, 32 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafodion/blob/1c8c81d7/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---------------------------------------------------------------------- diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java index 1995851..6b7f051 100644 --- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java +++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java @@ -124,6 +124,8 @@ public class HDFSClient private Writable key_; private Writable value_; private SequenceFile.Reader reader_; + private SequenceFile.CompressionType seqCompressionType_; + static { String confFile = System.getProperty("trafodion.log4j.configFile"); System.setProperty("trafodion.root", System.getenv("TRAF_HOME")); @@ -171,7 +173,8 @@ public class HDFSClient int bytesRead; int totalBytesRead = 0; if (sequenceFile_) { - reader_.sync(pos_); + // do nothing + ; } else if 
(compressed_) { bufArray_ = new byte[ioByteArraySizeInKB_ * 1024]; } @@ -196,11 +199,10 @@ public class HDFSClient } do { - if (compressed_) { - bytesRead = compressedFileRead(lenRemain_); - } else if (sequenceFile_) { + if (sequenceFile_) bytesRead = sequenceFileRead(lenRemain_); - } + else if (compressed_) + bytesRead = compressedFileRead(lenRemain_); else { if (buf_.hasArray()) bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, lenRemain_); @@ -269,8 +271,8 @@ public class HDFSClient int lenRemain = readLenRemain; while (!eof && lenRemain > 0) { - tempPos = reader_.getPosition(); try { + tempPos = reader_.getPosition(); eof = reader_.next(key_, value_); } catch (java.io.EOFException e) @@ -288,7 +290,11 @@ public class HDFSClient totalReadLen += (readLen+1); } else { // Reset the position because the row can't be copied to buffer - reader_.sync(tempPos); + try { + reader_.sync(tempPos); + } + catch (java.io.EOFException e1) + {} break; } } @@ -323,8 +329,10 @@ public class HDFSClient inStream_ = inStream; sequenceFile_ = sequenceFile; recDelimiter_ = recDelimiter; - if (sequenceFile_) - fsdis_ = fs_.open(filepath_); + if (sequenceFile_) { + fsdis_ = null; + inStream_ = null; + } else { codec_ = codecFactory_.getCodec(filepath_); if (codec_ != null) { @@ -366,10 +374,21 @@ public class HDFSClient public void initSequenceFileRead() throws IOException, EOFException { - SequenceFile.Reader.Option seqPos = SequenceFile.Reader.start(pos_); SequenceFile.Reader.Option seqLen = SequenceFile.Reader.length(lenRemain_); - SequenceFile.Reader.Option seqInputStream = SequenceFile.Reader.stream(fsdis_); - reader_ = new SequenceFile.Reader(config_, seqPos, seqLen, seqInputStream); + SequenceFile.Reader.Option seqFileName = SequenceFile.Reader.file(filepath_); + reader_ = new SequenceFile.Reader(config_, seqLen, seqFileName); + seqCompressionType_ = reader_.getCompressionType(); + if (seqCompressionType_ == SequenceFile.CompressionType.NONE) + compressed_ = false; + else + 
compressed_ = true; + if (compressed_ && pos_ != 0) + throw new IOException("Spliting of compressed sequence file is not supported"); + try { + reader_.sync(pos_); + } catch (EOFException e) + {} + String keyClass = reader_.getKeyClassName(); String valueClass = reader_.getValueClassName(); if (! valueClass.equals("org.apache.hadoop.io.Text")) @@ -390,7 +409,7 @@ public class HDFSClient int bytesRead; retObject = (Integer)future_.get(); bytesRead = retObject.intValue(); - if (! compressed_) + if (! compressed_ && fsdis_ != null) fsdis_.close(); fsdis_ = null; return bytesRead;
