[HOTFIX] Fixed S3 metrics issue. Problem: When data is read from S3, the reported amount of data read is larger than the total size of the CarbonData files. Reason: CarbonData uses DataInputStream.skip, which the S3 input stream does not handle properly: it reads in a loop and ends up reading more data than required. Solution: Use FSDataInputStream.seek instead of skip to fix this issue.
This closes #2789 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7d1fcb30 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7d1fcb30 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7d1fcb30 Branch: refs/heads/branch-1.5 Commit: 7d1fcb3092a1e9da6c49f17c63c6217892e9e531 Parents: 2081bc8 Author: ravipesala <[email protected]> Authored: Fri Sep 28 18:29:08 2018 +0530 Committer: kumarvishal09 <[email protected]> Committed: Wed Oct 3 16:08:49 2018 +0530 ---------------------------------------------------------------------- .../datastore/filesystem/AbstractDFSCarbonFile.java | 7 +++++-- .../apache/carbondata/core/reader/ThriftReader.java | 16 ++++++---------- 2 files changed, 11 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/7d1fcb30/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java index b1e476b..c764430 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java @@ -327,8 +327,11 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile { CompressionCodec codec = new CompressionCodecFactory(hadoopConf).getCodecByName(codecName); inputStream = codec.createInputStream(inputStream); } - - return new DataInputStream(new BufferedInputStream(inputStream)); + if (bufferSize <= 0 && inputStream instanceof FSDataInputStream) { + return (DataInputStream) inputStream; + } else { + return new 
DataInputStream(new BufferedInputStream(inputStream)); + } } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/7d1fcb30/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java index 48d8345..f5ecda6 100644 --- a/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java +++ b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java @@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.thrift.TBase; import org.apache.thrift.TException; import org.apache.thrift.protocol.TCompactProtocol; @@ -36,10 +37,6 @@ import org.apache.thrift.transport.TIOStreamTransport; */ public class ThriftReader { /** - * buffer size - */ - private static final int bufferSize = 2048; - /** * File containing the objects. */ private String fileName; @@ -101,7 +98,7 @@ public class ThriftReader { public void open() throws IOException { Configuration conf = configuration != null ? 
configuration : FileFactory.getConfiguration(); FileFactory.FileType fileType = FileFactory.getFileType(fileName); - dataInputStream = FileFactory.getDataInputStream(fileName, fileType, bufferSize, conf); + dataInputStream = FileFactory.getDataInputStream(fileName, fileType, conf); binaryIn = new TCompactProtocol(new TIOStreamTransport(dataInputStream)); } @@ -109,7 +106,9 @@ public class ThriftReader { * This method will set the position of stream from where data has to be read */ public void setReadOffset(long bytesToSkip) throws IOException { - if (dataInputStream.skip(bytesToSkip) != bytesToSkip) { + if (dataInputStream instanceof FSDataInputStream) { + ((FSDataInputStream)dataInputStream).seek(bytesToSkip); + } else if (dataInputStream.skip(bytesToSkip) != bytesToSkip) { throw new IOException("It doesn't set the offset properly"); } } @@ -118,10 +117,7 @@ public class ThriftReader { * Checks if another objects is available by attempting to read another byte from the stream. */ public boolean hasNext() throws IOException { - dataInputStream.mark(1); - int val = dataInputStream.read(); - dataInputStream.reset(); - return val != -1; + return dataInputStream.available() > 0; } /**
