Repository: kylin
Updated Branches:
  refs/heads/2.x-staging db95d72ca -> 54071e311


KYLIN-1245 bug fix when reading stats seq file


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/54071e31
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/54071e31
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/54071e31

Branch: refs/heads/2.x-staging
Commit: 54071e311d1bfcbc96390631a2d98d3ce70aaef6
Parents: db95d72
Author: Li, Yang <yang...@ebay.com>
Authored: Wed Dec 23 15:13:57 2015 +0800
Committer: Li, Yang <yang...@ebay.com>
Committed: Wed Dec 23 15:13:57 2015 +0800

----------------------------------------------------------------------
 .../kylin/engine/mr/common/CubeStatsReader.java | 29 +++++++++++++++-----
 1 file changed, 22 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/54071e31/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index fc27a81..bbc724a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.engine.mr.common;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintWriter;
@@ -29,7 +31,7 @@ import java.util.Map;
 import javax.annotation.Nullable;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.LongWritable;
@@ -76,14 +78,14 @@ public class CubeStatsReader {
     public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) 
throws IOException {
         ResourceStore store = ResourceStore.getStore(kylinConfig);
         String statsKey = cubeSegment.getStatisticsResourcePath();
-        InputStream is = store.getResource(statsKey).inputStream;
+        File tmpSeqFile = 
writeTmpSeqFile(store.getResource(statsKey).inputStream);
         Reader reader = null;
 
         try {
             Configuration hadoopConf = HadoopUtil.getCurrentConfiguration();
 
-            Option streamInput = SequenceFile.Reader.stream(new 
FSDataInputStream(is));
-            reader = new SequenceFile.Reader(hadoopConf, streamInput);
+            Option seqInput = SequenceFile.Reader.file(new Path("file://" + 
tmpSeqFile.getAbsolutePath()));
+            reader = new SequenceFile.Reader(hadoopConf, seqInput);
 
             int percentage = 100;
             double mapperOverlapRatio = 0;
@@ -111,10 +113,23 @@ public class CubeStatsReader {
 
         } finally {
             IOUtils.closeStream(reader);
-            IOUtils.closeStream(is);
+            tmpSeqFile.delete();
         }
     }
 
+    private File writeTmpSeqFile(InputStream inputStream) throws IOException {
+        File tempFile = File.createTempFile("kylin_stats_tmp", ".seq");
+        FileOutputStream out = null;
+        try {
+            out = new FileOutputStream(tempFile);
+            org.apache.commons.io.IOUtils.copy(inputStream, out);
+        } finally {
+            IOUtils.closeStream(inputStream);
+            IOUtils.closeStream(out);
+        }
+        return tempFile;
+    }
+
     public Map<Long, Long> getCuboidRowCountMap() {
         return getCuboidRowCountMapFromSampling(cuboidRowCountMap, 
samplingPercentage);
     }
@@ -211,12 +226,12 @@ public class CubeStatsReader {
         logger.info("Cuboid " + cuboidId + " has " + rowCount + " rows, each 
row size is " + bytesLength + " bytes." + " Total size is " + ret + "M.");
         return ret;
     }
-    
+
     public static void main(String[] args) throws IOException {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         CubeInstance cube = CubeManager.getInstance(config).getCube(args[0]);
         List<CubeSegment> segments = cube.getSegments(SegmentStatusEnum.READY);
-        
+
         PrintWriter out = new PrintWriter(System.out);
         for (CubeSegment seg : segments) {
             new CubeStatsReader(seg, config).print(out);

Reply via email to