hit-lacus commented on a change in pull request #1065: KYLIN-4349 Close 
InputStream in RowRecordReader.initReaders()
URL: https://github.com/apache/kylin/pull/1065#discussion_r376306563
 
 

 ##########
 File path: 
engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/RowRecordReader.java
 ##########
 @@ -82,51 +82,72 @@ public RowRecordReader(CubeDesc cubeDesc, Path path, 
FileSystem fileSystem) thro
     }
 
     public void initReaders() throws IOException {
-        FSDataInputStream in = fs.open(metaFilePath);
-        FragmentMetaInfo fragmentMetaInfo = JsonUtil.readValue(in, 
FragmentMetaInfo.class);
-        CuboidMetaInfo basicCuboidMetaInfo = 
fragmentMetaInfo.getBasicCuboidMetaInfo();
-        FSDataInputStream dictInputStream = fs.open(dataFilePath);
+        FSDataInputStream in = null;
+        try {
+            in = fs.open(metaFilePath);
+            FragmentMetaInfo fragmentMetaInfo = JsonUtil.readValue(in, 
FragmentMetaInfo.class);
+            CuboidMetaInfo basicCuboidMetaInfo = 
fragmentMetaInfo.getBasicCuboidMetaInfo();
+            FSDataInputStream dictInputStream = fs.open(dataFilePath);
 
-        List<DimensionMetaInfo> allDimensions = 
basicCuboidMetaInfo.getDimensionsInfo();
-        Map<String, DimensionEncoding> dimensionEncodingMap = 
getDimensionEncodings(fragmentMetaInfo, allDimensions,
-                dictInputStream);
-        dimensionColumnReaders = Lists.newArrayList();
-        dimensionColumnReaderItrs = Lists.newArrayList();
-        dimensionEncodings = Lists.newArrayList();
-        for (DimensionMetaInfo dimensionMetaInfo : allDimensions) {
-            FSDataInputStream dimInputStream = fs.open(dataFilePath);
-            String dimName = dimensionMetaInfo.getName();
-            DimensionEncoding dimEncoding = dimensionEncodingMap.get(dimName);
-            ColumnarStoreDimDesc dimDesc = new 
ColumnarStoreDimDesc(dimEncoding.getLengthOfEncoding(),
-                    dimensionMetaInfo.getCompressionType());
-            ColumnDataReader dimDataReader = 
dimDesc.getDimReaderFromFSInput(dimInputStream,
-                    dimensionMetaInfo.getStartOffset(), 
dimensionMetaInfo.getDataLength(),
-                    (int) basicCuboidMetaInfo.getNumberOfRows());
-            dimensionColumnReaders.add(dimDataReader);
-            dimensionColumnReaderItrs.add(dimDataReader.iterator());
-            dimensionEncodings.add(dimEncoding);
-        }
-        rowDimensionValues = new String[dimensionColumnReaders.size()];
+            List<DimensionMetaInfo> allDimensions = 
basicCuboidMetaInfo.getDimensionsInfo();
+            Map<String, DimensionEncoding> dimensionEncodingMap = 
getDimensionEncodings(fragmentMetaInfo, allDimensions,
+                    dictInputStream);
+            dimensionColumnReaders = Lists.newArrayList();
+            dimensionColumnReaderItrs = Lists.newArrayList();
+            dimensionEncodings = Lists.newArrayList();
+            for (DimensionMetaInfo dimensionMetaInfo : allDimensions) {
+                FSDataInputStream dimInputStream = null;
+                try {
+                    dimInputStream = fs.open(dataFilePath);
+                    String dimName = dimensionMetaInfo.getName();
+                    DimensionEncoding dimEncoding = 
dimensionEncodingMap.get(dimName);
+                    ColumnarStoreDimDesc dimDesc = new 
ColumnarStoreDimDesc(dimEncoding.getLengthOfEncoding(),
+                            dimensionMetaInfo.getCompressionType());
+                    ColumnDataReader dimDataReader = 
dimDesc.getDimReaderFromFSInput(dimInputStream,
+                            dimensionMetaInfo.getStartOffset(), 
dimensionMetaInfo.getDataLength(),
+                            (int) basicCuboidMetaInfo.getNumberOfRows());
+                    dimensionColumnReaders.add(dimDataReader);
+                    dimensionColumnReaderItrs.add(dimDataReader.iterator());
+                    dimensionEncodings.add(dimEncoding);
+                } finally {
+                    if (null != dimInputStream) {
+                        dimInputStream.close();
+                    }
+                }
+            }
+            rowDimensionValues = new String[dimensionColumnReaders.size()];
 
-        metricsColumnReaders = Lists.newArrayList();
-        metricsColumnReaderItrs = Lists.newArrayList();
-        metricsDataTransformers = Lists.newArrayList();
-        for (MetricMetaInfo metricMetaInfo : 
basicCuboidMetaInfo.getMetricsInfo()) {
-            FSDataInputStream metricsInputStream = fs.open(dataFilePath);
-            MeasureDesc measure = findMeasure(metricMetaInfo.getName());
-            DataType metricsDataType = 
measure.getFunction().getReturnDataType();
-            ColumnarMetricsEncoding metricsEncoding = 
ColumnarMetricsEncodingFactory.create(metricsDataType);
-            ColumnarStoreMetricsDesc metricsDesc = new 
ColumnarStoreMetricsDesc(metricsEncoding,
-                    metricMetaInfo.getCompressionType());
-            ColumnDataReader metricsDataReader = 
metricsDesc.getMetricsReaderFromFSInput(metricsInputStream,
-                    metricMetaInfo.getStartOffset(), 
metricMetaInfo.getMetricLength(),
-                    (int) basicCuboidMetaInfo.getNumberOfRows());
-            metricsColumnReaders.add(metricsDataReader);
-            metricsColumnReaderItrs.add(metricsDataReader.iterator());
-            metricsDataTransformers.add(new 
MetricsDataTransformer(metricsEncoding.asDataTypeSerializer(),
-                    DataTypeSerializer.create(metricsDataType)));
+            metricsColumnReaders = Lists.newArrayList();
+            metricsColumnReaderItrs = Lists.newArrayList();
+            metricsDataTransformers = Lists.newArrayList();
+            for (MetricMetaInfo metricMetaInfo : 
basicCuboidMetaInfo.getMetricsInfo()) {
+                FSDataInputStream metricsInputStream = null;
+                try {
+                    metricsInputStream = fs.open(dataFilePath);
+                    MeasureDesc measure = 
findMeasure(metricMetaInfo.getName());
+                    DataType metricsDataType = 
measure.getFunction().getReturnDataType();
+                    ColumnarMetricsEncoding metricsEncoding = 
ColumnarMetricsEncodingFactory.create(metricsDataType);
+                    ColumnarStoreMetricsDesc metricsDesc = new 
ColumnarStoreMetricsDesc(metricsEncoding,
+                            metricMetaInfo.getCompressionType());
+                    ColumnDataReader metricsDataReader = 
metricsDesc.getMetricsReaderFromFSInput(metricsInputStream,
+                            metricMetaInfo.getStartOffset(), 
metricMetaInfo.getMetricLength(),
+                            (int) basicCuboidMetaInfo.getNumberOfRows());
+                    metricsColumnReaders.add(metricsDataReader);
+                    metricsColumnReaderItrs.add(metricsDataReader.iterator());
+                    metricsDataTransformers.add(new 
MetricsDataTransformer(metricsEncoding.asDataTypeSerializer(),
+                            DataTypeSerializer.create(metricsDataType)));
+                } finally {
+                    if (null != metricsInputStream) {
 
 Review comment:
   `metricsInputStream ` should not be closed for the same reason.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to