Yang Jie created PARQUET-2154:
---------------------------------

             Summary: ParquetFileReader should close its input stream when `filterRowGroups` throws an Exception in the constructor
                 Key: PARQUET-2154
                 URL: https://issues.apache.org/jira/browse/PARQUET-2154
             Project: Parquet
          Issue Type: Bug
            Reporter: Yang Jie
Parquet currently only supports predicate push-down for non-repeated primitive types (PARQUET-34), so if a filter is pushed down for a repeated primitive type, the constructor of ParquetFileReader throws a `java.lang.IllegalArgumentException` as follows:

{code:java}
21:57:24.190 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.IllegalArgumentException: FilterPredicates do not currently support repeated columns. Column f is repeated.
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:195)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:164)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:92)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:59)
    at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:195)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:64)
    at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:95)
    at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:45)
    at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:149)
    at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:72)
    at org.apache.parquet.hadoop.ParquetFileReader.filterRowGroups(ParquetFileReader.java:871)
    at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:790)
    at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:100)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:173)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$1(ParquetFileFormat.scala:340)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:211)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:272)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:118)
    at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:580)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1908)
    at org.apache.spark.rdd.RDD.$anonfun$count$1(RDD.scala:1268)
    at org.apache.spark.rdd.RDD.$anonfun$count$1$adapted(RDD.scala:1268)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2267)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1481)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
{code}

This causes a resource leak:

{code:java}
public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
  this.converter = new ParquetMetadataConverter(options);
  this.file = file;
  this.f = file.newStream();
  this.options = options;
  try {
    this.footer = readFooter(file, options, f, converter);
  } catch (Exception e) {
    // In case that reading footer throws an exception in the constructor, the new stream
    // should be closed. Otherwise, there's no way to close this outside.
    f.close();
    throw e;
  }
  this.fileMetaData = footer.getFileMetaData();
  this.fileDecryptor = fileMetaData.getFileDecryptor(); // must be called before filterRowGroups!
  if (null != fileDecryptor && fileDecryptor.plaintextFile()) {
    this.fileDecryptor = null; // Plaintext file. No need in decryptor
  }

  this.blocks = filterRowGroups(footer.getBlocks());
  this.blockIndexStores = listWithNulls(this.blocks.size());
  this.blockRowRanges = listWithNulls(this.blocks.size());
  for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
    paths.put(ColumnPath.get(col.getPath()), col);
  }
  this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
}
{code}

In the above code, when `filterRowGroups(footer.getBlocks())` throws an exception, the stream opened by `this.f = file.newStream()` appears to have no way to be closed, because the existing try/catch only covers `readFooter`.
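A minimal sketch of one possible fix (not a tested patch, just to illustrate the direction): widen the existing try/close pattern so that the stream is also closed when `filterRowGroups` throws, for example:

{code:java}
public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
  this.converter = new ParquetMetadataConverter(options);
  this.file = file;
  this.f = file.newStream();
  this.options = options;
  try {
    this.footer = readFooter(file, options, f, converter);
    this.fileMetaData = footer.getFileMetaData();
    this.fileDecryptor = fileMetaData.getFileDecryptor(); // must be called before filterRowGroups!
    if (null != fileDecryptor && fileDecryptor.plaintextFile()) {
      this.fileDecryptor = null; // Plaintext file. No need in decryptor
    }
    // filterRowGroups may also throw (e.g. the IllegalArgumentException above),
    // so it is moved inside the try block
    this.blocks = filterRowGroups(footer.getBlocks());
  } catch (Exception e) {
    // If reading the footer or filtering row groups throws in the constructor,
    // close the newly opened stream; there is no way to close it from outside.
    f.close();
    throw e;
  }
  this.blockIndexStores = listWithNulls(this.blocks.size());
  this.blockRowRanges = listWithNulls(this.blocks.size());
  for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
    paths.put(ColumnPath.get(col.getPath()), col);
  }
  this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
}
{code}

Alternatively, the initialization that follows the footer read could be wrapped in its own try/catch that closes `f` and rethrows; either way, the goal is that no code path between `file.newStream()` and the end of the constructor can leak the stream.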