[ https://issues.apache.org/jira/browse/PARQUET-2154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yang Jie updated PARQUET-2154:
------------------------------
    Description: 
{code:java}
  public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
    this.converter = new ParquetMetadataConverter(options);
    this.file = file;
    this.f = file.newStream();
    this.options = options;
    try {
      this.footer = readFooter(file, options, f, converter);
    } catch (Exception e) {
      // In case that reading footer throws an exception in the constructor, the new stream
      // should be closed. Otherwise, there's no way to close this outside.
      f.close();
      throw e;
    }
    this.fileMetaData = footer.getFileMetaData();
    this.fileDecryptor = fileMetaData.getFileDecryptor(); // must be called before filterRowGroups!
    if (null != fileDecryptor && fileDecryptor.plaintextFile()) {
      this.fileDecryptor = null; // Plaintext file. No need in decryptor
    }

    this.blocks = filterRowGroups(footer.getBlocks());
    this.blockIndexStores = listWithNulls(this.blocks.size());
    this.blockRowRanges = listWithNulls(this.blocks.size());
    for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
      paths.put(ColumnPath.get(col.getPath()), col);
    }
    this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
  }
{code}
During the construction of ParquetFileReader, an exception thrown by the `filterRowGroups` method causes a resource leak: once `filterRowGroups(footer.getBlocks())` throws, the stream opened by {{this.f = file.newStream()}} can no longer be closed, because the existing try/catch only protects the `readFooter` call.
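One way the leak could be avoided (a minimal sketch only, reusing the fields and helpers from the constructor shown above; this is not the actual upstream patch) is to extend the existing try/catch so that it also covers `filterRowGroups` and everything else that runs after the stream is opened:
{code:java}
  public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
    this.converter = new ParquetMetadataConverter(options);
    this.file = file;
    this.f = file.newStream();
    this.options = options;
    try {
      // Everything that can fail after the stream is opened now sits inside the try block,
      // so a failure in filterRowGroups (or anywhere else below) no longer leaks the stream.
      this.footer = readFooter(file, options, f, converter);
      this.fileMetaData = footer.getFileMetaData();
      this.fileDecryptor = fileMetaData.getFileDecryptor(); // must be called before filterRowGroups!
      if (null != fileDecryptor && fileDecryptor.plaintextFile()) {
        this.fileDecryptor = null; // Plaintext file. No need in decryptor
      }
      this.blocks = filterRowGroups(footer.getBlocks());
      this.blockIndexStores = listWithNulls(this.blocks.size());
      this.blockRowRanges = listWithNulls(this.blocks.size());
      for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
        paths.put(ColumnPath.get(col.getPath()), col);
      }
      this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
    } catch (Exception e) {
      // Close the stream on any constructor failure; otherwise there is no way to close it outside.
      f.close();
      throw e;
    }
  }
{code}
An equivalent option would be a dedicated try/catch around only the `filterRowGroups` call; widening the existing block just keeps a single cleanup path.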

  was:
Parquet only supports predicate push-down for non-repeated primitive types (PARQUET-34), so trying to push down a filter on a repeated primitive type makes the constructor of ParquetFileReader throw `java.lang.IllegalArgumentException` as follows:
{code:java}
21:57:24.190 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.IllegalArgumentException: FilterPredicates do not currently support repeated columns. Column f is repeated.
	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:195)
	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:164)
	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:92)
	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:59)
	at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:195)
	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:64)
	at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:95)
	at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:45)
	at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:149)
	at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:72)
	at org.apache.parquet.hadoop.ParquetFileReader.filterRowGroups(ParquetFileReader.java:871)
	at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:790)
	at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:100)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:173)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$1(ParquetFileFormat.scala:340)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:211)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:272)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:118)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:580)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1908)
	at org.apache.spark.rdd.RDD.$anonfun$count$1(RDD.scala:1268)
	at org.apache.spark.rdd.RDD.$anonfun$count$1$adapted(RDD.scala:1268)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2267)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1481)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
{code}
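For reference, the same failure can be hit directly through the Parquet API, without Spark. The sketch below is illustrative only: it assumes a file at a hypothetical path whose schema contains a repeated primitive INT32 column named `f`, and relies on the default read options that enable row-group filtering:
{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;

public class RepeatedColumnFilterRepro {
  public static void main(String[] args) throws Exception {
    // Hypothetical file whose schema contains a repeated INT32 column named "f".
    InputFile in = HadoopInputFile.fromPath(
        new Path("/tmp/repeated-f.parquet"), new Configuration());

    // Push down a predicate on the repeated column "f", mirroring what Spark does.
    ParquetReadOptions options = ParquetReadOptions.builder()
        .withRecordFilter(FilterCompat.get(FilterApi.notEq(FilterApi.intColumn("f"), 0)))
        .build();

    // The constructor calls filterRowGroups, which rejects predicates on repeated columns
    // with IllegalArgumentException. Because the reader is never assigned, the stream the
    // constructor opened internally cannot be closed by the caller.
    try (ParquetFileReader reader = new ParquetFileReader(in, options)) {
      System.out.println("row count: " + reader.getRecordCount());
    }
  }
}
{code}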
The exception above is thrown from inside the ParquetFileReader constructor and causes a resource leak:
{code:java}
  public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
    this.converter = new ParquetMetadataConverter(options);
    this.file = file;
    this.f = file.newStream();
    this.options = options;
    try {
      this.footer = readFooter(file, options, f, converter);
    } catch (Exception e) {
      // In case that reading footer throws an exception in the constructor, the new stream
      // should be closed. Otherwise, there's no way to close this outside.
      f.close();
      throw e;
    }
    this.fileMetaData = footer.getFileMetaData();
    this.fileDecryptor = fileMetaData.getFileDecryptor(); // must be called before filterRowGroups!
    if (null != fileDecryptor && fileDecryptor.plaintextFile()) {
      this.fileDecryptor = null; // Plaintext file. No need in decryptor
    }

    this.blocks = filterRowGroups(footer.getBlocks());
    this.blockIndexStores = listWithNulls(this.blocks.size());
    this.blockRowRanges = listWithNulls(this.blocks.size());
    for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
      paths.put(ColumnPath.get(col.getPath()), col);
    }
    this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
  }
{code}
In the code above, when `filterRowGroups(footer.getBlocks())` throws an Exception, the stream opened by {{this.f = file.newStream()}} cannot be closed, because the existing try/catch only protects the `readFooter` call.


> ParquetFileReader should close its input stream when `filterRowGroups` throws Exception in constructor
> --------------------------------------------------------------------------------------------------------
>
>                 Key: PARQUET-2154
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2154
>             Project: Parquet
>          Issue Type: Bug
>            Reporter: Yang Jie
>            Priority: Major
>
> {code:java}
>   public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
>     this.converter = new ParquetMetadataConverter(options);
>     this.file = file;
>     this.f = file.newStream();
>     this.options = options;
>     try {
>       this.footer = readFooter(file, options, f, converter);
>     } catch (Exception e) {
>       // In case that reading footer throws an exception in the constructor, the new stream
>       // should be closed. Otherwise, there's no way to close this outside.
>       f.close();
>       throw e;
>     }
>     this.fileMetaData = footer.getFileMetaData();
>     this.fileDecryptor = fileMetaData.getFileDecryptor(); // must be called before filterRowGroups!
>     if (null != fileDecryptor && fileDecryptor.plaintextFile()) {
>       this.fileDecryptor = null; // Plaintext file. No need in decryptor
>     }
>     this.blocks = filterRowGroups(footer.getBlocks());
>     this.blockIndexStores = listWithNulls(this.blocks.size());
>     this.blockRowRanges = listWithNulls(this.blocks.size());
>     for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
>       paths.put(ColumnPath.get(col.getPath()), col);
>     }
>     this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
>   }
> {code}
> During the construction of ParquetFileReader, an exception thrown by the `filterRowGroups` method causes a resource leak: once `filterRowGroups(footer.getBlocks())` throws, the stream opened by {{this.f = file.newStream()}} can no longer be closed.
>


--
This message was sent by Atlassian Jira
(v8.20.7#820007)