This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e95f12b59d80 [SPARK-53633][SQL] Reuse InputStream in vectorized
Parquet reader
e95f12b59d80 is described below
commit e95f12b59d803a066be015e81d4d318c0801d682
Author: Cheng Pan <[email protected]>
AuthorDate: Tue Sep 23 15:46:40 2025 -0700
[SPARK-53633][SQL] Reuse InputStream in vectorized Parquet reader
### What changes were proposed in this pull request?
Reuse the InputStream in the vectorized Parquet reader between reading the footer
and reading the row groups, on the executor side.
This PR is part of SPARK-52011; more details are available at
https://github.com/apache/spark/pull/50765
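For illustration, a minimal sketch of the intended executor-side flow, using the
`ParquetFooterReader.openFileAndReadFooter` API introduced by this PR (the
`hadoopConf` and `file` values are assumed to be a Hadoop `Configuration` and a
`PartitionedFile` already in scope, and the enclosing method is assumed to let
`IOException` propagate):

    // Open the file once, read the footer (including row group metadata), and keep
    // the stream open so the subsequent row group reads can reuse it.
    OpenedParquetFooter opened =
      ParquetFooterReader.openFileAndReadFooter(hadoopConf, file, /* keepInputStreamOpen */ true);
    boolean ownershipTransferred = false;
    try {
      // Pass opened.inputFile(), opened.inputStream() and opened.footer() to the
      // vectorized reader's initialize(...); after that the reader owns the stream.
      ownershipTransferred = true;
    } finally {
      // Close the stream only if ownership was never handed over.
      if (!ownershipTransferred && opened.inputStreamOpt().isPresent()) {
        opened.inputStream().close();
      }
    }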
### Why are the changes needed?
Reduce unnecessary NameNode RPCs to improve performance and stability on large
Hadoop clusters.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
See https://github.com/apache/spark/pull/50765
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52384 from pan3793/SPARK-53633.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../datasources/parquet/OpenedParquetFooter.java | 34 ++
.../datasources/parquet/ParquetFooterReader.java | 87 +++--
.../parquet/SpecificParquetRecordReaderBase.java | 18 +-
.../parquet/VectorizedParquetRecordReader.java | 10 +-
.../datasources/parquet/ParquetFileFormat.scala | 373 ++++++++++++---------
.../v2/parquet/ParquetPartitionReaderFactory.scala | 199 ++++++-----
.../parquet/ParquetInteroperabilitySuite.scala | 5 +-
.../datasources/parquet/ParquetTest.scala | 4 +-
.../spark/sql/hive/execution/HiveDDLSuite.scala | 4 +-
9 files changed, 446 insertions(+), 288 deletions(-)
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/OpenedParquetFooter.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/OpenedParquetFooter.java
new file mode 100644
index 000000000000..5893609d2e9d
--- /dev/null
+++
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/OpenedParquetFooter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.util.Optional;
+
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.SeekableInputStream;
+
+public record OpenedParquetFooter(
+ ParquetMetadata footer,
+ HadoopInputFile inputFile,
+ Optional<SeekableInputStream> inputStreamOpt) {
+
+ public SeekableInputStream inputStream() {
+ return inputStreamOpt.get();
+ }
+}
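A brief caller-side sketch of this record's contract: `inputStream()` simply unwraps
`inputStreamOpt`, so it is only safe to call when the footer was opened with
`keepInputStreamOpen` set to true (the `opened` value below is an assumed
`OpenedParquetFooter` in scope):

    if (opened.inputStreamOpt().isPresent()) {
      // Safe: the Optional is non-empty, so inputStream() will not throw.
      SeekableInputStream in = opened.inputStream();
      // ... hand `in` to the vectorized reader, which then takes ownership of it.
    }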
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java
index 438fffa11784..3edb328451fe 100644
---
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java
+++
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java
@@ -18,10 +18,9 @@
package org.apache.spark.sql.execution.datasources.parquet;
import java.io.IOException;
+import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
@@ -37,53 +36,77 @@ import
org.apache.spark.sql.execution.datasources.PartitionedFile;
*/
public class ParquetFooterReader {
- public static final boolean SKIP_ROW_GROUPS = true;
- public static final boolean WITH_ROW_GROUPS = false;
-
/**
- * Reads footer for the input Parquet file 'split'. If 'skipRowGroup' is
true,
- * this will skip reading the Parquet row group metadata.
+   * Build a filter for reading the footer of the input Parquet file 'split'.
+ * If 'skipRowGroup' is true, this will skip reading the Parquet row group
metadata.
*
* @param file a part (i.e. "block") of a single file that should be read
- * @param configuration hadoop configuration of file
+ * @param hadoopConf hadoop configuration of file
* @param skipRowGroup If true, skip reading row groups;
* if false, read row groups according to the file split
range
*/
- public static ParquetMetadata readFooter(
- Configuration configuration,
- PartitionedFile file,
- boolean skipRowGroup) throws IOException {
- long fileStart = file.start();
- ParquetMetadataConverter.MetadataFilter filter;
+ public static ParquetMetadataConverter.MetadataFilter buildFilter(
+ Configuration hadoopConf, PartitionedFile file, boolean skipRowGroup) {
if (skipRowGroup) {
- filter = ParquetMetadataConverter.SKIP_ROW_GROUPS;
+ return ParquetMetadataConverter.SKIP_ROW_GROUPS;
} else {
- filter = HadoopReadOptions.builder(configuration, file.toPath())
+ long fileStart = file.start();
+ return HadoopReadOptions.builder(hadoopConf, file.toPath())
.withRange(fileStart, fileStart + file.length())
.build()
.getMetadataFilter();
}
- return readFooter(configuration, file.toPath(), filter);
- }
-
- public static ParquetMetadata readFooter(Configuration configuration,
- Path file, ParquetMetadataConverter.MetadataFilter filter) throws
IOException {
- return readFooter(HadoopInputFile.fromPath(file, configuration), filter);
- }
-
- public static ParquetMetadata readFooter(Configuration configuration,
- FileStatus fileStatus, ParquetMetadataConverter.MetadataFilter filter)
throws IOException {
- return readFooter(HadoopInputFile.fromStatus(fileStatus, configuration),
filter);
}
- private static ParquetMetadata readFooter(HadoopInputFile inputFile,
+ public static ParquetMetadata readFooter(
+ HadoopInputFile inputFile,
ParquetMetadataConverter.MetadataFilter filter) throws IOException {
- ParquetReadOptions readOptions =
- HadoopReadOptions.builder(inputFile.getConfiguration(),
inputFile.getPath())
+ ParquetReadOptions readOptions = HadoopReadOptions
+ .builder(inputFile.getConfiguration(), inputFile.getPath())
.withMetadataFilter(filter).build();
- // Use try-with-resources to ensure fd is closed.
- try (ParquetFileReader fileReader = ParquetFileReader.open(inputFile,
readOptions)) {
+ try (var fileReader = ParquetFileReader.open(inputFile, readOptions)) {
return fileReader.getFooter();
}
}
+
+ /**
+ * Decoding Parquet files generally involves two steps:
+ * 1. read and resolve the metadata (footer),
+ * 2. read and decode the row groups/column chunks.
+ * <p>
+   * It's possible to avoid opening the file twice by reusing the SeekableInputStream.
+   * When keepInputStreamOpen is true, the caller is responsible for closing the
+   * SeekableInputStream. Currently, this is only supported by the Parquet vectorized
+   * reader.
+   *
+   * @param hadoopConf hadoop configuration of file
+   * @param file a part (i.e. "block") of a single file that should be read
+   * @param keepInputStreamOpen when true, keep the SeekableInputStream of the file open
+   * @return if keepInputStreamOpen is true, the returned OpenedParquetFooter carries
+   *         a non-empty Optional of SeekableInputStream, otherwise an empty Optional.
+ */
+ public static OpenedParquetFooter openFileAndReadFooter(
+ Configuration hadoopConf,
+ PartitionedFile file,
+ boolean keepInputStreamOpen) throws IOException {
+ var readOptions = HadoopReadOptions.builder(hadoopConf, file.toPath())
+      // `keepInputStreamOpen` is true only when the Parquet vectorized reader is used
+      // on the caller side; in that case, the footer will be reused later when reading
+      // row groups, so the row group metadata must be read ahead of time.
+      // When false, the caller uses parquet-mr to read the file; only the file metadata
+      // is required in the planning phase, and parquet-mr will read the footer again
+      // when reading row groups.
+ .withMetadataFilter(buildFilter(hadoopConf, file,
!keepInputStreamOpen))
+ .build();
+ var inputFile = HadoopInputFile.fromPath(file.toPath(), hadoopConf);
+ var inputStream = inputFile.newStream();
+ try (var fileReader = ParquetFileReader.open(inputFile, readOptions,
inputStream)) {
+ var footer = fileReader.getFooter();
+ if (keepInputStreamOpen) {
+ fileReader.detachFileInputStream();
+ return new OpenedParquetFooter(footer, inputFile,
Optional.of(inputStream));
+ } else {
+ return new OpenedParquetFooter(footer, inputFile, Optional.empty());
+ }
+ }
+ }
}
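A hedged usage sketch of the reworked API above, for a caller that only needs
file-level metadata and skips the row group metadata (`hadoopConf` and `file` are
assumed to be a Hadoop `Configuration` and a `PartitionedFile` in scope, with
`IOException` allowed to propagate):

    HadoopInputFile inputFile = HadoopInputFile.fromPath(file.toPath(), hadoopConf);
    ParquetMetadataConverter.MetadataFilter filter =
      ParquetFooterReader.buildFilter(hadoopConf, file, /* skipRowGroup */ true);
    // Only the schema and key/value metadata are read; row groups are skipped.
    ParquetMetadata footer = ParquetFooterReader.readFooter(inputFile, filter);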
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index d3716ef18447..038112086e47 100644
---
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -49,6 +49,7 @@ import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.ConfigurationUtil;
import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
@@ -89,24 +90,27 @@ public abstract class SpecificParquetRecordReaderBase<T>
extends RecordReader<Vo
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext
taskAttemptContext)
throws IOException, InterruptedException {
- initialize(inputSplit, taskAttemptContext, Option.empty());
+ initialize(inputSplit, taskAttemptContext, Option.empty(), Option.empty(),
Option.empty());
}
public void initialize(
InputSplit inputSplit,
TaskAttemptContext taskAttemptContext,
+ Option<HadoopInputFile> inputFile,
+ Option<SeekableInputStream> inputStream,
Option<ParquetMetadata> fileFooter) throws IOException,
InterruptedException {
Configuration configuration = taskAttemptContext.getConfiguration();
FileSplit split = (FileSplit) inputSplit;
this.file = split.getPath();
+ ParquetReadOptions options = HadoopReadOptions
+ .builder(configuration, file)
+ .withRange(split.getStart(), split.getStart() + split.getLength())
+ .build();
ParquetFileReader fileReader;
- if (fileFooter.isDefined()) {
- fileReader = new ParquetFileReader(configuration, file,
fileFooter.get());
+ if (inputFile.isDefined() && fileFooter.isDefined() &&
inputStream.isDefined()) {
+ fileReader = new ParquetFileReader(
+ inputFile.get(), fileFooter.get(), options, inputStream.get());
} else {
- ParquetReadOptions options = HadoopReadOptions
- .builder(configuration, file)
- .withRange(split.getStart(), split.getStart() + split.getLength())
- .build();
fileReader = new ParquetFileReader(
HadoopInputFile.fromPath(file, configuration), options);
}
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 9010c6e30be0..b15f79df527e 100644
---
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -24,8 +24,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import org.apache.spark.SparkUnsupportedOperationException;
-import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns;
import scala.Option;
import scala.jdk.javaapi.CollectionConverters;
@@ -35,11 +33,15 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
+import org.apache.spark.SparkUnsupportedOperationException;
import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
import org.apache.spark.sql.execution.vectorized.ConstantColumnVector;
@@ -190,9 +192,11 @@ public class VectorizedParquetRecordReader extends
SpecificParquetRecordReaderBa
public void initialize(
InputSplit inputSplit,
TaskAttemptContext taskAttemptContext,
+ Option<HadoopInputFile> inputFile,
+ Option<SeekableInputStream> inputStream,
Option<ParquetMetadata> fileFooter)
throws IOException, InterruptedException, UnsupportedOperationException {
- super.initialize(inputSplit, taskAttemptContext, fileFooter);
+ super.initialize(inputSplit, taskAttemptContext, inputFile, inputStream,
fileFooter);
initializeInternal();
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index be6e5d188667..4cc3fe61d22b 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -17,6 +17,10 @@
package org.apache.spark.sql.execution.datasources.parquet
+import java.io.Closeable
+import java.time.ZoneId
+import java.util.concurrent.atomic.AtomicBoolean
+
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Try}
@@ -27,9 +31,10 @@ import org.apache.hadoop.mapred.FileSplit
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.parquet.filter2.compat.FilterCompat
-import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop._
+import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
@@ -40,14 +45,15 @@ import org.apache.spark.sql.catalyst.expressions._
import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, RebaseDateTime}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector,
OffHeapColumnVector, OnHeapColumnVector}
import org.apache.spark.sql.internal.{SessionStateHelper, SQLConf}
+import org.apache.spark.sql.internal.SQLConf._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
-import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
+import org.apache.spark.util.{SerializableConfiguration, ThreadUtils, Utils}
class ParquetFileFormat
extends FileFormat
@@ -108,28 +114,11 @@ class ParquetFileFormat
true
}
- /**
- * Build the reader.
- *
- * @note It is required to pass FileFormat.OPTION_RETURNING_BATCH in
options, to indicate whether
- * the reader should return row or columnar output.
- * If the caller can handle both, pass
- * FileFormat.OPTION_RETURNING_BATCH ->
- * supportBatch(sparkSession,
- * StructType(requiredSchema.fields ++ partitionSchema.fields))
- * as the option.
- * It should be set to "true" only if this reader can support it.
- */
- override def buildReaderWithPartitionValues(
- sparkSession: SparkSession,
- dataSchema: StructType,
- partitionSchema: StructType,
- requiredSchema: StructType,
- filters: Seq[Filter],
- options: Map[String, String],
- hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] =
{
- val sqlConf = getSqlConf(sparkSession)
- hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS,
classOf[ParquetReadSupport].getName)
+ private def setupHadoopConf(
+ hadoopConf: Configuration, sqlConf: SQLConf, requiredSchema: StructType):
Unit = {
+ hadoopConf.set(
+ ParquetInputFormat.READ_SUPPORT_CLASS,
+ classOf[ParquetReadSupport].getName)
hadoopConf.set(
ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
requiredSchema.json)
@@ -159,7 +148,30 @@ class ParquetFileFormat
hadoopConf.setBoolean(
SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
sqlConf.legacyParquetNanosAsLong)
+ }
+ /**
+ * Build the reader.
+ *
+ * @note It is required to pass FileFormat.OPTION_RETURNING_BATCH in
options, to indicate whether
+ * the reader should return row or columnar output.
+ * If the caller can handle both, pass
+ * FileFormat.OPTION_RETURNING_BATCH ->
+ * supportBatch(sparkSession,
+ * StructType(requiredSchema.fields ++ partitionSchema.fields))
+ * as the option.
+ * It should be set to "true" only if this reader can support it.
+ */
+ override def buildReaderWithPartitionValues(
+ sparkSession: SparkSession,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String],
+ hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+ val sqlConf = getSqlConf(sparkSession)
+ setupHadoopConf(hadoopConf, sqlConf, requiredSchema)
val broadcastedHadoopConf =
SerializableConfiguration.broadcast(sparkSession.sparkContext,
hadoopConf)
@@ -191,7 +203,7 @@ class ParquetFileFormat
options.getOrElse(FileFormat.OPTION_RETURNING_BATCH,
throw new IllegalArgumentException(
"OPTION_RETURNING_BATCH should always be set for ParquetFileFormat.
" +
- "To workaround this issue, set
spark.sql.parquet.enableVectorizedReader=false."))
+ s"To workaround this issue, set
${PARQUET_VECTORIZED_READER_ENABLED.key}=false."))
.equals("true")
if (returningBatch) {
// If the passed option said that we are to return batches, we need to
also be able to
@@ -202,150 +214,199 @@ class ParquetFileFormat
(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)
- val filePath = file.toPath
- val split = new FileSplit(filePath, file.start, file.length,
Array.empty[String])
-
+ val split = new FileSplit(file.toPath, file.start, file.length,
Array.empty[String])
val sharedConf = broadcastedHadoopConf.value.value
- val fileFooter = if (enableVectorizedReader) {
- // When there are vectorized reads, we can avoid reading the footer
twice by reading
- // all row groups in advance and filter row groups according to
filters that require
- // push down (no need to read the footer metadata again).
- ParquetFooterReader.readFooter(sharedConf, file,
ParquetFooterReader.WITH_ROW_GROUPS)
- } else {
- ParquetFooterReader.readFooter(sharedConf, file,
ParquetFooterReader.SKIP_ROW_GROUPS)
- }
-
- val footerFileMetaData = fileFooter.getFileMetaData
- val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
- footerFileMetaData.getKeyValueMetaData.get,
- datetimeRebaseModeInRead)
- val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
- footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
-
- // Try to push down filters when filter push-down is enabled.
- val pushed = if (enableParquetFilterPushDown) {
- val parquetSchema = footerFileMetaData.getSchema
- val parquetFilters = new ParquetFilters(
- parquetSchema,
- pushDownDate,
- pushDownTimestamp,
- pushDownDecimal,
- pushDownStringPredicate,
- pushDownInFilterThreshold,
- isCaseSensitive,
- datetimeRebaseSpec)
- filters
- // Collects all converted Parquet filter predicates. Notice that not
all predicates can be
- // converted (`ParquetFilters.createFilter` returns an `Option`).
That's why a `flatMap`
- // is used here.
- .flatMap(parquetFilters.createFilter(_))
- .reduceOption(FilterApi.and)
- } else {
- None
- }
-
- // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions
to int96 timestamps'
- // *only* if the file was created by something other than "parquet-mr",
so check the actual
- // writer here for this file. We have to do this per-file, as each file
in the table may
- // have different writers.
- // Define isCreatedByParquetMr as function to avoid unnecessary parquet
footer reads.
- def isCreatedByParquetMr: Boolean =
- footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
-
- val convertTz =
- if (timestampConversion && !isCreatedByParquetMr) {
-
Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+      // When there are vectorized reads, we can avoid
+      // 1. opening the file twice, by transferring the SeekableInputStream
+      // 2. reading the footer twice, by reading all row group metadata in advance and
+      //    filtering row groups according to filters that require push down
+ val openedFooter =
+ ParquetFooterReader.openFileAndReadFooter(sharedConf, file,
enableVectorizedReader)
+ assert(openedFooter.inputStreamOpt.isPresent == enableVectorizedReader)
+
+      // Before transferring ownership of the inputStream to the vectorizedReader,
+      // we are responsible for closing the inputStream if something goes wrong,
+      // to avoid a resource leak.
+ val shouldCloseInputStream = new
AtomicBoolean(openedFooter.inputStreamOpt.isPresent)
+ try {
+ val footerFileMetaData = openedFooter.footer.getFileMetaData
+ val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
+ footerFileMetaData.getKeyValueMetaData.get,
+ datetimeRebaseModeInRead)
+ val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
+ footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+
+ // Try to push down filters when filter push-down is enabled.
+ val pushed = if (enableParquetFilterPushDown) {
+ val parquetSchema = footerFileMetaData.getSchema
+ val parquetFilters = new ParquetFilters(
+ parquetSchema,
+ pushDownDate,
+ pushDownTimestamp,
+ pushDownDecimal,
+ pushDownStringPredicate,
+ pushDownInFilterThreshold,
+ isCaseSensitive,
+ datetimeRebaseSpec)
+ filters
+ // Collects all converted Parquet filter predicates. Notice that
not all predicates
+ // can be converted (`ParquetFilters.createFilter` returns an
`Option`). That's why
+ // a `flatMap` is used here.
+ .flatMap(parquetFilters.createFilter)
+ .reduceOption(FilterApi.and)
} else {
None
}
-
- val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP,
0), 0)
- val hadoopAttemptContext =
- new TaskAttemptContextImpl(broadcastedHadoopConf.value.value,
attemptId)
-
- // Try to push down filters when filter push-down is enabled.
- // Notice: This push-down is RowGroups level, not individual records.
- if (pushed.isDefined) {
-
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration,
pushed.get)
- }
- val taskContext = Option(TaskContext.get())
- if (enableVectorizedReader) {
- val vectorizedReader = new VectorizedParquetRecordReader(
- convertTz.orNull,
- datetimeRebaseSpec.mode.toString,
- datetimeRebaseSpec.timeZone,
- int96RebaseSpec.mode.toString,
- int96RebaseSpec.timeZone,
- enableOffHeapColumnVector && taskContext.isDefined,
- capacity)
- // SPARK-37089: We cannot register a task completion listener to close
this iterator here
- // because downstream exec nodes have already registered their
listeners. Since listeners
- // are executed in reverse order of registration, a listener
registered here would close the
- // iterator while downstream exec nodes are still running. When
off-heap column vectors are
- // enabled, this can cause a use-after-free bug leading to a segfault.
- //
- // Instead, we use FileScanRDD's task completion listener to close
this iterator.
- val iter = new RecordReaderIterator(vectorizedReader)
- try {
- vectorizedReader.initialize(split, hadoopAttemptContext,
Option.apply(fileFooter))
- logDebug(s"Appending $partitionSchema ${file.partitionValues}")
- vectorizedReader.initBatch(partitionSchema, file.partitionValues)
- if (returningBatch) {
- vectorizedReader.enableReturningBatches()
+ // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone
conversions to int96
+ // timestamps *only* if the file was created by something other than
"parquet-mr",
+ // so check the actual writer here for this file. We have to do this
per-file,
+ // as each file in the table may have different writers.
+ // Define isCreatedByParquetMr as function to avoid unnecessary
parquet footer reads.
+ def isCreatedByParquetMr: Boolean =
+ footerFileMetaData.getCreatedBy.startsWith("parquet-mr")
+
+ val convertTz =
+ if (timestampConversion && !isCreatedByParquetMr) {
+
Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+ } else {
+ None
}
- // UnsafeRowParquetRecordReader appends the columns internally to
avoid another copy.
- iter.asInstanceOf[Iterator[InternalRow]]
- } catch {
- case e: Throwable =>
- // SPARK-23457: In case there is an exception in initialization,
close the iterator to
- // avoid leaking resources.
- iter.close()
- throw e
+ val attemptId = new TaskAttemptID(new TaskID(new JobID(),
TaskType.MAP, 0), 0)
+ val hadoopAttemptContext =
+ new TaskAttemptContextImpl(broadcastedHadoopConf.value.value,
attemptId)
+
+ // Try to push down filters when filter push-down is enabled.
+ // Notice: This push-down is RowGroups level, not individual records.
+ pushed.foreach {
+
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, _)
}
- } else {
- logDebug(s"Falling back to parquet-mr")
- // ParquetRecordReader returns InternalRow
- val readSupport = new ParquetReadSupport(
- convertTz,
- enableVectorizedReader = false,
- datetimeRebaseSpec,
- int96RebaseSpec)
- val reader = if (pushed.isDefined && enableRecordFilter) {
- val parquetFilter = FilterCompat.get(pushed.get, null)
- new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
+ if (enableVectorizedReader) {
+ buildVectorizedIterator(
+ hadoopAttemptContext, split, file.partitionValues,
partitionSchema, convertTz,
+ datetimeRebaseSpec, int96RebaseSpec, enableOffHeapColumnVector,
returningBatch,
+ capacity, openedFooter, shouldCloseInputStream)
} else {
- new ParquetRecordReader[InternalRow](readSupport)
+ logDebug(s"Falling back to parquet-mr")
+ buildRowBasedIterator(
+ hadoopAttemptContext, split, file.partitionValues,
partitionSchema, convertTz,
+ datetimeRebaseSpec, int96RebaseSpec, requiredSchema, pushed,
enableRecordFilter)
}
- val readerWithRowIndexes =
ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader,
- requiredSchema)
- val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes)
- try {
- readerWithRowIndexes.initialize(split, hadoopAttemptContext)
-
- val fullSchema = toAttributes(requiredSchema) ++
toAttributes(partitionSchema)
- val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema,
fullSchema)
-
- if (partitionSchema.length == 0) {
- // There is no partition columns
- iter.map(unsafeProjection)
- } else {
- val joinedRow = new JoinedRow()
- iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
- }
- } catch {
- case e: Throwable =>
- // SPARK-23457: In case there is an exception in initialization,
close the iterator to
- // avoid leaking resources.
- iter.close()
- throw e
+ } finally {
+ if (shouldCloseInputStream.get) {
+ openedFooter.inputStreamOpt.ifPresent(Utils.closeQuietly)
}
}
}
}
+ // scalastyle:off argcount
+ private def buildVectorizedIterator(
+ hadoopAttemptContext: TaskAttemptContextImpl,
+ split: FileSplit,
+ partitionValues: InternalRow,
+ partitionSchema: StructType,
+ convertTz: Option[ZoneId],
+ datetimeRebaseSpec: RebaseDateTime.RebaseSpec,
+ int96RebaseSpec: RebaseDateTime.RebaseSpec,
+ enableOffHeapColumnVector: Boolean,
+ returningBatch: Boolean,
+ batchSize: Int,
+ openedFooter: OpenedParquetFooter,
+ shouldCloseInputStream: AtomicBoolean): Iterator[InternalRow] = {
+ // scalastyle:on argcount
+ assert(openedFooter.inputStreamOpt.isPresent)
+ val vectorizedReader = new VectorizedParquetRecordReader(
+ convertTz.orNull,
+ datetimeRebaseSpec.mode.toString,
+ datetimeRebaseSpec.timeZone,
+ int96RebaseSpec.mode.toString,
+ int96RebaseSpec.timeZone,
+ enableOffHeapColumnVector && TaskContext.get() != null,
+ batchSize)
+ // SPARK-37089: We cannot register a task completion listener to close
this iterator here
+ // because downstream exec nodes have already registered their listeners.
Since listeners
+ // are executed in reverse order of registration, a listener registered
here would close the
+ // iterator while downstream exec nodes are still running. When off-heap
column vectors are
+ // enabled, this can cause a use-after-free bug leading to a segfault.
+ //
+ // Instead, we use FileScanRDD's task completion listener to close this
iterator.
+ val iter = new RecordReaderIterator(vectorizedReader)
+ try {
+ vectorizedReader.initialize(
+ split, hadoopAttemptContext, Some(openedFooter.inputFile),
+ Some(openedFooter.inputStream), Some(openedFooter.footer))
+      // The caller doesn't need to close the inputStream after calling `initialize`
+      // because ownership of the inputStream has been transferred to the
+      // vectorizedReader.
+ shouldCloseInputStream.set(false)
+ logDebug(s"Appending $partitionSchema $partitionValues")
+ vectorizedReader.initBatch(partitionSchema, partitionValues)
+ if (returningBatch) {
+ vectorizedReader.enableReturningBatches()
+ }
+
+ // UnsafeRowParquetRecordReader appends the columns internally to avoid
another copy.
+ iter.asInstanceOf[Iterator[InternalRow]]
+ } catch {
+ case e: Throwable =>
+ // SPARK-23457: In case there is an exception in initialization, close
the iterator to
+ // avoid leaking resources.
+ iter.close()
+ throw e
+ }
+ }
+
+ private def buildRowBasedIterator(
+ hadoopAttemptContext: TaskAttemptContextImpl,
+ split: FileSplit,
+ partitionValues: InternalRow,
+ partitionSchema: StructType,
+ convertTz: Option[ZoneId],
+ datetimeRebaseSpec: RebaseDateTime.RebaseSpec,
+ int96RebaseSpec: RebaseDateTime.RebaseSpec,
+ requiredSchema: StructType,
+ pushed: Option[FilterPredicate],
+ enableRecordFilter: Boolean): Iterator[InternalRow] with Closeable = {
+ // ParquetRecordReader returns InternalRow
+ val readSupport = new ParquetReadSupport(
+ convertTz,
+ enableVectorizedReader = false,
+ datetimeRebaseSpec,
+ int96RebaseSpec)
+ val reader = if (pushed.isDefined && enableRecordFilter) {
+ val parquetFilter = FilterCompat.get(pushed.get, null)
+ new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
+ } else {
+ new ParquetRecordReader[InternalRow](readSupport)
+ }
+ val readerWithRowIndexes =
ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader,
+ requiredSchema)
+ val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes)
+ try {
+ readerWithRowIndexes.initialize(split, hadoopAttemptContext)
+
+ val fullSchema = toAttributes(requiredSchema) ++
toAttributes(partitionSchema)
+ val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema,
fullSchema)
+
+ if (partitionSchema.length == 0) {
+        // There are no partition columns
+ iter.map(unsafeProjection)
+ } else {
+ val joinedRow = new JoinedRow()
+ iter.map(d => unsafeProjection(joinedRow(d, partitionValues)))
+ }
+ } catch {
+ case e: Throwable =>
+ // SPARK-23457: In case there is an exception in initialization, close
the iterator to
+ // avoid leaking resources.
+ iter.close()
+ throw e
+ }
+ }
+
override def supportDataType(dataType: DataType): Boolean = dataType match {
case _: AtomicType => true
@@ -447,9 +508,9 @@ object ParquetFileFormat extends Logging {
// Skips row group information since we only need the schema.
// ParquetFileReader.readFooter throws RuntimeException, instead of
IOException,
// when it can't read the footer.
- Some(new Footer(currentFile.getPath(),
+ Some(new Footer(currentFile.getPath,
ParquetFooterReader.readFooter(
- conf, currentFile, SKIP_ROW_GROUPS)))
+ HadoopInputFile.fromStatus(currentFile, conf), SKIP_ROW_GROUPS)))
} catch { case e: RuntimeException =>
if (ignoreCorruptFiles) {
logWarning(log"Skipped the footer in the corrupted file: ${MDC(PATH,
currentFile)}", e)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index 70ae8068a03a..1f03fea25687 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -17,14 +17,16 @@
package org.apache.spark.sql.execution.datasources.v2.parquet
import java.time.ZoneId
+import java.util.Optional
import org.apache.hadoop.mapred.FileSplit
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
-import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
-import org.apache.parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
+import org.apache.parquet.hadoop._
+import org.apache.parquet.hadoop.metadata.FileMetaData
+import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.spark.TaskContext
import org.apache.spark.broadcast.Broadcast
@@ -86,17 +88,21 @@ case class ParquetPartitionReaderFactory(
private val parquetReaderCallback = new ParquetReaderCallback()
- private def getFooter(file: PartitionedFile): ParquetMetadata = {
- val conf = broadcastedConf.value.value
- if (aggregation.isDefined || enableVectorizedReader) {
- // There are two purposes for reading footer with row groups:
- // 1. When there are aggregates to push down, we get max/min/count from
footer statistics.
- // 2. When there are vectorized reads, we can avoid reading the footer
twice by reading
- // all row groups in advance and filter row groups according to
filters that require
- // push down (no need to read the footer metadata again).
- ParquetFooterReader.readFooter(conf, file,
ParquetFooterReader.WITH_ROW_GROUPS)
+ private def openFileAndReadFooter(file: PartitionedFile):
OpenedParquetFooter = {
+ val hadoopConf = broadcastedConf.value.value
+ if (aggregation.isDefined) {
+ val inputFile = HadoopInputFile.fromPath(file.toPath, hadoopConf)
+ // When there are aggregates to push down, we get max/min/count from
footer statistics.
+ val footer = ParquetFooterReader.readFooter(
+ inputFile,
+ ParquetFooterReader.buildFilter(hadoopConf, file, false))
+ new OpenedParquetFooter(footer, inputFile, Optional.empty)
} else {
- ParquetFooterReader.readFooter(conf, file,
ParquetFooterReader.SKIP_ROW_GROUPS)
+      // When there are vectorized reads, we can avoid
+      // 1. opening the file twice, by transferring the SeekableInputStream
+      // 2. reading the footer twice, by reading all row group metadata in advance and
+      //    filtering row groups according to filters that require push down
+ ParquetFooterReader.openFileAndReadFooter(hadoopConf, file,
enableVectorizedReader)
}
}
@@ -130,13 +136,14 @@ case class ParquetPartitionReaderFactory(
new PartitionReader[InternalRow] {
private var hasNext = true
private lazy val row: InternalRow = {
- val footer = getFooter(file)
-
- if (footer != null && footer.getBlocks.size > 0) {
- ParquetUtils.createAggInternalRowFromFooter(footer,
file.urlEncodedPath,
- dataSchema,
- partitionSchema, aggregation.get, readDataSchema,
file.partitionValues,
- getDatetimeRebaseSpec(footer.getFileMetaData))
+ val openedFooter = openFileAndReadFooter(file)
+ assert(openedFooter.inputStreamOpt.isEmpty)
+
+ if (openedFooter.footer != null &&
openedFooter.footer.getBlocks.size > 0) {
+ ParquetUtils.createAggInternalRowFromFooter(openedFooter.footer,
+ file.urlEncodedPath, dataSchema, partitionSchema,
aggregation.get,
+ readDataSchema, file.partitionValues,
+ getDatetimeRebaseSpec(openedFooter.footer.getFileMetaData))
} else {
null
}
@@ -175,11 +182,14 @@ case class ParquetPartitionReaderFactory(
new PartitionReader[ColumnarBatch] {
private var hasNext = true
private val batch: ColumnarBatch = {
- val footer = getFooter(file)
- if (footer != null && footer.getBlocks.size > 0) {
- val row = ParquetUtils.createAggInternalRowFromFooter(footer,
file.urlEncodedPath,
- dataSchema, partitionSchema, aggregation.get, readDataSchema,
file.partitionValues,
- getDatetimeRebaseSpec(footer.getFileMetaData))
+ val openedFooter = openFileAndReadFooter(file)
+ assert(openedFooter.inputStreamOpt.isEmpty)
+
+ if (openedFooter.footer != null &&
openedFooter.footer.getBlocks.size > 0) {
+ val row =
ParquetUtils.createAggInternalRowFromFooter(openedFooter.footer,
+ file.urlEncodedPath, dataSchema, partitionSchema,
aggregation.get,
+ readDataSchema, file.partitionValues,
+ getDatetimeRebaseSpec(openedFooter.footer.getFileMetaData))
AggregatePushDownUtils.convertAggregatesRowToBatch(
row, readDataSchema, enableOffHeapColumnVector &&
Option(TaskContext.get()).isDefined)
} else {
@@ -211,74 +221,93 @@ case class ParquetPartitionReaderFactory(
RebaseSpec) => RecordReader[Void, T]): RecordReader[Void, T] = {
val conf = broadcastedConf.value.value
- val filePath = file.toPath
- val split = new FileSplit(filePath, file.start, file.length,
Array.empty[String])
- val fileFooter = getFooter(file)
- val footerFileMetaData = fileFooter.getFileMetaData
- val datetimeRebaseSpec = getDatetimeRebaseSpec(footerFileMetaData)
- // Try to push down filters when filter push-down is enabled.
- val pushed = if (enableParquetFilterPushDown) {
- val parquetSchema = footerFileMetaData.getSchema
- val parquetFilters = new ParquetFilters(
- parquetSchema,
- pushDownDate,
- pushDownTimestamp,
- pushDownDecimal,
- pushDownStringPredicate,
- pushDownInFilterThreshold,
- isCaseSensitive,
- datetimeRebaseSpec)
- filters
- // Collects all converted Parquet filter predicates. Notice that not
all predicates can be
- // converted (`ParquetFilters.createFilter` returns an `Option`).
That's why a `flatMap`
- // is used here.
- .flatMap(parquetFilters.createFilter)
- .reduceOption(FilterApi.and)
- } else {
- None
+ val split = new FileSplit(file.toPath, file.start, file.length,
Array.empty[String])
+ val openedFooter = openFileAndReadFooter(file)
+ assert {
+ openedFooter.inputStreamOpt.isPresent == (aggregation.isEmpty &&
enableVectorizedReader)
}
- // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions
to int96 timestamps'
- // *only* if the file was created by something other than "parquet-mr", so
check the actual
- // writer here for this file. We have to do this per-file, as each file
in the table may
- // have different writers.
- // Define isCreatedByParquetMr as function to avoid unnecessary parquet
footer reads.
- def isCreatedByParquetMr: Boolean =
- footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
-
- val convertTz =
- if (timestampConversion && !isCreatedByParquetMr) {
-
Some(DateTimeUtils.getZoneId(conf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+
+    // Before transferring ownership of the inputStream to the vectorizedReader,
+    // we are responsible for closing the inputStream if something goes wrong,
+    // to avoid a resource leak.
+ var shouldCloseInputStream = openedFooter.inputStreamOpt.isPresent
+ try {
+ val footerFileMetaData = openedFooter.footer.getFileMetaData
+ val datetimeRebaseSpec = getDatetimeRebaseSpec(footerFileMetaData)
+ // Try to push down filters when filter push-down is enabled.
+ val pushed = if (enableParquetFilterPushDown) {
+ val parquetSchema = footerFileMetaData.getSchema
+ val parquetFilters = new ParquetFilters(
+ parquetSchema,
+ pushDownDate,
+ pushDownTimestamp,
+ pushDownDecimal,
+ pushDownStringPredicate,
+ pushDownInFilterThreshold,
+ isCaseSensitive,
+ datetimeRebaseSpec)
+ filters
+ // Collects all converted Parquet filter predicates. Notice that not
all predicates can be
+ // converted (`ParquetFilters.createFilter` returns an `Option`).
That's why a `flatMap`
+ // is used here.
+ .flatMap(parquetFilters.createFilter)
+ .reduceOption(FilterApi.and)
} else {
None
}
- val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP,
0), 0)
- val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
+ // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions
to int96 timestamps'
+ // *only* if the file was created by something other than "parquet-mr",
so check the actual
+ // writer here for this file. We have to do this per-file, as each file
in the table may
+ // have different writers.
+ // Define isCreatedByParquetMr as function to avoid unnecessary parquet
footer reads.
+ def isCreatedByParquetMr: Boolean =
+ footerFileMetaData.getCreatedBy.startsWith("parquet-mr")
+
+ val convertTz =
+ if (timestampConversion && !isCreatedByParquetMr) {
+
Some(DateTimeUtils.getZoneId(conf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+ } else {
+ None
+ }
- // Try to push down filters when filter push-down is enabled.
- // Notice: This push-down is RowGroups level, not individual records.
- if (pushed.isDefined) {
-
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration,
pushed.get)
- }
- val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
- footerFileMetaData.getKeyValueMetaData.get,
- int96RebaseModeInRead)
- Utils.createResourceUninterruptiblyIfInTaskThread {
- Utils.tryInitializeResource(
- buildReaderFunc(
- file.partitionValues,
- pushed,
- convertTz,
- datetimeRebaseSpec,
- int96RebaseSpec)
- ) { reader =>
- reader match {
- case vectorizedReader: VectorizedParquetRecordReader =>
- vectorizedReader.initialize(split, hadoopAttemptContext,
Option.apply(fileFooter))
- case _ =>
- reader.initialize(split, hadoopAttemptContext)
+ val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP,
0), 0)
+ val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
+
+ // Try to push down filters when filter push-down is enabled.
+ // Notice: This push-down is RowGroups level, not individual records.
+ pushed.foreach {
+
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, _)
+ }
+ val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
+ footerFileMetaData.getKeyValueMetaData.get,
+ int96RebaseModeInRead)
+ Utils.createResourceUninterruptiblyIfInTaskThread {
+ Utils.tryInitializeResource(
+ buildReaderFunc(
+ file.partitionValues,
+ pushed,
+ convertTz,
+ datetimeRebaseSpec,
+ int96RebaseSpec)
+ ) { reader =>
+ reader match {
+ case vectorizedReader: VectorizedParquetRecordReader =>
+ vectorizedReader.initialize(
+ split, hadoopAttemptContext, Some(openedFooter.inputFile),
+ Some(openedFooter.inputStream), Some(openedFooter.footer))
+              // We don't need to close the inputStream after calling `initialize`
+              // because ownership of the inputStream has been transferred to
+              // the vectorizedReader.
+ shouldCloseInputStream = false
+ case _ =>
+ reader.initialize(split, hadoopAttemptContext)
+ }
+ reader
}
- reader
+ }
+ } finally {
+ if (shouldCloseInputStream) {
+ openedFooter.inputStreamOpt.ifPresent(Utils.closeQuietly)
}
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
index 257a89754f4e..044f6ce202d8 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
@@ -22,6 +22,7 @@ import java.time.ZoneOffset
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
+import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.spark.sql.Row
@@ -229,8 +230,8 @@ class ParquetInteroperabilitySuite extends
ParquetCompatibilityTest with SharedS
// sure the test is configured correctly.
assert(parts.size == 2)
parts.foreach { part =>
- val oneFooter =
- ParquetFooterReader.readFooter(hadoopConf, part.getPath,
NO_FILTER)
+ val oneFooter = ParquetFooterReader.readFooter(
+ HadoopInputFile.fromStatus(part, hadoopConf), NO_FILTER)
assert(oneFooter.getFileMetaData.getSchema.getColumns.size ===
1)
val typeName = oneFooter
.getFileMetaData.getSchema.getColumns.get(0).getPrimitiveType.getPrimitiveTypeName
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index b7b082e32965..12daef65eabc 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -144,8 +144,8 @@ private[sql] trait ParquetTest extends
FileBasedDataSourceTest {
protected def readFooter(path: Path, configuration: Configuration):
ParquetMetadata = {
ParquetFooterReader.readFooter(
- configuration,
- new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE),
+ HadoopInputFile.fromPath(
+ new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE),
configuration),
ParquetMetadataConverter.NO_FILTER)
}
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index d3f625542d96..769b633a9c52 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -26,6 +26,7 @@ import scala.jdk.CollectionConverters._
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
+import org.apache.parquet.hadoop.util.HadoopInputFile
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{spy, times, verify}
import org.scalatest.BeforeAndAfterEach
@@ -2710,8 +2711,9 @@ class HiveDDLSuite
OrcFileOperator.getFileReader(maybeFile.get.toPath.toString).get.getCompression.name
case "parquet" =>
+ val hadoopConf = sparkContext.hadoopConfiguration
val footer = ParquetFooterReader.readFooter(
- sparkContext.hadoopConfiguration, new Path(maybeFile.get.getPath),
NO_FILTER)
+ HadoopInputFile.fromPath(new Path(maybeFile.get.getPath),
hadoopConf), NO_FILTER)
footer.getBlocks.get(0).getColumns.get(0).getCodec.toString
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]