hit-lacus commented on a change in pull request #1065: KYLIN-4349 Close InputStream in RowRecordReader.initReaders() URL: https://github.com/apache/kylin/pull/1065#discussion_r376306563
########## File path: engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/RowRecordReader.java ########## @@ -82,51 +82,72 @@ public RowRecordReader(CubeDesc cubeDesc, Path path, FileSystem fileSystem) thro } public void initReaders() throws IOException { - FSDataInputStream in = fs.open(metaFilePath); - FragmentMetaInfo fragmentMetaInfo = JsonUtil.readValue(in, FragmentMetaInfo.class); - CuboidMetaInfo basicCuboidMetaInfo = fragmentMetaInfo.getBasicCuboidMetaInfo(); - FSDataInputStream dictInputStream = fs.open(dataFilePath); + FSDataInputStream in = null; + try { + in = fs.open(metaFilePath); + FragmentMetaInfo fragmentMetaInfo = JsonUtil.readValue(in, FragmentMetaInfo.class); + CuboidMetaInfo basicCuboidMetaInfo = fragmentMetaInfo.getBasicCuboidMetaInfo(); + FSDataInputStream dictInputStream = fs.open(dataFilePath); - List<DimensionMetaInfo> allDimensions = basicCuboidMetaInfo.getDimensionsInfo(); - Map<String, DimensionEncoding> dimensionEncodingMap = getDimensionEncodings(fragmentMetaInfo, allDimensions, - dictInputStream); - dimensionColumnReaders = Lists.newArrayList(); - dimensionColumnReaderItrs = Lists.newArrayList(); - dimensionEncodings = Lists.newArrayList(); - for (DimensionMetaInfo dimensionMetaInfo : allDimensions) { - FSDataInputStream dimInputStream = fs.open(dataFilePath); - String dimName = dimensionMetaInfo.getName(); - DimensionEncoding dimEncoding = dimensionEncodingMap.get(dimName); - ColumnarStoreDimDesc dimDesc = new ColumnarStoreDimDesc(dimEncoding.getLengthOfEncoding(), - dimensionMetaInfo.getCompressionType()); - ColumnDataReader dimDataReader = dimDesc.getDimReaderFromFSInput(dimInputStream, - dimensionMetaInfo.getStartOffset(), dimensionMetaInfo.getDataLength(), - (int) basicCuboidMetaInfo.getNumberOfRows()); - dimensionColumnReaders.add(dimDataReader); - dimensionColumnReaderItrs.add(dimDataReader.iterator()); - dimensionEncodings.add(dimEncoding); - } - rowDimensionValues = new String[dimensionColumnReaders.size()]; + 
List<DimensionMetaInfo> allDimensions = basicCuboidMetaInfo.getDimensionsInfo(); + Map<String, DimensionEncoding> dimensionEncodingMap = getDimensionEncodings(fragmentMetaInfo, allDimensions, + dictInputStream); + dimensionColumnReaders = Lists.newArrayList(); + dimensionColumnReaderItrs = Lists.newArrayList(); + dimensionEncodings = Lists.newArrayList(); + for (DimensionMetaInfo dimensionMetaInfo : allDimensions) { + FSDataInputStream dimInputStream = null; + try { + dimInputStream = fs.open(dataFilePath); + String dimName = dimensionMetaInfo.getName(); + DimensionEncoding dimEncoding = dimensionEncodingMap.get(dimName); + ColumnarStoreDimDesc dimDesc = new ColumnarStoreDimDesc(dimEncoding.getLengthOfEncoding(), + dimensionMetaInfo.getCompressionType()); + ColumnDataReader dimDataReader = dimDesc.getDimReaderFromFSInput(dimInputStream, + dimensionMetaInfo.getStartOffset(), dimensionMetaInfo.getDataLength(), + (int) basicCuboidMetaInfo.getNumberOfRows()); + dimensionColumnReaders.add(dimDataReader); + dimensionColumnReaderItrs.add(dimDataReader.iterator()); + dimensionEncodings.add(dimEncoding); + } finally { + if (null != dimInputStream) { + dimInputStream.close(); + } + } + } + rowDimensionValues = new String[dimensionColumnReaders.size()]; - metricsColumnReaders = Lists.newArrayList(); - metricsColumnReaderItrs = Lists.newArrayList(); - metricsDataTransformers = Lists.newArrayList(); - for (MetricMetaInfo metricMetaInfo : basicCuboidMetaInfo.getMetricsInfo()) { - FSDataInputStream metricsInputStream = fs.open(dataFilePath); - MeasureDesc measure = findMeasure(metricMetaInfo.getName()); - DataType metricsDataType = measure.getFunction().getReturnDataType(); - ColumnarMetricsEncoding metricsEncoding = ColumnarMetricsEncodingFactory.create(metricsDataType); - ColumnarStoreMetricsDesc metricsDesc = new ColumnarStoreMetricsDesc(metricsEncoding, - metricMetaInfo.getCompressionType()); - ColumnDataReader metricsDataReader = 
metricsDesc.getMetricsReaderFromFSInput(metricsInputStream, - metricMetaInfo.getStartOffset(), metricMetaInfo.getMetricLength(), - (int) basicCuboidMetaInfo.getNumberOfRows()); - metricsColumnReaders.add(metricsDataReader); - metricsColumnReaderItrs.add(metricsDataReader.iterator()); - metricsDataTransformers.add(new MetricsDataTransformer(metricsEncoding.asDataTypeSerializer(), - DataTypeSerializer.create(metricsDataType))); + metricsColumnReaders = Lists.newArrayList(); + metricsColumnReaderItrs = Lists.newArrayList(); + metricsDataTransformers = Lists.newArrayList(); + for (MetricMetaInfo metricMetaInfo : basicCuboidMetaInfo.getMetricsInfo()) { + FSDataInputStream metricsInputStream = null; + try { + metricsInputStream = fs.open(dataFilePath); + MeasureDesc measure = findMeasure(metricMetaInfo.getName()); + DataType metricsDataType = measure.getFunction().getReturnDataType(); + ColumnarMetricsEncoding metricsEncoding = ColumnarMetricsEncodingFactory.create(metricsDataType); + ColumnarStoreMetricsDesc metricsDesc = new ColumnarStoreMetricsDesc(metricsEncoding, + metricMetaInfo.getCompressionType()); + ColumnDataReader metricsDataReader = metricsDesc.getMetricsReaderFromFSInput(metricsInputStream, + metricMetaInfo.getStartOffset(), metricMetaInfo.getMetricLength(), + (int) basicCuboidMetaInfo.getNumberOfRows()); + metricsColumnReaders.add(metricsDataReader); + metricsColumnReaderItrs.add(metricsDataReader.iterator()); + metricsDataTransformers.add(new MetricsDataTransformer(metricsEncoding.asDataTypeSerializer(), + DataTypeSerializer.create(metricsDataType))); + } finally { + if (null != metricsInputStream) { Review comment: `metricsInputStream ` should not be closed for the same reason. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: users@infra.apache.org. With regards, Apache Git Services