Hyukjin Kwon created PARQUET-1368:
-------------------------------------
Summary: ParquetFileReader should close its input stream for the failure in constructor
Key: PARQUET-1368
URL: https://issues.apache.org/jira/browse/PARQUET-1368
Project: Parquet
Issue Type: Bug
Components: parquet-mr
Affects Versions: 1.10.0
Reporter: Hyukjin Kwon
I was trying to replace the deprecated {{readFooter}} usage with {{ParquetFileReader.open}}, per the deprecation note:
{code}
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala:368: method readFooter in object ParquetFileReader is deprecated: see corresponding Javadoc for more information.
[warn] ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
[warn]                   ^
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala:545: method readFooter in object ParquetFileReader is deprecated: see corresponding Javadoc for more information.
[warn] ParquetFileReader.readFooter(
[warn]                   ^
{code}
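For context, the replacement I was attempting looks roughly like the following Java sketch (the real code is the Spark/Scala above; {{FooterMigration}}, {{conf}} and {{filePath}} are illustrative names, not part of either codebase):
{code}
import static org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class FooterMigration {
  // Old, deprecated: readFooter opens and closes the stream internally.
  static FileMetaData oldWay(Configuration conf, Path filePath) throws IOException {
    return ParquetFileReader.readFooter(conf, filePath, SKIP_ROW_GROUPS).getFileMetaData();
  }

  // New: open a ParquetFileReader and read the footer through it.
  static FileMetaData newWay(Configuration conf, Path filePath) throws IOException {
    try (ParquetFileReader reader = ParquetFileReader.open(
        HadoopInputFile.fromPath(filePath, conf),
        HadoopReadOptions.builder(conf).withMetadataFilter(SKIP_ROW_GROUPS).build())) {
      return reader.getFooter().getFileMetaData();
    }
  }
}
{code}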
Then I realised that some test suites report a resource leak:
{code}
java.lang.Throwable
  at org.apache.spark.DebugFilesystem$.addOpenStream(DebugFilesystem.scala:36)
  at org.apache.spark.DebugFilesystem.open(DebugFilesystem.scala:70)
  at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
  at org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:65)
  at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:687)
  at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:595)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.createParquetReader(ParquetUtils.scala:67)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.readFooter(ParquetUtils.scala:46)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:544)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:539)
  at scala.collection.parallel.AugmentedIterableIterator$class.flatmap2combiner(RemainsIterator.scala:132)
  at scala.collection.parallel.immutable.ParVector$ParVectorIterator.flatmap2combiner(ParVector.scala:62)
  at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1072)
  at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
  at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
  at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
  at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
  at scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1068)
  at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
  at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
  at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
  at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
  at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
  at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
  at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:378)
  at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:443)
  at scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:426)
  at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:56)
  at scala.collection.parallel.ParIterableLike$ResultMapping.leaf(ParIterableLike.scala:958)
  at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
  at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
  at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
  at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
  at scala.collection.parallel.ParIterableLike$ResultMapping.tryLeaf(ParIterableLike.scala:953)
  at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
  at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
  at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}
The root cause seems to be that the test case intentionally reads a malformed Parquet file and checks that the error is handled correctly. In that case, the error is thrown inside {{ParquetFileReader}}'s constructor:
{code}
java.lang.RuntimeException: file:/private/var/folders/71/484zt4z10ks1vydt03bhp6hr0000gp/T/spark-c102dafc-b3f7-4c7e-90ee-33d8ecbcd225/second/_SUCCESS is not a Parquet file (too small length: 0)
  at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:514)
  at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:689)
  at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:595)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.createParquetReader(ParquetUtils.scala:67)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.readFooter(ParquetUtils.scala:46)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:544)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:539)
  at scala.collection.parallel.AugmentedIterableIterator$class.flatmap2combiner(RemainsIterator.scala:132)
  at scala.collection.parallel.immutable.ParVector$ParVectorIterator.flatmap2combiner(ParVector.scala:62)
  at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1072)
  at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
  at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
  at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
  at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
  at scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1068)
  at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
  at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
  at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
  at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
  at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}
So, in this case,
{code}
public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
  this.converter = new ParquetMetadataConverter(options);
  this.file = file;
  this.f = file.newStream();
  this.options = options;
  this.footer = readFooter(file, options, f, converter);
  this.fileMetaData = footer.getFileMetaData();
  this.blocks = filterRowGroups(footer.getBlocks());
  for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
    paths.put(ColumnPath.get(col.getPath()), col);
  }
}
{code}
the open stream ({{this.f = file.newStream()}}) can never be closed: the exception escapes the constructor before the caller gets a reader instance to close. That is why the test case reports the resource leak.
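A minimal sketch of a possible fix, assuming we simply close the stream before rethrowing when footer parsing fails (illustrative only, not a tested patch):
{code}
public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
  this.converter = new ParquetMetadataConverter(options);
  this.file = file;
  this.f = file.newStream();
  this.options = options;
  try {
    this.footer = readFooter(file, options, f, converter);
  } catch (Exception e) {
    // The caller never receives a reader to close, so close the stream
    // here before propagating the failure.
    f.close();
    throw e;
  }
  this.fileMetaData = footer.getFileMetaData();
  this.blocks = filterRowGroups(footer.getBlocks());
  for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
    paths.put(ColumnPath.get(col.getPath()), col);
  }
}
{code}
(Note that {{f.close()}} could itself throw and mask the original error; a real patch would probably use {{addSuppressed}} or similar cleanup.)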
In the old, deprecated {{readFooter}}, by contrast, the stream is closed properly:
{code}
@Deprecated
public static final ParquetMetadata readFooter(InputFile file, MetadataFilter filter) throws IOException {
  ParquetReadOptions options;
  if (file instanceof HadoopInputFile) {
    options = HadoopReadOptions.builder(((HadoopInputFile) file).getConfiguration())
        .withMetadataFilter(filter).build();
  } else {
    options = ParquetReadOptions.builder().withMetadataFilter(filter).build();
  }
  try (SeekableInputStream in = file.newStream()) {
    return readFooter(file, options, in);
  }
}
{code}
So the deprecated method itself looks fine; only the constructor path leaks the stream.
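With the constructor fixed, callers of the new API can rely on try-with-resources as usual ({{inputFile}} below is a placeholder for any {{InputFile}}):
{code}
// ParquetFileReader implements Closeable, so the success path already
// releases the stream; this issue only concerns the failure path in the
// constructor.
try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) {
  ParquetMetadata footer = reader.getFooter();
  // ... use the footer ...
}
{code}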