This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new f715e8a02e8 [HUDI-7565] Create spark file readers to read a single file instead of an entire partition (#10954)
f715e8a02e8 is described below
commit f715e8a02e8ee5561274ad38bdda5e863317b240
Author: Jon Vexler <[email protected]>
AuthorDate: Fri Apr 12 13:29:12 2024 -0400
[HUDI-7565] Create spark file readers to read a single file instead of an entire partition (#10954)
Co-authored-by: Jonathan Vexler <=>
---
.../datasources/parquet/SparkParquetReader.scala | 44 ++++
.../org/apache/spark/sql/hudi/SparkAdapter.scala | 18 +-
.../parquet/SparkParquetReaderBase.scala | 96 +++++++
.../parquet/TestSparkParquetReaderFormat.scala | 56 ++++
.../hudi/functional/TestSparkParquetReader.java | 48 ++++
.../org/apache/hudi/util/JavaConversions.scala | 22 +-
.../apache/spark/sql/adapter/Spark2Adapter.scala | 20 +-
.../datasources/parquet/Spark24ParquetReader.scala | 225 ++++++++++++++++
.../apache/spark/sql/adapter/Spark3_0Adapter.scala | 20 +-
.../datasources/parquet/Spark30ParquetReader.scala | 229 +++++++++++++++++
.../apache/spark/sql/adapter/Spark3_1Adapter.scala | 19 +-
.../datasources/parquet/Spark31ParquetReader.scala | 242 ++++++++++++++++++
.../apache/spark/sql/adapter/Spark3_2Adapter.scala | 20 +-
.../datasources/parquet/Spark32ParquetReader.scala | 267 +++++++++++++++++++
.../apache/spark/sql/adapter/Spark3_3Adapter.scala | 20 +-
.../datasources/parquet/Spark33ParquetReader.scala | 268 +++++++++++++++++++
.../apache/spark/sql/adapter/Spark3_4Adapter.scala | 20 +-
.../datasources/parquet/Spark34ParquetReader.scala | 277 ++++++++++++++++++++
.../apache/spark/sql/adapter/Spark3_5Adapter.scala | 20 +-
.../datasources/parquet/Spark35ParquetReader.scala | 284 +++++++++++++++++++++
20 files changed, 2206 insertions(+), 9 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReader.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReader.scala
new file mode 100644
index 00000000000..920e4cb0e0b
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReader.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+trait SparkParquetReader extends Serializable {
+ /**
+ * Read an individual parquet file
+ *
+ * @param file parquet file to read
+ * @param requiredSchema desired output schema of the data
+ * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row
+ * @param filters filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters.
+ * @param sharedConf the hadoop conf
+ * @return iterator of rows read from the file. The output type says [[InternalRow]] but could be [[ColumnarBatch]]
+ */
+ def read(file: PartitionedFile,
+ requiredSchema: StructType,
+ partitionSchema: StructType,
+ filters: Seq[Filter],
+ sharedConf: Configuration): Iterator[InternalRow]
+}
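For orientation, a minimal usage sketch of the new trait (not part of the commit; `reader`, `file`, the schemas and `hadoopConf` below are hypothetical values that would normally come from the Spark adapter and Spark's file-listing machinery):

    // Hypothetical sketch: read one parquet file through the new single-file reader API.
    val rows: Iterator[InternalRow] = reader.read(
      file,                    // PartitionedFile pointing at a single parquet file
      requiredSchema,          // columns the query actually needs
      partitionSchema,         // partition columns appended to every output row
      filters = Seq.empty,     // optional data-skipping filters
      sharedConf = hadoopConf) // Hadoop configuration for this read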
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index 1c6111afe47..91fe6dabc2e 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -19,6 +19,7 @@
package org.apache.spark.sql.hudi
import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -33,7 +34,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, SparkParquetReader}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.parser.HoodieExtendedParserInterface
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.{DataType, Metadata, StructType}
@@ -214,4 +216,18 @@ trait SparkAdapter extends Serializable {
* Tries to translate a Catalyst Expression into data source Filter
*/
def translateFilter(predicate: Expression, supportNestedPredicatePushdown: Boolean = false): Option[Filter]
+
+ /**
+ * Get parquet file reader
+ *
+ * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc.
+ * @param sqlConf the [[SQLConf]] used for the read
+ * @param options passed as a param to the file format
+ * @param hadoopConf some configs will be set for the hadoopConf
+ * @return parquet file reader
+ */
+ def createParquetFileReader(vectorized: Boolean,
+ sqlConf: SQLConf,
+ options: Map[String, String],
+ hadoopConf: Configuration): SparkParquetReader
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReaderBase.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReaderBase.scala
new file mode 100644
index 00000000000..2b47da76456
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReaderBase.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+abstract class SparkParquetReaderBase(enableVectorizedReader: Boolean,
+ enableParquetFilterPushDown: Boolean,
+ pushDownDate: Boolean,
+ pushDownTimestamp: Boolean,
+ pushDownDecimal: Boolean,
+ pushDownInFilterThreshold: Int,
+ isCaseSensitive: Boolean,
+ timestampConversion: Boolean,
+ enableOffHeapColumnVector: Boolean,
+ capacity: Int,
+ returningBatch: Boolean,
+ enableRecordFilter: Boolean,
+ timeZoneId: Option[String]) extends SparkParquetReader {
+ /**
+ * Read an individual parquet file
+ *
+ * @param file parquet file to read
+ * @param requiredSchema desired output schema of the data
+ * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row
+ * @param filters filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters.
+ * @param sharedConf the hadoop conf
+ * @return iterator of rows read from the file. The output type says [[InternalRow]] but could be [[ColumnarBatch]]
+ */
+ final def read(file: PartitionedFile,
+ requiredSchema: StructType,
+ partitionSchema: StructType,
+ filters: Seq[Filter],
+ sharedConf: Configuration): Iterator[InternalRow] = {
+ val conf = new Configuration(sharedConf)
+ conf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, requiredSchema.json)
+ conf.set(ParquetWriteSupport.SPARK_ROW_SCHEMA, requiredSchema.json)
+ ParquetWriteSupport.setSchema(requiredSchema, conf)
+ doRead(file, requiredSchema, partitionSchema, filters, conf)
+ }
+
+ /**
+ * Implemented for each spark version
+ *
+ * @param file parquet file to read
+ * @param requiredSchema desired output schema of the data
+ * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row
+ * @param filters filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters.
+ * @param sharedConf the hadoop conf
+ * @return iterator of rows read from the file. The output type says [[InternalRow]] but could be [[ColumnarBatch]]
+ */
+ protected def doRead(file: PartitionedFile,
+ requiredSchema: StructType,
+ partitionSchema: StructType,
+ filters: Seq[Filter],
+ sharedConf: Configuration): Iterator[InternalRow]
+
+}
+
+trait SparkParquetReaderBuilder {
+ /**
+ * Get parquet file reader
+ *
+ * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc.
+ * @param sqlConf the [[SQLConf]] used for the read
+ * @param options passed as a param to the file format
+ * @param hadoopConf some configs will be set for the hadoopConf
+ * @return properties needed for reading a parquet file
+ */
+ def build(vectorized: Boolean,
+ sqlConf: SQLConf,
+ options: Map[String, String],
+ hadoopConf: Configuration): SparkParquetReader
+}
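To make the builder contract above concrete, a hedged sketch (not part of the commit) of obtaining a reader on the driver from one of the version-specific builders added later in this diff; the SparkSession value `spark` is assumed:

    // Hypothetical sketch: build a version-specific reader from the active session state.
    val reader: SparkParquetReader = Spark24ParquetReader.build(
      vectorized = false,
      sqlConf = spark.sessionState.conf,
      options = Map.empty,
      hadoopConf = spark.sessionState.newHadoopConf())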
diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSparkParquetReaderFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSparkParquetReaderFormat.scala
new file mode 100644
index 00000000000..bf513847cfc
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSparkParquetReaderFormat.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * Class used to test [[SparkParquetReader]]
+ * This class should have the same functionality as [[ParquetFileFormat]]
+ */
+class TestSparkParquetReaderFormat extends ParquetFileFormat with SparkAdapterSupport {
+
+ override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String],
+ hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+ // reader must be created outside of the lambda. This happens on the driver
+ val reader = sparkAdapter.createParquetFileReader(supportBatch(sparkSession,
+ StructType(partitionSchema.fields ++ requiredSchema.fields)),
+ sparkSession.sqlContext.conf, options, hadoopConf)
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+ (file: PartitionedFile) => {
+ // code inside the lambda will run on the executor
+ reader.read(file, requiredSchema, partitionSchema, filters, broadcastedHadoopConf.value.value)
+ }
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkParquetReader.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkParquetReader.java
new file mode 100644
index 00000000000..1c755ce3c8f
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkParquetReader.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional;
+
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.util.JavaConversions;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("functional")
+public class TestSparkParquetReader extends TestBootstrapReadBase {
+
+ @Test
+ public void testReader() {
+ dataGen = new HoodieTestDataGenerator(dashPartitionPaths);
+ int n = 10;
+ Dataset<Row> inserts = makeInsertDf("000", n);
+ inserts.write().format("parquet").save(bootstrapBasePath);
+ Dataset<Row> parquetReadRows = JavaConversions.createTestDataFrame(sparkSession, bootstrapBasePath);
+ Dataset<Row> datasourceReadRows = sparkSession.read().format("parquet").load(bootstrapBasePath);
+ assertEquals(datasourceReadRows.count(), n);
+ assertEquals(parquetReadRows.count(), n);
+ assertEquals(datasourceReadRows.except(parquetReadRows).count(), 0);
+ assertEquals(parquetReadRows.except(datasourceReadRows).count(), 0);
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/JavaConversions.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/JavaConversions.scala
index c9abe090975..52333e72628 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/JavaConversions.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/JavaConversions.scala
@@ -18,9 +18,14 @@
package org.apache.hudi.util
+import org.apache.spark.sql.execution.datasources.parquet.TestSparkParquetReaderFormat
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
import java.util.function.Predicate
-object JavaConversions {
+object JavaConversions extends SparkAdapterSupport {
def getPredicate[T](function1: (T) => Boolean): Predicate[T] = {
new Predicate[T] {
override def test(t: T): Boolean = function1.apply(t)
@@ -34,4 +39,19 @@ object JavaConversions {
}
}
}
+
+ /**
+ * Read parquet files using [[TestSparkParquetReaderFormat]]
+ *
+ * @param sparkSession the spark session
+ * @param paths comma separated list of parquet files or directories containing parquet files
+ * @return dataframe containing the data from the input paths
+ */
+ def createTestDataFrame(sparkSession: SparkSession, paths: String): DataFrame = {
+ sparkSession.sqlContext.baseRelationToDataFrame(DataSource.apply(
+ sparkSession = sparkSession,
+ className = "org.apache.spark.sql.execution.datasources.parquet.TestSparkParquetReaderFormat",
+ paths = paths.split(",").toSeq
+ ).resolveRelation())
+ }
}
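A hedged usage sketch (not in the commit) of the new helper from Scala; the SparkSession value `spark` and the path string are assumed:

    // Hypothetical sketch: load parquet data through the relation backed by
    // TestSparkParquetReaderFormat and inspect a few rows.
    val df = JavaConversions.createTestDataFrame(spark, "/tmp/bootstrap_base_path")
    df.show(10)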
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index 932e3dd05f0..7b5f5847562 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -19,6 +19,7 @@
package org.apache.spark.sql.adapter
import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -31,10 +32,11 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable}
import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24LegacyHoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24LegacyHoodieParquetFileFormat, Spark24ParquetReader, SparkParquetReader}
import org.apache.spark.sql.execution.vectorized.MutableColumnarRow
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.parser.HoodieExtendedParserInterface
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType}
@@ -205,4 +207,20 @@ class Spark2Adapter extends SparkAdapter {
batch.setNumRows(numRows)
batch
}
+
+ /**
+ * Get parquet file reader
+ *
+ * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc.
+ * @param sqlConf the [[SQLConf]] used for the read
+ * @param options passed as a param to the file format
+ * @param hadoopConf some configs will be set for the hadoopConf
+ * @return parquet file reader
+ */
+ override def createParquetFileReader(vectorized: Boolean,
+ sqlConf: SQLConf,
+ options: Map[String, String],
+ hadoopConf: Configuration): SparkParquetReader = {
+ Spark24ParquetReader.build(vectorized, sqlConf, options, hadoopConf)
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24ParquetReader.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24ParquetReader.scala
new file mode 100644
index 00000000000..7fa30a36222
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24ParquetReader.scala
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.{PartitionedFile, RecordReaderIterator}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+import java.net.URI
+
+class Spark24ParquetReader(enableVectorizedReader: Boolean,
+ enableParquetFilterPushDown: Boolean,
+ pushDownDate: Boolean,
+ pushDownTimestamp: Boolean,
+ pushDownDecimal: Boolean,
+ pushDownInFilterThreshold: Int,
+ pushDownStringStartWith: Boolean,
+ isCaseSensitive: Boolean,
+ timestampConversion: Boolean,
+ enableOffHeapColumnVector: Boolean,
+ capacity: Int,
+ returningBatch: Boolean,
+ enableRecordFilter: Boolean,
+ timeZoneId: Option[String]) extends SparkParquetReaderBase(
+ enableVectorizedReader = enableVectorizedReader,
+ enableParquetFilterPushDown = enableParquetFilterPushDown,
+ pushDownDate = pushDownDate,
+ pushDownTimestamp = pushDownTimestamp,
+ pushDownDecimal = pushDownDecimal,
+ pushDownInFilterThreshold = pushDownInFilterThreshold,
+ isCaseSensitive = isCaseSensitive,
+ timestampConversion = timestampConversion,
+ enableOffHeapColumnVector = enableOffHeapColumnVector,
+ capacity = capacity,
+ returningBatch = returningBatch,
+ enableRecordFilter = enableRecordFilter,
+ timeZoneId = timeZoneId) {
+
+ /**
+ * Read an individual parquet file
+ * Code from ParquetFileFormat#buildReaderWithPartitionValues from Spark v2.4.8 adapted here
+ *
+ * @param file parquet file to read
+ * @param requiredSchema desired output schema of the data
+ * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row
+ * @param filters filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters.
+ * @param sharedConf the hadoop conf
+ * @return iterator of rows read from the file. The output type says [[InternalRow]] but could be [[ColumnarBatch]]
+ */
+ protected def doRead(file: PartitionedFile,
+ requiredSchema: StructType,
+ partitionSchema: StructType,
+ filters: Seq[Filter],
+ sharedConf: Configuration): Iterator[InternalRow] = {
+ assert(file.partitionValues.numFields == partitionSchema.size)
+
+ val fileSplit =
+ new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
+ val filePath = fileSplit.getPath
+
+ val split =
+ new org.apache.parquet.hadoop.ParquetInputSplit(
+ filePath,
+ fileSplit.getStart,
+ fileSplit.getStart + fileSplit.getLength,
+ fileSplit.getLength,
+ fileSplit.getLocations,
+ null)
+
+ lazy val footerFileMetaData =
+ ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+ // Try to push down filters when filter push-down is enabled.
+ val pushed = if (enableParquetFilterPushDown) {
+ val parquetSchema = footerFileMetaData.getSchema
+ val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal,
+ pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+ filters
+ // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+ // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+ // is used here.
+ .flatMap(parquetFilters.createFilter(parquetSchema, _))
+ .reduceOption(FilterApi.and)
+ } else {
+ None
+ }
+
+ // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
+ // *only* if the file was created by something other than "parquet-mr", so check the actual
+ // writer here for this file. We have to do this per-file, as each file in the table may
+ // have different writers.
+ // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
+ def isCreatedByParquetMr: Boolean =
+ footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+ val convertTz =
+ if (timestampConversion && !isCreatedByParquetMr) {
+ Some(DateTimeUtils.getTimeZone(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+ } else {
+ None
+ }
+
+ val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+ val hadoopAttemptContext =
+ new TaskAttemptContextImpl(sharedConf, attemptId)
+
+ // Try to push down filters when filter push-down is enabled.
+ // Notice: This push-down is RowGroups level, not individual records.
+ if (pushed.isDefined) {
+ ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
+ }
+ val taskContext = Option(TaskContext.get())
+ if (enableVectorizedReader) {
+ val vectorizedReader = new VectorizedParquetRecordReader(
+ convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
+ val iter = new RecordReaderIterator(vectorizedReader)
+ // SPARK-23457 Register a task completion listener before `initialization`.
+ taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+ vectorizedReader.initialize(split, hadoopAttemptContext)
+ vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+ if (returningBatch) {
+ vectorizedReader.enableReturningBatches()
+ }
+
+ // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
+ iter.asInstanceOf[Iterator[InternalRow]]
+ } else {
+ // ParquetRecordReader returns UnsafeRow
+ val reader = if (pushed.isDefined && enableRecordFilter) {
+ val parquetFilter = FilterCompat.get(pushed.get, null)
+ new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz), parquetFilter)
+ } else {
+ new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
+ }
+ val iter = new RecordReaderIterator(reader)
+ // SPARK-23457 Register a task completion listener before `initialization`.
+ taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+ reader.initialize(split, hadoopAttemptContext)
+
+ val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+ val joinedRow = new JoinedRow()
+ val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+
+ // This is a horrible erasure hack... if we type the iterator above, then it actually checks
+ // the type in next() and we get a class cast exception. If we make that function return
+ // Object, then we can defer the cast until later!
+ if (partitionSchema.length == 0) {
+ // There are no partition columns
+ iter.asInstanceOf[Iterator[InternalRow]]
+ } else {
+ iter.asInstanceOf[Iterator[InternalRow]]
+ .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
+ }
+ }
+ }
+}
+
+object Spark24ParquetReader extends SparkParquetReaderBuilder {
+ /**
+ * Get parquet file reader
+ *
+ * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc.
+ * @param sqlConf the [[SQLConf]] used for the read
+ * @param options passed as a param to the file format
+ * @param hadoopConf some configs will be set for the hadoopConf
+ * @return parquet file reader
+ */
+ def build(vectorized: Boolean,
+ sqlConf: SQLConf,
+ options: Map[String, String],
+ hadoopConf: Configuration): SparkParquetReader = {
+ // set hadoopConf
+ hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
+ hadoopConf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, sqlConf.sessionLocalTimeZone)
+ hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key, sqlConf.caseSensitiveAnalysis)
+ hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, sqlConf.isParquetBinaryAsString)
+ hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, sqlConf.isParquetINT96AsTimestamp)
+
+ new Spark24ParquetReader(
+ enableVectorizedReader = vectorized,
+ enableParquetFilterPushDown = sqlConf.parquetFilterPushDown,
+ pushDownDate = sqlConf.parquetFilterPushDownDate,
+ pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp,
+ pushDownDecimal = sqlConf.parquetFilterPushDownDecimal,
+ pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold,
+ pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith,
+ isCaseSensitive = sqlConf.caseSensitiveAnalysis,
+ timestampConversion = sqlConf.isParquetINT96TimestampConversion,
+ enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled,
+ capacity = sqlConf.parquetVectorizedReaderBatchSize,
+ returningBatch = sqlConf.parquetVectorizedReaderEnabled,
+ enableRecordFilter = sqlConf.parquetRecordFilterEnabled,
+ timeZoneId = Some(sqlConf.sessionLocalTimeZone))
+ }
+}
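The `flatMap`/`reduceOption` comments above refer to `ParquetFilters.createFilter` returning an `Option`; a hedged standalone illustration of the same combine pattern (the column name and values are illustrative, not from the commit):

    // Hypothetical illustration: filters that could not be converted become None and are
    // dropped; the surviving predicates are AND-ed into a single row-group-level filter.
    import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
    val candidates: Seq[Option[FilterPredicate]] = Seq(
      Some(FilterApi.eq(FilterApi.intColumn("record_key"), Integer.valueOf(1))),
      None) // a source Filter that had no parquet equivalent
    val pushed: Option[FilterPredicate] = candidates.flatten.reduceOption(FilterApi.and)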
diff --git a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_0Adapter.scala b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_0Adapter.scala
index 22a9f090fb3..8fbcf5a060b 100644
--- a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_0Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_0Adapter.scala
@@ -19,6 +19,7 @@
package org.apache.spark.sql.adapter
import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
import org.apache.hudi.Spark30HoodieFileScanRDD
import org.apache.spark.sql._
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer, HoodieSpark3_0AvroDeserializer, HoodieSpark3_0AvroSerializer}
@@ -29,10 +30,11 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark30LegacyHoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark30LegacyHoodieParquetFileFormat, Spark30ParquetReader, SparkParquetReader}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, HoodieSpark30PartitionedFileUtils, HoodieSparkPartitionedFileUtils, PartitionedFile}
import org.apache.spark.sql.hudi.SparkAdapter
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, HoodieSpark3_0ExtendedSqlParser}
import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType}
import org.apache.spark.sql.vectorized.ColumnarUtils
@@ -118,4 +120,20 @@ class Spark3_0Adapter extends BaseSpark3Adapter {
case OFF_HEAP => "OFF_HEAP"
case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $level")
}
+
+ /**
+ * Get parquet file reader
+ *
+ * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc.
+ * @param sqlConf the [[SQLConf]] used for the read
+ * @param options passed as a param to the file format
+ * @param hadoopConf some configs will be set for the hadoopConf
+ * @return parquet file reader
+ */
+ override def createParquetFileReader(vectorized: Boolean,
+ sqlConf: SQLConf,
+ options: Map[String, String],
+ hadoopConf: Configuration): SparkParquetReader = {
+ Spark30ParquetReader.build(vectorized, sqlConf, options, hadoopConf)
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala
new file mode 100644
index 00000000000..9088676c335
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop._
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+
+import java.net.URI
+
+class Spark30ParquetReader(enableVectorizedReader: Boolean,
+ enableParquetFilterPushDown: Boolean,
+ pushDownDate: Boolean,
+ pushDownTimestamp: Boolean,
+ pushDownDecimal: Boolean,
+ pushDownInFilterThreshold: Int,
+ pushDownStringStartWith: Boolean,
+ isCaseSensitive: Boolean,
+ timestampConversion: Boolean,
+ enableOffHeapColumnVector: Boolean,
+ capacity: Int,
+ returningBatch: Boolean,
+ enableRecordFilter: Boolean,
+ timeZoneId: Option[String]) extends SparkParquetReaderBase(
+ enableVectorizedReader = enableVectorizedReader,
+ enableParquetFilterPushDown = enableParquetFilterPushDown,
+ pushDownDate = pushDownDate,
+ pushDownTimestamp = pushDownTimestamp,
+ pushDownDecimal = pushDownDecimal,
+ pushDownInFilterThreshold = pushDownInFilterThreshold,
+ isCaseSensitive = isCaseSensitive,
+ timestampConversion = timestampConversion,
+ enableOffHeapColumnVector = enableOffHeapColumnVector,
+ capacity = capacity,
+ returningBatch = returningBatch,
+ enableRecordFilter = enableRecordFilter,
+ timeZoneId = timeZoneId) {
+
+ /**
+ * Read an individual parquet file
+ * Code from ParquetFileFormat#buildReaderWithPartitionValues from Spark v3.0.3 adapted here
+ *
+ * @param file parquet file to read
+ * @param requiredSchema desired output schema of the data
+ * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row
+ * @param filters filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters.
+ * @param sharedConf the hadoop conf
+ * @return iterator of rows read from the file. The output type says [[InternalRow]] but could be [[ColumnarBatch]]
+ */
+ protected def doRead(file: PartitionedFile,
+ requiredSchema: StructType,
+ partitionSchema: StructType,
+ filters: Seq[Filter],
+ sharedConf: Configuration): Iterator[InternalRow] = {
+ assert(file.partitionValues.numFields == partitionSchema.size)
+
+ val filePath = new Path(new URI(file.filePath))
+ val split =
+ new org.apache.parquet.hadoop.ParquetInputSplit(
+ filePath,
+ file.start,
+ file.start + file.length,
+ file.length,
+ Array.empty,
+ null)
+
+ lazy val footerFileMetaData =
+ ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+ // Try to push down filters when filter push-down is enabled.
+ val pushed = if (enableParquetFilterPushDown) {
+ val parquetSchema = footerFileMetaData.getSchema
+ val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
+ pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+ filters
+ // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+ // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+ // is used here.
+ .flatMap(parquetFilters.createFilter(_))
+ .reduceOption(FilterApi.and)
+ } else {
+ None
+ }
+
+ // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
+ // *only* if the file was created by something other than "parquet-mr", so check the actual
+ // writer here for this file. We have to do this per-file, as each file in the table may
+ // have different writers.
+ // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
+ def isCreatedByParquetMr: Boolean =
+ footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+ val convertTz =
+ if (timestampConversion && !isCreatedByParquetMr) {
+ Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+ } else {
+ None
+ }
+
+ val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+ footerFileMetaData.getKeyValueMetaData.get,
+ SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
+
+ val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+ val hadoopAttemptContext =
+ new TaskAttemptContextImpl(sharedConf, attemptId)
+
+ // Try to push down filters when filter push-down is enabled.
+ // Notice: This push-down is RowGroups level, not individual records.
+ if (pushed.isDefined) {
+ ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
+ }
+ val taskContext = Option(TaskContext.get())
+ if (enableVectorizedReader) {
+ val vectorizedReader = new VectorizedParquetRecordReader(
+ convertTz.orNull,
+ datetimeRebaseMode.toString,
+ enableOffHeapColumnVector && taskContext.isDefined,
+ capacity)
+ val iter = new RecordReaderIterator(vectorizedReader)
+ // SPARK-23457 Register a task completion listener before `initialization`.
+ taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+ vectorizedReader.initialize(split, hadoopAttemptContext)
+ vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+ if (returningBatch) {
+ vectorizedReader.enableReturningBatches()
+ }
+
+ // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
+ iter.asInstanceOf[Iterator[InternalRow]]
+ } else {
+ // ParquetRecordReader returns InternalRow
+ val readSupport = new ParquetReadSupport(
+ convertTz, enableVectorizedReader = false, datetimeRebaseMode)
+ val reader = if (pushed.isDefined && enableRecordFilter) {
+ val parquetFilter = FilterCompat.get(pushed.get, null)
+ new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
+ } else {
+ new ParquetRecordReader[InternalRow](readSupport)
+ }
+ val iter = new RecordReaderIterator[InternalRow](reader)
+ // SPARK-23457 Register a task completion listener before `initialization`.
+ taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+ reader.initialize(split, hadoopAttemptContext)
+
+ val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+ val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+
+ if (partitionSchema.length == 0) {
+ // There are no partition columns
+ iter.map(unsafeProjection)
+ } else {
+ val joinedRow = new JoinedRow()
+ iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
+ }
+ }
+ }
+
+}
+
+object Spark30ParquetReader extends SparkParquetReaderBuilder {
+ /**
+ * Get parquet file reader
+ *
+ * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc.
+ * @param sqlConf the [[SQLConf]] used for the read
+ * @param options passed as a param to the file format
+ * @param hadoopConf some configs will be set for the hadoopConf
+ * @return parquet file reader
+ */
+ def build(vectorized: Boolean,
+ sqlConf: SQLConf,
+ options: Map[String, String],
+ hadoopConf: Configuration): SparkParquetReader = {
+ // set hadoopConf
+ hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
+ hadoopConf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, sqlConf.sessionLocalTimeZone)
+ hadoopConf.setBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, sqlConf.nestedSchemaPruningEnabled)
+ hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key, sqlConf.caseSensitiveAnalysis)
+ hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, sqlConf.isParquetBinaryAsString)
+ hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, sqlConf.isParquetINT96AsTimestamp)
+
+ new Spark30ParquetReader(
+ enableVectorizedReader = vectorized,
+ enableParquetFilterPushDown = sqlConf.parquetFilterPushDown,
+ pushDownDate = sqlConf.parquetFilterPushDownDate,
+ pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp,
+ pushDownDecimal = sqlConf.parquetFilterPushDownDecimal,
+ pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold,
+ pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith,
+ isCaseSensitive = sqlConf.caseSensitiveAnalysis,
+ timestampConversion = sqlConf.isParquetINT96TimestampConversion,
+ enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled,
+ capacity = sqlConf.parquetVectorizedReaderBatchSize,
+ returningBatch = sqlConf.parquetVectorizedReaderEnabled,
+ enableRecordFilter = sqlConf.parquetRecordFilterEnabled,
+ timeZoneId = Some(sqlConf.sessionLocalTimeZone))
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
index 8ca072333d0..21f897afe1c 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
@@ -19,6 +19,7 @@
package org.apache.spark.sql.adapter
import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
import org.apache.hudi.Spark31HoodieFileScanRDD
import org.apache.spark.sql._
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer, HoodieSpark3_1AvroDeserializer, HoodieSpark3_1AvroSerializer}
@@ -30,10 +31,11 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark31LegacyHoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark31LegacyHoodieParquetFileFormat, Spark31ParquetReader, SparkParquetReader}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, HoodieSpark31PartitionedFileUtils, HoodieSparkPartitionedFileUtils, PartitionedFile}
import org.apache.spark.sql.hudi.SparkAdapter
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, HoodieSpark3_1ExtendedSqlParser}
import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType}
import org.apache.spark.sql.vectorized.ColumnarUtils
@@ -121,4 +123,19 @@ class Spark3_1Adapter extends BaseSpark3Adapter {
case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $level")
}
+ /**
+ * Get parquet file reader
+ *
+ * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc.
+ * @param sqlConf the [[SQLConf]] used for the read
+ * @param options passed as a param to the file format
+ * @param hadoopConf some configs will be set for the hadoopConf
+ * @return parquet file reader
+ */
+ override def createParquetFileReader(vectorized: Boolean,
+ sqlConf: SQLConf,
+ options: Map[String, String],
+ hadoopConf: Configuration): SparkParquetReader = {
+ Spark31ParquetReader.build(vectorized, sqlConf, options, hadoopConf)
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31ParquetReader.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31ParquetReader.scala
new file mode 100644
index 00000000000..94a5efaee61
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31ParquetReader.scala
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop._
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+
+import java.net.URI
+
+class Spark31ParquetReader(enableVectorizedReader: Boolean,
+ enableParquetFilterPushDown: Boolean,
+ pushDownDate: Boolean,
+ pushDownTimestamp: Boolean,
+ pushDownDecimal: Boolean,
+ pushDownInFilterThreshold: Int,
+ pushDownStringStartWith: Boolean,
+ isCaseSensitive: Boolean,
+ timestampConversion: Boolean,
+ enableOffHeapColumnVector: Boolean,
+ capacity: Int,
+ returningBatch: Boolean,
+ enableRecordFilter: Boolean,
+ timeZoneId: Option[String]) extends SparkParquetReaderBase(
+ enableVectorizedReader = enableVectorizedReader,
+ enableParquetFilterPushDown = enableParquetFilterPushDown,
+ pushDownDate = pushDownDate,
+ pushDownTimestamp = pushDownTimestamp,
+ pushDownDecimal = pushDownDecimal,
+ pushDownInFilterThreshold = pushDownInFilterThreshold,
+ isCaseSensitive = isCaseSensitive,
+ timestampConversion = timestampConversion,
+ enableOffHeapColumnVector = enableOffHeapColumnVector,
+ capacity = capacity,
+ returningBatch = returningBatch,
+ enableRecordFilter = enableRecordFilter,
+ timeZoneId = timeZoneId) {
+
+ /**
+ * Read an individual parquet file
+ * Code from ParquetFileFormat#buildReaderWithPartitionValues from Spark v3.1.3 adapted here
+ *
+ * @param file parquet file to read
+ * @param requiredSchema desired output schema of the data
+ * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row
+ * @param filters filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters.
+ * @param sharedConf the hadoop conf
+ * @return iterator of rows read from the file. The output type says [[InternalRow]] but could be [[ColumnarBatch]]
+ */
+ protected def doRead(file: PartitionedFile,
+ requiredSchema: StructType,
+ partitionSchema: StructType,
+ filters: Seq[Filter],
+ sharedConf: Configuration): Iterator[InternalRow] = {
+ assert(file.partitionValues.numFields == partitionSchema.size)
+
+ val filePath = new Path(new URI(file.filePath))
+ val split =
+ new org.apache.parquet.hadoop.ParquetInputSplit(
+ filePath,
+ file.start,
+ file.start + file.length,
+ file.length,
+ Array.empty,
+ null)
+
+ lazy val footerFileMetaData =
+ ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+ val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+ footerFileMetaData.getKeyValueMetaData.get,
+ SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
+ // Try to push down filters when filter push-down is enabled.
+ val pushed = if (enableParquetFilterPushDown) {
+ val parquetSchema = footerFileMetaData.getSchema
+ val parquetFilters = new ParquetFilters(
+ parquetSchema,
+ pushDownDate,
+ pushDownTimestamp,
+ pushDownDecimal,
+ pushDownStringStartWith,
+ pushDownInFilterThreshold,
+ isCaseSensitive,
+ datetimeRebaseMode)
+ filters
+ // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+ // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+ // is used here.
+ .flatMap(parquetFilters.createFilter(_))
+ .reduceOption(FilterApi.and)
+ } else {
+ None
+ }
+
+ // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
+ // *only* if the file was created by something other than "parquet-mr", so check the actual
+ // writer here for this file. We have to do this per-file, as each file in the table may
+ // have different writers.
+ // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
+ def isCreatedByParquetMr: Boolean =
+ footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+ val convertTz =
+ if (timestampConversion && !isCreatedByParquetMr) {
+ Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+ } else {
+ None
+ }
+
+ val int96RebaseMode = DataSourceUtils.int96RebaseMode(
+ footerFileMetaData.getKeyValueMetaData.get,
+ SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
+
+ val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+ val hadoopAttemptContext =
+ new TaskAttemptContextImpl(sharedConf, attemptId)
+
+ // Try to push down filters when filter push-down is enabled.
+ // Notice: This push-down is RowGroups level, not individual records.
+ if (pushed.isDefined) {
+ ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
+ }
+ val taskContext = Option(TaskContext.get())
+ if (enableVectorizedReader) {
+ val vectorizedReader = new VectorizedParquetRecordReader(
+ convertTz.orNull,
+ datetimeRebaseMode.toString,
+ int96RebaseMode.toString,
+ enableOffHeapColumnVector && taskContext.isDefined,
+ capacity)
+ val iter = new RecordReaderIterator(vectorizedReader)
+ // SPARK-23457 Register a task completion listener before `initialization`.
+ taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+ vectorizedReader.initialize(split, hadoopAttemptContext)
+ vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+ if (returningBatch) {
+ vectorizedReader.enableReturningBatches()
+ }
+
+ // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
+ iter.asInstanceOf[Iterator[InternalRow]]
+ } else {
+ // ParquetRecordReader returns InternalRow
+ val readSupport = new ParquetReadSupport(
+ convertTz,
+ enableVectorizedReader = false,
+ datetimeRebaseMode,
+ int96RebaseMode)
+ val reader = if (pushed.isDefined && enableRecordFilter) {
+ val parquetFilter = FilterCompat.get(pushed.get, null)
+ new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
+ } else {
+ new ParquetRecordReader[InternalRow](readSupport)
+ }
+ val iter = new RecordReaderIterator[InternalRow](reader)
+ // SPARK-23457 Register a task completion listener before `initialization`.
+ taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+ reader.initialize(split, hadoopAttemptContext)
+
+ val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+ val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+
+ if (partitionSchema.length == 0) {
+ // There are no partition columns
+ iter.map(unsafeProjection)
+ } else {
+ val joinedRow = new JoinedRow()
+ iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
+ }
+ }
+ }
+}
+
+object Spark31ParquetReader extends SparkParquetReaderBuilder {
+
+ /**
+ * Get parquet file reader
+ *
+ * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc.
+ * @param sqlConf the [[SQLConf]] used for the read
+ * @param options passed as a param to the file format
+ * @param hadoopConf some configs will be set for the hadoopConf
+ * @return parquet file reader
+ */
+ def build(vectorized: Boolean,
+ sqlConf: SQLConf,
+ options: Map[String, String],
+ hadoopConf: Configuration): SparkParquetReader = {
+ // set hadoopConf
+ hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
+ hadoopConf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, sqlConf.sessionLocalTimeZone)
+ hadoopConf.setBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, sqlConf.nestedSchemaPruningEnabled)
+ hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key, sqlConf.caseSensitiveAnalysis)
+ hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, sqlConf.isParquetBinaryAsString)
+ hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, sqlConf.isParquetINT96AsTimestamp)
+ new Spark31ParquetReader(
+ enableVectorizedReader = vectorized,
+ enableParquetFilterPushDown = sqlConf.parquetFilterPushDown,
+ pushDownDate = sqlConf.parquetFilterPushDownDate,
+ pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp,
+ pushDownDecimal = sqlConf.parquetFilterPushDownDecimal,
+ pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold,
+ pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith,
+ isCaseSensitive = sqlConf.caseSensitiveAnalysis,
+ timestampConversion = sqlConf.isParquetINT96TimestampConversion,
+ enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled,
+ capacity = sqlConf.parquetVectorizedReaderBatchSize,
+ returningBatch = sqlConf.parquetVectorizedReaderEnabled,
+ enableRecordFilter = sqlConf.parquetRecordFilterEnabled,
+ timeZoneId = Some(sqlConf.sessionLocalTimeZone))
+ }
+}
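A minimal usage sketch, not part of this commit: the builder above wires the
reader from the session's SQLConf. The `spark` handle and the empty options
map below are assumptions for illustration only.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.datasources.parquet.{Spark31ParquetReader, SparkParquetReader}

    val spark = SparkSession.active
    // build() also enriches the passed-in Hadoop conf with the settings shown above.
    val reader: SparkParquetReader = Spark31ParquetReader.build(
      vectorized = spark.sessionState.conf.parquetVectorizedReaderEnabled,
      sqlConf = spark.sessionState.conf,
      options = Map.empty,
      hadoopConf = spark.sessionState.newHadoopConf())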
diff --git
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
index 3a5812a5faa..ea486c7383b 100644
---
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.adapter
import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
import org.apache.hudi.Spark32HoodieFileScanRDD
import org.apache.spark.sql._
import org.apache.spark.sql.avro._
@@ -30,10 +31,11 @@ import
org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable,
LogicalPlan}
import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
Spark32LegacyHoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
Spark32LegacyHoodieParquetFileFormat, Spark32ParquetReader, SparkParquetReader}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hudi.analysis.TableValuedFunctions
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.parser.{HoodieExtendedParserInterface,
HoodieSpark3_2ExtendedSqlParser}
import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder,
StructType}
import org.apache.spark.sql.vectorized.ColumnarUtils
@@ -123,4 +125,20 @@ class Spark3_2Adapter extends BaseSpark3Adapter {
case OFF_HEAP => "OFF_HEAP"
case _ => throw new IllegalArgumentException(s"Invalid StorageLevel:
$level")
}
+
+ /**
+ * Get parquet file reader
+ *
+ * @param vectorized true if vectorized reading is not prohibited due to
schema, reading mode, etc
+ * @param sqlConf the [[SQLConf]] used for the read
+ * @param options passed as a param to the file format
+ * @param hadoopConf some configs will be set for the hadoopConf
+ * @return parquet file reader
+ */
+ override def createParquetFileReader(vectorized: Boolean,
+ sqlConf: SQLConf,
+ options: Map[String, String],
+ hadoopConf: Configuration):
SparkParquetReader = {
+ Spark32ParquetReader.build(vectorized, sqlConf, options, hadoopConf)
+ }
}
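A hedged call-site sketch (not part of this commit): callers go through the
version-agnostic SparkAdapter, so the same code resolves to
Spark32ParquetReader on Spark 3.2 and to the other per-version readers
elsewhere. The `sparkAdapter`, `sqlConf` and `hadoopConf` values are assumed
to be in scope (e.g. via SparkAdapterSupport).

    // Dispatch through the adapter; on Spark 3.2 this delegates to Spark32ParquetReader.build.
    val reader: SparkParquetReader = sparkAdapter.createParquetFileReader(
      vectorized = sqlConf.parquetVectorizedReaderEnabled,
      sqlConf = sqlConf,
      options = Map.empty,
      hadoopConf = hadoopConf)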
diff --git
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32ParquetReader.scala
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32ParquetReader.scala
new file mode 100644
index 00000000000..5437a18cd4b
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32ParquetReader.scala
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.FileSplit
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop._
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+
+import java.net.URI
+
+class Spark32ParquetReader(enableVectorizedReader: Boolean,
+ datetimeRebaseModeInRead: String,
+ int96RebaseModeInRead: String,
+ enableParquetFilterPushDown: Boolean,
+ pushDownDate: Boolean,
+ pushDownTimestamp: Boolean,
+ pushDownDecimal: Boolean,
+ pushDownInFilterThreshold: Int,
+ pushDownStringStartWith: Boolean,
+ isCaseSensitive: Boolean,
+ timestampConversion: Boolean,
+ enableOffHeapColumnVector: Boolean,
+ capacity: Int,
+ returningBatch: Boolean,
+ enableRecordFilter: Boolean,
+ timeZoneId: Option[String]) extends
SparkParquetReaderBase(
+ enableVectorizedReader = enableVectorizedReader,
+ enableParquetFilterPushDown = enableParquetFilterPushDown,
+ pushDownDate = pushDownDate,
+ pushDownTimestamp = pushDownTimestamp,
+ pushDownDecimal = pushDownDecimal,
+ pushDownInFilterThreshold = pushDownInFilterThreshold,
+ isCaseSensitive = isCaseSensitive,
+ timestampConversion = timestampConversion,
+ enableOffHeapColumnVector = enableOffHeapColumnVector,
+ capacity = capacity,
+ returningBatch = returningBatch,
+ enableRecordFilter = enableRecordFilter,
+ timeZoneId = timeZoneId) {
+
+ /**
+ * Read an individual parquet file
+ * Code adapted from ParquetFileFormat#buildReaderWithPartitionValues in Spark v3.2.4
+ *
+ * @param file parquet file to read
+ * @param requiredSchema desired output schema of the data
+ * @param partitionSchema schema of the partition columns. Partition values
will be appended to the end of every row
+ * @param filters filters for data skipping. Not guaranteed to be
used; the spark plan will also apply the filters.
+ * @param sharedConf the hadoop conf
+ * @return iterator of rows read from the file; the declared output type is [[InternalRow]] but it may actually be [[ColumnarBatch]]
+ */
+ protected def doRead(file: PartitionedFile,
+ requiredSchema: StructType,
+ partitionSchema: StructType,
+ filters: Seq[Filter],
+ sharedConf: Configuration): Iterator[InternalRow] = {
+ assert(file.partitionValues.numFields == partitionSchema.size)
+
+ val filePath = new Path(new URI(file.filePath))
+ val split = new FileSplit(filePath, file.start, file.length,
Array.empty[String])
+
+ lazy val footerFileMetaData =
+ ParquetFooterReader.readFooter(sharedConf, filePath,
SKIP_ROW_GROUPS).getFileMetaData
+ val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
+ footerFileMetaData.getKeyValueMetaData.get,
+ datetimeRebaseModeInRead)
+ // Try to push down filters when filter push-down is enabled.
+ val pushed = if (enableParquetFilterPushDown) {
+ val parquetSchema = footerFileMetaData.getSchema
+ val parquetFilters = new ParquetFilters(
+ parquetSchema,
+ pushDownDate,
+ pushDownTimestamp,
+ pushDownDecimal,
+ pushDownStringStartWith,
+ pushDownInFilterThreshold,
+ isCaseSensitive,
+ datetimeRebaseSpec)
+ filters
+ // Collects all converted Parquet filter predicates. Notice that not
all predicates can be
+ // converted (`ParquetFilters.createFilter` returns an `Option`).
That's why a `flatMap`
+ // is used here.
+ .flatMap(parquetFilters.createFilter(_))
+ .reduceOption(FilterApi.and)
+ } else {
+ None
+ }
+
+ // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions
to int96 timestamps
+ // *only* if the file was created by something other than "parquet-mr", so
check the actual
+ // writer here for this file. We have to do this per-file, as each file
in the table may
+ // have different writers.
+ // Define isCreatedByParquetMr as a function to avoid unnecessary parquet
footer reads.
+ def isCreatedByParquetMr: Boolean =
+ footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+ val convertTz =
+ if (timestampConversion && !isCreatedByParquetMr) {
+
Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+ } else {
+ None
+ }
+
+ val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
+ footerFileMetaData.getKeyValueMetaData.get,
+ int96RebaseModeInRead)
+
+ val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP,
0), 0)
+ val hadoopAttemptContext =
+ new TaskAttemptContextImpl(sharedConf, attemptId)
+
+ // Try to push down filters when filter push-down is enabled.
+ // Notice: This push-down is RowGroups level, not individual records.
+ if (pushed.isDefined) {
+
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration,
pushed.get)
+ }
+ val taskContext = Option(TaskContext.get())
+ if (enableVectorizedReader) {
+ val vectorizedReader = new VectorizedParquetRecordReader(
+ convertTz.orNull,
+ datetimeRebaseSpec.mode.toString,
+ datetimeRebaseSpec.timeZone,
+ int96RebaseSpec.mode.toString,
+ int96RebaseSpec.timeZone,
+ enableOffHeapColumnVector && taskContext.isDefined,
+ capacity)
+ // SPARK-37089: We cannot register a task completion listener to close
this iterator here
+ // because downstream exec nodes have already registered their
listeners. Since listeners
+ // are executed in reverse order of registration, a listener registered
here would close the
+ // iterator while downstream exec nodes are still running. When off-heap
column vectors are
+ // enabled, this can cause a use-after-free bug leading to a segfault.
+ //
+ // Instead, we use FileScanRDD's task completion listener to close this
iterator.
+ val iter = new RecordReaderIterator(vectorizedReader)
+ try {
+ vectorizedReader.initialize(split, hadoopAttemptContext)
+ vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+ if (returningBatch) {
+ vectorizedReader.enableReturningBatches()
+ }
+
+ // UnsafeRowParquetRecordReader appends the columns internally to
avoid another copy.
+ iter.asInstanceOf[Iterator[InternalRow]]
+ } catch {
+ case e: Throwable =>
+ // SPARK-23457: In case there is an exception in initialization,
close the iterator to
+ // avoid leaking resources.
+ iter.close()
+ throw e
+ }
+ } else {
+ // ParquetRecordReader returns InternalRow
+ val readSupport = new ParquetReadSupport(
+ convertTz,
+ enableVectorizedReader = false,
+ datetimeRebaseSpec,
+ int96RebaseSpec)
+ val reader = if (pushed.isDefined && enableRecordFilter) {
+ val parquetFilter = FilterCompat.get(pushed.get, null)
+ new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
+ } else {
+ new ParquetRecordReader[InternalRow](readSupport)
+ }
+ val iter = new RecordReaderIterator[InternalRow](reader)
+ try {
+ reader.initialize(split, hadoopAttemptContext)
+
+ val fullSchema = requiredSchema.toAttributes ++
partitionSchema.toAttributes
+ val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema,
fullSchema)
+
+ if (partitionSchema.length == 0) {
+ // There are no partition columns
+ iter.map(unsafeProjection)
+ } else {
+ val joinedRow = new JoinedRow()
+ iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
+ }
+ } catch {
+ case e: Throwable =>
+ // SPARK-23457: In case there is an exception in initialization,
close the iterator to
+ // avoid leaking resources.
+ iter.close()
+ throw e
+ }
+ }
+ }
+}
+
+object Spark32ParquetReader extends SparkParquetReaderBuilder {
+ /**
+ * Get parquet file reader
+ *
+ * @param vectorized true if vectorized reading is not prohibited due to
schema, reading mode, etc
+ * @param sqlConf the [[SQLConf]] used for the read
+ * @param options passed as a param to the file format
+ * @param hadoopConf some configs will be set for the hadoopConf
+ * @return parquet file reader
+ */
+ def build(vectorized: Boolean,
+ sqlConf: SQLConf,
+ options: Map[String, String],
+ hadoopConf: Configuration): SparkParquetReader = {
+ // set Hadoop conf
+ hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS,
classOf[ParquetReadSupport].getName)
+ hadoopConf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key,
sqlConf.sessionLocalTimeZone)
+ hadoopConf.setBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
sqlConf.nestedSchemaPruningEnabled)
+ hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key,
sqlConf.caseSensitiveAnalysis)
+ hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key,
sqlConf.isParquetBinaryAsString)
+ hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sqlConf.isParquetINT96AsTimestamp)
+ // Using string value of this conf to preserve compatibility across spark
versions. See [HUDI-5868]
+ hadoopConf.setBoolean(
+ "spark.sql.legacy.parquet.nanosAsLong",
+ sqlConf.getConfString("spark.sql.legacy.parquet.nanosAsLong",
"false").toBoolean
+ )
+
+ val parquetOptions = new ParquetOptions(options, sqlConf)
+ new Spark32ParquetReader(
+ enableVectorizedReader = vectorized,
+ datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead,
+ int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead,
+ enableParquetFilterPushDown = sqlConf.parquetFilterPushDown,
+ pushDownDate = sqlConf.parquetFilterPushDownDate,
+ pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp,
+ pushDownDecimal = sqlConf.parquetFilterPushDownDecimal,
+ pushDownInFilterThreshold =
sqlConf.parquetFilterPushDownInFilterThreshold,
+ pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith,
+ isCaseSensitive = sqlConf.caseSensitiveAnalysis,
+ timestampConversion = sqlConf.isParquetINT96TimestampConversion,
+ enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled,
+ capacity = sqlConf.parquetVectorizedReaderBatchSize,
+ returningBatch = sqlConf.parquetVectorizedReaderEnabled,
+ enableRecordFilter = sqlConf.parquetRecordFilterEnabled,
+ timeZoneId = Some(sqlConf.sessionLocalTimeZone))
+ }
+}
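A sketch of reading one file with the reader built above, assuming the
SparkParquetReader trait exposes a public read(...) mirroring doRead's
parameters (file, required schema, partition schema, filters, Hadoop conf);
`partitionedFile`, `dataSchema` and `hadoopConf` are placeholders, not part
of this commit.

    // partitionedFile: a PartitionedFile describing a single parquet file
    // dataSchema: StructType of the columns to materialize
    val rows: Iterator[InternalRow] = reader.read(
      partitionedFile,
      dataSchema,
      StructType(Nil),   // no partition columns to append to each row
      Seq.empty[Filter], // optional row-group-level pushdown filters
      hadoopConf)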
diff --git
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
index e3d2cc9cd18..c11c404c33a 100644
---
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.adapter
import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
import org.apache.hudi.Spark33HoodieFileScanRDD
import org.apache.spark.sql._
import org.apache.spark.sql.avro._
@@ -30,10 +31,11 @@ import
org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
Spark33LegacyHoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
Spark33LegacyHoodieParquetFileFormat, Spark33ParquetReader, SparkParquetReader}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hudi.analysis.TableValuedFunctions
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.parser.{HoodieExtendedParserInterface,
HoodieSpark3_3ExtendedSqlParser}
import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder,
StructType}
import org.apache.spark.sql.vectorized.ColumnarBatchRow
@@ -124,4 +126,20 @@ class Spark3_3Adapter extends BaseSpark3Adapter {
case OFF_HEAP => "OFF_HEAP"
case _ => throw new IllegalArgumentException(s"Invalid StorageLevel:
$level")
}
+
+ /**
+ * Get parquet file reader
+ *
+ * @param vectorized true if vectorized reading is not prohibited due to
schema, reading mode, etc
+ * @param sqlConf the [[SQLConf]] used for the read
+ * @param options passed as a param to the file format
+ * @param hadoopConf some configs will be set for the hadoopConf
+ * @return parquet file reader
+ */
+ override def createParquetFileReader(vectorized: Boolean,
+ sqlConf: SQLConf,
+ options: Map[String, String],
+ hadoopConf: Configuration):
SparkParquetReader = {
+ Spark33ParquetReader.build(vectorized, sqlConf, options, hadoopConf)
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
new file mode 100644
index 00000000000..0bd8cca3599
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.FileSplit
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop._
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+
+import java.net.URI
+
+class Spark33ParquetReader(enableVectorizedReader: Boolean,
+ datetimeRebaseModeInRead: String,
+ int96RebaseModeInRead: String,
+ enableParquetFilterPushDown: Boolean,
+ pushDownDate: Boolean,
+ pushDownTimestamp: Boolean,
+ pushDownDecimal: Boolean,
+ pushDownInFilterThreshold: Int,
+ pushDownStringStartWith: Boolean,
+ isCaseSensitive: Boolean,
+ timestampConversion: Boolean,
+ enableOffHeapColumnVector: Boolean,
+ capacity: Int,
+ returningBatch: Boolean,
+ enableRecordFilter: Boolean,
+ timeZoneId: Option[String]) extends
SparkParquetReaderBase(
+ enableVectorizedReader = enableVectorizedReader,
+ enableParquetFilterPushDown = enableParquetFilterPushDown,
+ pushDownDate = pushDownDate,
+ pushDownTimestamp = pushDownTimestamp,
+ pushDownDecimal = pushDownDecimal,
+ pushDownInFilterThreshold = pushDownInFilterThreshold,
+ isCaseSensitive = isCaseSensitive,
+ timestampConversion = timestampConversion,
+ enableOffHeapColumnVector = enableOffHeapColumnVector,
+ capacity = capacity,
+ returningBatch = returningBatch,
+ enableRecordFilter = enableRecordFilter,
+ timeZoneId = timeZoneId) {
+
+ /**
+ * Read an individual parquet file
+ * Code adapted from ParquetFileFormat#buildReaderWithPartitionValues in Spark v3.3.4
+ *
+ * @param file parquet file to read
+ * @param requiredSchema desired output schema of the data
+ * @param partitionSchema schema of the partition columns. Partition values
will be appended to the end of every row
+ * @param filters filters for data skipping. Not guaranteed to be
used; the spark plan will also apply the filters.
+ * @param sharedConf the hadoop conf
+ * @return iterator of rows read from the file; the declared output type is [[InternalRow]] but it may actually be [[ColumnarBatch]]
+ */
+ protected def doRead(file: PartitionedFile,
+ requiredSchema: StructType,
+ partitionSchema: StructType,
+ filters: Seq[Filter],
+ sharedConf: Configuration): Iterator[InternalRow] = {
+ assert(file.partitionValues.numFields == partitionSchema.size)
+
+ val filePath = new Path(new URI(file.filePath))
+ val split = new FileSplit(filePath, file.start, file.length,
Array.empty[String])
+
+
+ lazy val footerFileMetaData =
+ ParquetFooterReader.readFooter(sharedConf, filePath,
SKIP_ROW_GROUPS).getFileMetaData
+ val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
+ footerFileMetaData.getKeyValueMetaData.get,
+ datetimeRebaseModeInRead)
+ // Try to push down filters when filter push-down is enabled.
+ val pushed = if (enableParquetFilterPushDown) {
+ val parquetSchema = footerFileMetaData.getSchema
+ val parquetFilters = new ParquetFilters(
+ parquetSchema,
+ pushDownDate,
+ pushDownTimestamp,
+ pushDownDecimal,
+ pushDownStringStartWith,
+ pushDownInFilterThreshold,
+ isCaseSensitive,
+ datetimeRebaseSpec)
+ filters
+ // Collects all converted Parquet filter predicates. Notice that not
all predicates can be
+ // converted (`ParquetFilters.createFilter` returns an `Option`).
That's why a `flatMap`
+ // is used here.
+ .flatMap(parquetFilters.createFilter(_))
+ .reduceOption(FilterApi.and)
+ } else {
+ None
+ }
+
+ // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions
to int96 timestamps
+ // *only* if the file was created by something other than "parquet-mr", so
check the actual
+ // writer here for this file. We have to do this per-file, as each file
in the table may
+ // have different writers.
+ // Define isCreatedByParquetMr as a function to avoid unnecessary parquet
footer reads.
+ def isCreatedByParquetMr: Boolean =
+ footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+ val convertTz =
+ if (timestampConversion && !isCreatedByParquetMr) {
+
Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+ } else {
+ None
+ }
+
+ val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
+ footerFileMetaData.getKeyValueMetaData.get,
+ int96RebaseModeInRead)
+
+ val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP,
0), 0)
+ val hadoopAttemptContext =
+ new TaskAttemptContextImpl(sharedConf, attemptId)
+
+ // Try to push down filters when filter push-down is enabled.
+ // Notice: This push-down is RowGroups level, not individual records.
+ if (pushed.isDefined) {
+
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration,
pushed.get)
+ }
+ val taskContext = Option(TaskContext.get())
+ if (enableVectorizedReader) {
+ val vectorizedReader = new VectorizedParquetRecordReader(
+ convertTz.orNull,
+ datetimeRebaseSpec.mode.toString,
+ datetimeRebaseSpec.timeZone,
+ int96RebaseSpec.mode.toString,
+ int96RebaseSpec.timeZone,
+ enableOffHeapColumnVector && taskContext.isDefined,
+ capacity)
+ // SPARK-37089: We cannot register a task completion listener to close
this iterator here
+ // because downstream exec nodes have already registered their
listeners. Since listeners
+ // are executed in reverse order of registration, a listener registered
here would close the
+ // iterator while downstream exec nodes are still running. When off-heap
column vectors are
+ // enabled, this can cause a use-after-free bug leading to a segfault.
+ //
+ // Instead, we use FileScanRDD's task completion listener to close this
iterator.
+ val iter = new RecordReaderIterator(vectorizedReader)
+ try {
+ vectorizedReader.initialize(split, hadoopAttemptContext)
+ vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+ if (returningBatch) {
+ vectorizedReader.enableReturningBatches()
+ }
+
+ // UnsafeRowParquetRecordReader appends the columns internally to
avoid another copy.
+ iter.asInstanceOf[Iterator[InternalRow]]
+ } catch {
+ case e: Throwable =>
+ // SPARK-23457: In case there is an exception in initialization,
close the iterator to
+ // avoid leaking resources.
+ iter.close()
+ throw e
+ }
+ } else {
+ // ParquetRecordReader returns InternalRow
+ val readSupport = new ParquetReadSupport(
+ convertTz,
+ enableVectorizedReader = false,
+ datetimeRebaseSpec,
+ int96RebaseSpec)
+ val reader = if (pushed.isDefined && enableRecordFilter) {
+ val parquetFilter = FilterCompat.get(pushed.get, null)
+ new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
+ } else {
+ new ParquetRecordReader[InternalRow](readSupport)
+ }
+ val iter = new RecordReaderIterator[InternalRow](reader)
+ try {
+ reader.initialize(split, hadoopAttemptContext)
+
+ val fullSchema = requiredSchema.toAttributes ++
partitionSchema.toAttributes
+ val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema,
fullSchema)
+
+ if (partitionSchema.length == 0) {
+ // There are no partition columns
+ iter.map(unsafeProjection)
+ } else {
+ val joinedRow = new JoinedRow()
+ iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
+ }
+ } catch {
+ case e: Throwable =>
+ // SPARK-23457: In case there is an exception in initialization,
close the iterator to
+ // avoid leaking resources.
+ iter.close()
+ throw e
+ }
+ }
+ }
+}
+
+object Spark33ParquetReader extends SparkParquetReaderBuilder {
+ /**
+ * Get parquet file reader
+ *
+ * @param vectorized true if vectorized reading is not prohibited due to
schema, reading mode, etc
+ * @param sqlConf the [[SQLConf]] used for the read
+ * @param options passed as a param to the file format
+ * @param hadoopConf some configs will be set for the hadoopConf
+ * @return parquet file reader
+ */
+ def build(vectorized: Boolean,
+ sqlConf: SQLConf,
+ options: Map[String, String],
+ hadoopConf: Configuration): SparkParquetReader = {
+ // set Hadoop conf
+ hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS,
classOf[ParquetReadSupport].getName)
+ hadoopConf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key,
sqlConf.sessionLocalTimeZone)
+ hadoopConf.setBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
sqlConf.nestedSchemaPruningEnabled)
+ hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key,
sqlConf.caseSensitiveAnalysis)
+ hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key,
sqlConf.isParquetBinaryAsString)
+ hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sqlConf.isParquetINT96AsTimestamp)
+ // Using string value of this conf to preserve compatibility across spark
versions. See [HUDI-5868]
+ hadoopConf.setBoolean(
+ "spark.sql.legacy.parquet.nanosAsLong",
+ sqlConf.getConfString("spark.sql.legacy.parquet.nanosAsLong",
"false").toBoolean
+ )
+
+ val parquetOptions = new ParquetOptions(options, sqlConf)
+ new Spark33ParquetReader(enableVectorizedReader = vectorized,
+ datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead,
+ int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead,
+ enableParquetFilterPushDown = sqlConf.parquetFilterPushDown,
+ pushDownDate = sqlConf.parquetFilterPushDownDate,
+ pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp,
+ pushDownDecimal = sqlConf.parquetFilterPushDownDecimal,
+ pushDownInFilterThreshold =
sqlConf.parquetFilterPushDownInFilterThreshold,
+ pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith,
+ isCaseSensitive = sqlConf.caseSensitiveAnalysis,
+ timestampConversion = sqlConf.isParquetINT96TimestampConversion,
+ enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled,
+ capacity = sqlConf.parquetVectorizedReaderBatchSize,
+ returningBatch = sqlConf.parquetVectorizedReaderEnabled,
+ enableRecordFilter = sqlConf.parquetRecordFilterEnabled,
+ timeZoneId = Some(sqlConf.sessionLocalTimeZone))
+ }
+}
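A configuration sketch (values are the usual Spark defaults, shown only for
illustration, not part of this commit): the builder reads its pushdown,
record-filter and batch-size settings from the session's SQLConf, so they can
be tuned before build() is called.

    spark.conf.set("spark.sql.parquet.filterPushdown", "true")
    spark.conf.set("spark.sql.parquet.recordLevelFilter.enabled", "false")
    spark.conf.set("spark.sql.parquet.columnarReaderBatchSize", "4096")

    val reader = Spark33ParquetReader.build(
      vectorized = true,
      sqlConf = spark.sessionState.conf,
      options = Map.empty,
      hadoopConf = spark.sessionState.newHadoopConf())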
diff --git
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
index 0ae5ef3dbf3..1e2807df55b 100644
---
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.adapter
import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
import org.apache.hudi.Spark34HoodieFileScanRDD
import org.apache.spark.sql.avro._
import org.apache.spark.sql.catalyst.InternalRow
@@ -29,7 +30,7 @@ import
org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
Spark34LegacyHoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
Spark34LegacyHoodieParquetFileFormat, Spark34ParquetReader, SparkParquetReader}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hudi.analysis.TableValuedFunctions
@@ -37,6 +38,7 @@ import
org.apache.spark.sql.parser.{HoodieExtendedParserInterface, HoodieSpark3_
import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder,
StructType}
import org.apache.spark.sql.vectorized.ColumnarBatchRow
import org.apache.spark.sql._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel._
@@ -124,4 +126,20 @@ class Spark3_4Adapter extends BaseSpark3Adapter {
case OFF_HEAP => "OFF_HEAP"
case _ => throw new IllegalArgumentException(s"Invalid StorageLevel:
$level")
}
+
+ /**
+ * Get parquet file reader
+ *
+ * @param vectorized true if vectorized reading is not prohibited due to
schema, reading mode, etc
+ * @param sqlConf the [[SQLConf]] used for the read
+ * @param options passed as a param to the file format
+ * @param hadoopConf some configs will be set for the hadoopConf
+ * @return parquet file reader
+ */
+ override def createParquetFileReader(vectorized: Boolean,
+ sqlConf: SQLConf,
+ options: Map[String, String],
+ hadoopConf: Configuration):
SparkParquetReader = {
+ Spark34ParquetReader.build(vectorized, sqlConf, options, hadoopConf)
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34ParquetReader.scala
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34ParquetReader.scala
new file mode 100644
index 00000000000..73db889a044
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34ParquetReader.scala
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.FileSplit
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop._
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+
+class Spark34ParquetReader(enableVectorizedReader: Boolean,
+ datetimeRebaseModeInRead: String,
+ int96RebaseModeInRead: String,
+ enableParquetFilterPushDown: Boolean,
+ pushDownDate: Boolean,
+ pushDownTimestamp: Boolean,
+ pushDownDecimal: Boolean,
+ pushDownInFilterThreshold: Int,
+ pushDownStringPredicate: Boolean,
+ isCaseSensitive: Boolean,
+ timestampConversion: Boolean,
+ enableOffHeapColumnVector: Boolean,
+ capacity: Int,
+ returningBatch: Boolean,
+ enableRecordFilter: Boolean,
+ timeZoneId: Option[String]) extends
SparkParquetReaderBase(
+ enableVectorizedReader = enableVectorizedReader,
+ enableParquetFilterPushDown = enableParquetFilterPushDown,
+ pushDownDate = pushDownDate,
+ pushDownTimestamp = pushDownTimestamp,
+ pushDownDecimal = pushDownDecimal,
+ pushDownInFilterThreshold = pushDownInFilterThreshold,
+ isCaseSensitive = isCaseSensitive,
+ timestampConversion = timestampConversion,
+ enableOffHeapColumnVector = enableOffHeapColumnVector,
+ capacity = capacity,
+ returningBatch = returningBatch,
+ enableRecordFilter = enableRecordFilter,
+ timeZoneId = timeZoneId) {
+
+ /**
+ * Read an individual parquet file
+ * Code adapted from ParquetFileFormat#buildReaderWithPartitionValues in Spark v3.4.2
+ *
+ * @param file parquet file to read
+ * @param requiredSchema desired output schema of the data
+ * @param partitionSchema schema of the partition columns. Partition values
will be appended to the end of every row
+ * @param filters filters for data skipping. Not guaranteed to be
used; the spark plan will also apply the filters.
+ * @param sharedConf the hadoop conf
+ * @return iterator of rows read from the file; the declared output type is [[InternalRow]] but it may actually be [[ColumnarBatch]]
+ */
+ protected def doRead(file: PartitionedFile,
+ requiredSchema: StructType,
+ partitionSchema: StructType,
+ filters: Seq[Filter],
+ sharedConf: Configuration): Iterator[InternalRow] = {
+ assert(file.partitionValues.numFields == partitionSchema.size)
+
+ val filePath = file.toPath
+ val split = new FileSplit(filePath, file.start, file.length,
Array.empty[String])
+
+
+ lazy val footerFileMetaData =
+ ParquetFooterReader.readFooter(sharedConf, filePath,
SKIP_ROW_GROUPS).getFileMetaData
+ val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
+ footerFileMetaData.getKeyValueMetaData.get,
+ datetimeRebaseModeInRead)
+ // Try to push down filters when filter push-down is enabled.
+ val pushed = if (enableParquetFilterPushDown) {
+ val parquetSchema = footerFileMetaData.getSchema
+ val parquetFilters = new ParquetFilters(
+ parquetSchema,
+ pushDownDate,
+ pushDownTimestamp,
+ pushDownDecimal,
+ pushDownStringPredicate,
+ pushDownInFilterThreshold,
+ isCaseSensitive,
+ datetimeRebaseSpec)
+ filters
+ // Collects all converted Parquet filter predicates. Notice that not
all predicates can be
+ // converted (`ParquetFilters.createFilter` returns an `Option`).
That's why a `flatMap`
+ // is used here.
+ .flatMap(parquetFilters.createFilter(_))
+ .reduceOption(FilterApi.and)
+ } else {
+ None
+ }
+
+ // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions
to int96 timestamps
+ // *only* if the file was created by something other than "parquet-mr", so
check the actual
+ // writer here for this file. We have to do this per-file, as each file
in the table may
+ // have different writers.
+ // Define isCreatedByParquetMr as a function to avoid unnecessary parquet
footer reads.
+ def isCreatedByParquetMr: Boolean =
+ footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+ val convertTz =
+ if (timestampConversion && !isCreatedByParquetMr) {
+
Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+ } else {
+ None
+ }
+
+ val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
+ footerFileMetaData.getKeyValueMetaData.get,
+ int96RebaseModeInRead)
+
+ val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP,
0), 0)
+ val hadoopAttemptContext =
+ new TaskAttemptContextImpl(sharedConf, attemptId)
+
+ // Try to push down filters when filter push-down is enabled.
+ // Notice: This push-down is RowGroups level, not individual records.
+ if (pushed.isDefined) {
+
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration,
pushed.get)
+ }
+ val taskContext = Option(TaskContext.get())
+ if (enableVectorizedReader) {
+ val vectorizedReader = new VectorizedParquetRecordReader(
+ convertTz.orNull,
+ datetimeRebaseSpec.mode.toString,
+ datetimeRebaseSpec.timeZone,
+ int96RebaseSpec.mode.toString,
+ int96RebaseSpec.timeZone,
+ enableOffHeapColumnVector && taskContext.isDefined,
+ capacity)
+ // SPARK-37089: We cannot register a task completion listener to close
this iterator here
+ // because downstream exec nodes have already registered their
listeners. Since listeners
+ // are executed in reverse order of registration, a listener registered
here would close the
+ // iterator while downstream exec nodes are still running. When off-heap
column vectors are
+ // enabled, this can cause a use-after-free bug leading to a segfault.
+ //
+ // Instead, we use FileScanRDD's task completion listener to close this
iterator.
+ val iter = new RecordReaderIterator(vectorizedReader)
+ try {
+ vectorizedReader.initialize(split, hadoopAttemptContext)
+ vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+ if (returningBatch) {
+ vectorizedReader.enableReturningBatches()
+ }
+
+ // UnsafeRowParquetRecordReader appends the columns internally to
avoid another copy.
+ iter.asInstanceOf[Iterator[InternalRow]]
+ } catch {
+ case e: Throwable =>
+ // SPARK-23457: In case there is an exception in initialization,
close the iterator to
+ // avoid leaking resources.
+ iter.close()
+ throw e
+ }
+ } else {
+ // ParquetRecordReader returns InternalRow
+ val readSupport = new ParquetReadSupport(
+ convertTz,
+ enableVectorizedReader = false,
+ datetimeRebaseSpec,
+ int96RebaseSpec)
+ val reader = if (pushed.isDefined && enableRecordFilter) {
+ val parquetFilter = FilterCompat.get(pushed.get, null)
+ new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
+ } else {
+ new ParquetRecordReader[InternalRow](readSupport)
+ }
+ val readerWithRowIndexes =
ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader,
+ requiredSchema)
+ val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes)
+ try {
+ readerWithRowIndexes.initialize(split, hadoopAttemptContext)
+
+ val fullSchema = requiredSchema.toAttributes ++
partitionSchema.toAttributes
+ val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema,
fullSchema)
+
+ if (partitionSchema.length == 0) {
+ // There are no partition columns
+ iter.map(unsafeProjection)
+ } else {
+ val joinedRow = new JoinedRow()
+ iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
+ }
+ } catch {
+ case e: Throwable =>
+ // SPARK-23457: In case there is an exception in initialization,
close the iterator to
+ // avoid leaking resources.
+ iter.close()
+ throw e
+ }
+ }
+ }
+}
+
+object Spark34ParquetReader extends SparkParquetReaderBuilder {
+ /**
+ * Get parquet file reader
+ *
+ * @param vectorized true if vectorized reading is not prohibited due to
schema, reading mode, etc
+ * @param sqlConf the [[SQLConf]] used for the read
+ * @param options passed as a param to the file format
+ * @param hadoopConf some configs will be set for the hadoopConf
+ * @return parquet file reader
+ */
+ def build(vectorized: Boolean,
+ sqlConf: SQLConf,
+ options: Map[String, String],
+ hadoopConf: Configuration): SparkParquetReader = {
+ // set Hadoop conf
+ hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS,
classOf[ParquetReadSupport].getName)
+ hadoopConf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key,
sqlConf.sessionLocalTimeZone)
+ hadoopConf.setBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
sqlConf.nestedSchemaPruningEnabled)
+ hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key,
sqlConf.caseSensitiveAnalysis)
+ hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key,
sqlConf.isParquetBinaryAsString)
+ hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sqlConf.isParquetINT96AsTimestamp)
+ // Using string value of this conf to preserve compatibility across spark
versions. See [HUDI-5868]
+ hadoopConf.setBoolean(
+ SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+ sqlConf.getConfString(
+ SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+ SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValueString).toBoolean
+ )
+ hadoopConf.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
sqlConf.parquetInferTimestampNTZEnabled)
+
+ val returningBatch = sqlConf.parquetVectorizedReaderEnabled &&
+ options.getOrElse(FileFormat.OPTION_RETURNING_BATCH,
+ throw new IllegalArgumentException(
+ "OPTION_RETURNING_BATCH should always be set for ParquetFileFormat.
" +
+ "To workaround this issue, set
spark.sql.parquet.enableVectorizedReader=false."))
+ .equals("true")
+
+ val parquetOptions = new ParquetOptions(options, sqlConf)
+ new Spark34ParquetReader(
+ enableVectorizedReader = vectorized,
+ datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead,
+ int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead,
+ enableParquetFilterPushDown = sqlConf.parquetFilterPushDown,
+ pushDownDate = sqlConf.parquetFilterPushDownDate,
+ pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp,
+ pushDownDecimal = sqlConf.parquetFilterPushDownDecimal,
+ pushDownInFilterThreshold =
sqlConf.parquetFilterPushDownInFilterThreshold,
+ pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate,
+ isCaseSensitive = sqlConf.caseSensitiveAnalysis,
+ timestampConversion = sqlConf.isParquetINT96TimestampConversion,
+ enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled,
+ capacity = sqlConf.parquetVectorizedReaderBatchSize,
+ returningBatch = returningBatch,
+ enableRecordFilter = sqlConf.parquetRecordFilterEnabled,
+ timeZoneId = Some(sqlConf.sessionLocalTimeZone))
+ }
+}
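One difference in the 3.4 builder above worth calling out: when
spark.sql.parquet.enableVectorizedReader is true, the options map must carry
FileFormat.OPTION_RETURNING_BATCH or build() throws. A hedged sketch of a
compliant call (`spark` is assumed to be in scope; not part of this commit):

    // Required on Spark 3.4+ whenever the vectorized reader is enabled.
    val options = Map(FileFormat.OPTION_RETURNING_BATCH -> "true")

    val reader = Spark34ParquetReader.build(
      vectorized = true,
      sqlConf = spark.sessionState.conf,
      options = options,
      hadoopConf = spark.sessionState.newHadoopConf())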
diff --git
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
index d18291a1809..a3e0c19621b 100644
---
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.adapter
import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
import org.apache.hudi.Spark35HoodieFileScanRDD
import org.apache.spark.sql._
import org.apache.spark.sql.avro._
@@ -31,9 +32,10 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
Spark35LegacyHoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
Spark35ParquetReader, Spark35LegacyHoodieParquetFileFormat, SparkParquetReader}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.hudi.analysis.TableValuedFunctions
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.parser.{HoodieExtendedParserInterface,
HoodieSpark3_5ExtendedSqlParser}
import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder,
StructType}
import org.apache.spark.sql.vectorized.ColumnarBatchRow
@@ -124,4 +126,20 @@ class Spark3_5Adapter extends BaseSpark3Adapter {
case OFF_HEAP => "OFF_HEAP"
case _ => throw new IllegalArgumentException(s"Invalid StorageLevel:
$level")
}
+
+ /**
+ * Get parquet file reader
+ *
+ * @param vectorized true if vectorized reading is not prohibited due to
schema, reading mode, etc
+ * @param sqlConf the [[SQLConf]] used for the read
+ * @param options passed as a param to the file format
+ * @param hadoopConf some configs will be set for the hadoopConf
+ * @return parquet file reader
+ */
+ override def createParquetFileReader(vectorized: Boolean,
+ sqlConf: SQLConf,
+ options: Map[String, String],
+ hadoopConf: Configuration):
SparkParquetReader = {
+ Spark35ParquetReader.build(vectorized, sqlConf, options, hadoopConf)
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35ParquetReader.scala
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35ParquetReader.scala
new file mode 100644
index 00000000000..f088efd07e1
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35ParquetReader.scala
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.{DataSourceUtils,
FileFormat, PartitionedFile, RecordReaderIterator}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.StructType
+
+class Spark35ParquetReader(enableVectorizedReader: Boolean,
+ datetimeRebaseModeInRead: String,
+ int96RebaseModeInRead: String,
+ enableParquetFilterPushDown: Boolean,
+ pushDownDate: Boolean,
+ pushDownTimestamp: Boolean,
+ pushDownDecimal: Boolean,
+ pushDownInFilterThreshold: Int,
+ pushDownStringPredicate: Boolean,
+ isCaseSensitive: Boolean,
+ timestampConversion: Boolean,
+ enableOffHeapColumnVector: Boolean,
+ capacity: Int,
+ returningBatch: Boolean,
+ enableRecordFilter: Boolean,
+ timeZoneId: Option[String]) extends
SparkParquetReaderBase(
+ enableVectorizedReader = enableVectorizedReader,
+ enableParquetFilterPushDown = enableParquetFilterPushDown,
+ pushDownDate = pushDownDate,
+ pushDownTimestamp = pushDownTimestamp,
+ pushDownDecimal = pushDownDecimal,
+ pushDownInFilterThreshold = pushDownInFilterThreshold,
+ isCaseSensitive = isCaseSensitive,
+ timestampConversion = timestampConversion,
+ enableOffHeapColumnVector = enableOffHeapColumnVector,
+ capacity = capacity,
+ returningBatch = returningBatch,
+ enableRecordFilter = enableRecordFilter,
+ timeZoneId = timeZoneId) {
+
+ /**
+ * Read an individual parquet file
+ * Code adapted from ParquetFileFormat#buildReaderWithPartitionValues in Spark v3.5.1
+ *
+ * @param file parquet file to read
+ * @param requiredSchema desired output schema of the data
+ * @param partitionSchema schema of the partition columns. Partition values
will be appended to the end of every row
+ * @param filters filters for data skipping. Not guaranteed to be
used; the spark plan will also apply the filters.
+ * @param sharedConf the hadoop conf
+ * @return iterator of rows read from the file; the declared output type is [[InternalRow]] but it may actually be [[ColumnarBatch]]
+ */
+ protected def doRead(file: PartitionedFile,
+ requiredSchema: StructType,
+ partitionSchema: StructType,
+ filters: Seq[Filter],
+ sharedConf: Configuration): Iterator[InternalRow] = {
+ assert(file.partitionValues.numFields == partitionSchema.size)
+
+ val filePath = file.toPath
+ val split = new FileSplit(filePath, file.start, file.length,
Array.empty[String])
+
+ val fileFooter = if (enableVectorizedReader) {
+ // When there are vectorized reads, we can avoid reading the footer
twice by reading
+ // all row groups in advance and filter row groups according to filters
that require
+ // push down (no need to read the footer metadata again).
+ ParquetFooterReader.readFooter(sharedConf, file,
ParquetFooterReader.WITH_ROW_GROUPS)
+ } else {
+ ParquetFooterReader.readFooter(sharedConf, file,
ParquetFooterReader.SKIP_ROW_GROUPS)
+ }
+
+ val footerFileMetaData = fileFooter.getFileMetaData
+ val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
+ footerFileMetaData.getKeyValueMetaData.get,
+ datetimeRebaseModeInRead)
+ val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
+ footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+
+ // Try to push down filters when filter push-down is enabled.
+ val pushed = if (enableParquetFilterPushDown) {
+ val parquetSchema = footerFileMetaData.getSchema
+ val parquetFilters = new ParquetFilters(
+ parquetSchema,
+ pushDownDate,
+ pushDownTimestamp,
+ pushDownDecimal,
+ pushDownStringPredicate,
+ pushDownInFilterThreshold,
+ isCaseSensitive,
+ datetimeRebaseSpec)
+ filters
+ // Collects all converted Parquet filter predicates. Notice that not
all predicates can be
+ // converted (`ParquetFilters.createFilter` returns an `Option`).
That's why a `flatMap`
+ // is used here.
+ .flatMap(parquetFilters.createFilter(_))
+ .reduceOption(FilterApi.and)
+ } else {
+ None
+ }
+
+ // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions
to int96 timestamps
+ // *only* if the file was created by something other than "parquet-mr", so
check the actual
+ // writer here for this file. We have to do this per-file, as each file
in the table may
+ // have different writers.
+ // Define isCreatedByParquetMr as a function to avoid unnecessary parquet
footer reads.
+ def isCreatedByParquetMr: Boolean =
+ footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+ val convertTz =
+ if (timestampConversion && !isCreatedByParquetMr) {
+
Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+ } else {
+ None
+ }
+
+
+ val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP,
0), 0)
+ val hadoopAttemptContext =
+ new TaskAttemptContextImpl(sharedConf, attemptId)
+
+ // Try to push down filters when filter push-down is enabled.
+ // Notice: This push-down is RowGroups level, not individual records.
+ if (pushed.isDefined) {
+
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration,
pushed.get)
+ }
+ val taskContext = Option(TaskContext.get())
+ if (enableVectorizedReader) {
+ val vectorizedReader = new VectorizedParquetRecordReader(
+ convertTz.orNull,
+ datetimeRebaseSpec.mode.toString,
+ datetimeRebaseSpec.timeZone,
+ int96RebaseSpec.mode.toString,
+ int96RebaseSpec.timeZone,
+ enableOffHeapColumnVector && taskContext.isDefined,
+ capacity)
+ // SPARK-37089: We cannot register a task completion listener to close
this iterator here
+ // because downstream exec nodes have already registered their
listeners. Since listeners
+ // are executed in reverse order of registration, a listener registered
here would close the
+ // iterator while downstream exec nodes are still running. When off-heap
column vectors are
+ // enabled, this can cause a use-after-free bug leading to a segfault.
+ //
+ // Instead, we use FileScanRDD's task completion listener to close this
iterator.
+ val iter = new RecordReaderIterator(vectorizedReader)
+ try {
+ vectorizedReader.initialize(split, hadoopAttemptContext,
Option.apply(fileFooter))
+ vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+ if (returningBatch) {
+ vectorizedReader.enableReturningBatches()
+ }
+
+ // UnsafeRowParquetRecordReader appends the columns internally to
avoid another copy.
+ iter.asInstanceOf[Iterator[InternalRow]]
+ } catch {
+ case e: Throwable =>
+ // SPARK-23457: In case there is an exception in initialization,
close the iterator to
+ // avoid leaking resources.
+ iter.close()
+ throw e
+ }
+ } else {
+ // ParquetRecordReader returns InternalRow
+ val readSupport = new ParquetReadSupport(
+ convertTz,
+ enableVectorizedReader = false,
+ datetimeRebaseSpec,
+ int96RebaseSpec)
+ val reader = if (pushed.isDefined && enableRecordFilter) {
+ val parquetFilter = FilterCompat.get(pushed.get, null)
+ new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
+ } else {
+ new ParquetRecordReader[InternalRow](readSupport)
+ }
+ val readerWithRowIndexes =
ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader,
+ requiredSchema)
+ val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes)
+ try {
+ readerWithRowIndexes.initialize(split, hadoopAttemptContext)
+
+ val fullSchema = toAttributes(requiredSchema) ++
toAttributes(partitionSchema)
+ val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema,
fullSchema)
+
+ if (partitionSchema.length == 0) {
+ // There are no partition columns
+ iter.map(unsafeProjection)
+ } else {
+ val joinedRow = new JoinedRow()
+ iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
+ }
+ } catch {
+ case e: Throwable =>
+ // SPARK-23457: In case there is an exception in initialization,
close the iterator to
+ // avoid leaking resources.
+ iter.close()
+ throw e
+ }
+ }
+ }
+}
+
+object Spark35ParquetReader extends SparkParquetReaderBuilder {
+ /**
+ * Get parquet file reader
+ *
+ * @param vectorized true if vectorized reading is not prohibited due to
schema, reading mode, etc
+ * @param sqlConf the [[SQLConf]] used for the read
+ * @param options passed as a param to the file format
+ * @param hadoopConf some configs will be set for the hadoopConf
+ * @return parquet file reader
+ */
+ def build(vectorized: Boolean,
+ sqlConf: SQLConf,
+ options: Map[String, String],
+ hadoopConf: Configuration): SparkParquetReader = {
+ // set Hadoop conf
+ hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS,
classOf[ParquetReadSupport].getName)
+ hadoopConf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key,
sqlConf.sessionLocalTimeZone)
+ hadoopConf.setBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
sqlConf.nestedSchemaPruningEnabled)
+ hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key,
sqlConf.caseSensitiveAnalysis)
+ hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key,
sqlConf.isParquetBinaryAsString)
+ hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sqlConf.isParquetINT96AsTimestamp)
+ // Using string value of this conf to preserve compatibility across spark
versions. See [HUDI-5868]
+ hadoopConf.setBoolean(
+ SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+ sqlConf.getConfString(
+ SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+ SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValueString).toBoolean
+ )
+ hadoopConf.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
sqlConf.parquetInferTimestampNTZEnabled)
+
+ val returningBatch = sqlConf.parquetVectorizedReaderEnabled &&
+ options.getOrElse(FileFormat.OPTION_RETURNING_BATCH,
+ throw new IllegalArgumentException(
+ "OPTION_RETURNING_BATCH should always be set for ParquetFileFormat.
" +
+ "To workaround this issue, set
spark.sql.parquet.enableVectorizedReader=false."))
+ .equals("true")
+
+ val parquetOptions = new ParquetOptions(options, sqlConf)
+ new Spark35ParquetReader(
+ enableVectorizedReader = vectorized,
+ datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead,
+ int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead,
+ enableParquetFilterPushDown = sqlConf.parquetFilterPushDown,
+ pushDownDate = sqlConf.parquetFilterPushDownDate,
+ pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp,
+ pushDownDecimal = sqlConf.parquetFilterPushDownDecimal,
+ pushDownInFilterThreshold =
sqlConf.parquetFilterPushDownInFilterThreshold,
+ pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate,
+ isCaseSensitive = sqlConf.caseSensitiveAnalysis,
+ timestampConversion = sqlConf.isParquetINT96TimestampConversion,
+ enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled,
+ capacity = sqlConf.parquetVectorizedReaderBatchSize,
+ returningBatch = returningBatch,
+ enableRecordFilter = sqlConf.parquetRecordFilterEnabled,
+ timeZoneId = Some(sqlConf.sessionLocalTimeZone))
+ }
+}