This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d6ff3d6ba46 [HUDI-5989] Fix date conversion issue when performing partition pruning on Spark (#8298)
d6ff3d6ba46 is described below
commit d6ff3d6ba46b51f58ebb1db58f26423a211741f5
Author: voonhous <[email protected]>
AuthorDate: Mon Apr 10 10:40:16 2023 +0800
[HUDI-5989] Fix date conversion issue when performing partition pruning on Spark (#8298)
When lazy fetching of partition paths & file slices is used for HoodieFileIndex,
a date partition value cannot be converted to the correct string representation.
This happens because Spark stores dates as an integer representing the
number of days that have passed since 1970-01-01.
When the partition path is rebuilt from such a value, the FS path can be composed
wrongly, causing the partition to appear empty and, hence, the query result to be
empty/incorrect.
Co-authored-by: Rex An <[email protected]>
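For illustration, a minimal Scala sketch of the conversion described above (java.time stands in for Spark's DateFormatter here; the object name and the hard-coded epoch-day value are illustrative, not part of the patch):

import java.time.LocalDate

import org.apache.spark.sql.catalyst.expressions.{EmptyRow, Literal}
import org.apache.spark.sql.types.DateType

object DatePruningSketch extends App {
  // Spark keeps a DateType value as the number of days since 1970-01-01.
  val dateLiteral = Literal(19417, DateType)                  // represents 2023-03-01
  val days = dateLiteral.eval(EmptyRow).asInstanceOf[Int]     // 19417

  // Stringifying the folded value directly yields a partition path that does not exist on disk.
  println(s"date_par=$days")                                  // date_par=19417

  // Rendering the epoch-day count as a date recovers the on-disk partition value;
  // the patch does this with a DateFormatter obtained via SparkAdapter#getDateFormatter.
  println(s"date_par=${LocalDate.ofEpochDay(days.toLong)}")   // date_par=2023-03-01
}

With the DateType branch of ExtractableLiteral below, the prefix is built from the formatted string instead of the raw integer, so the pruned path matches the on-disk partition date_par=2023-03-01.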
---
.../org/apache/spark/sql/hudi/SparkAdapter.scala | 8 +-
.../apache/hudi/SparkHoodieTableFileIndex.scala | 42 ++++++----
.../sql/hudi/TestLazyPartitionPathFetching.scala | 97 ++++++++++++++++++++++
.../apache/spark/sql/adapter/Spark2Adapter.scala | 12 +++
.../spark/sql/adapter/BaseSpark3Adapter.scala | 13 +++
5 files changed, 156 insertions(+), 16 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index 4775af504bc..c38906bf428 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.parser.HoodieExtendedParserInterface
@@ -42,7 +43,7 @@ import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{DataType, Metadata, StructType}
import org.apache.spark.storage.StorageLevel
-import java.util.Locale
+import java.util.{Locale, TimeZone}
/**
* Interface adapting discrepancies and incompatibilities between different Spark versions
@@ -115,6 +116,11 @@ trait SparkAdapter extends Serializable {
*/
def getSparkParsePartitionUtil: SparkParsePartitionUtil
+ /**
+ * Get the [[DateFormatter]].
+ */
+ def getDateFormatter(tz: TimeZone): DateFormatter
+
/**
* Combine [[PartitionedFile]] to [[FilePartition]] according to `maxSplitBytes`.
*/
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index 677d5d27076..a9a20057795 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -35,12 +35,12 @@ import org.apache.hudi.util.JFunction
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, EmptyRow, EqualTo, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, EmptyRow, EqualTo, Expression, InterpretedPredicate, Literal}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.{InternalRow, expressions}
import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types.{ByteType, DataType, DateType, IntegerType, LongType, ShortType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
import javax.annotation.concurrent.NotThreadSafe
@@ -281,14 +281,15 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
// Static partition-path prefix is defined as a prefix of the full partition-path where only
// first N partition columns (in-order) have proper (static) values bound in equality predicates,
// allowing in turn to build such prefix to be used in subsequent filtering
- val staticPartitionColumnNameValuePairs: Seq[(String, Any)] = {
+ val staticPartitionColumnNameValuePairs: Seq[(String, (String, Any))] = {
// Extract from simple predicates of the form `date = '2022-01-01'` both
// partition column and corresponding (literal) value
- val staticPartitionColumnValuesMap = extractEqualityPredicatesLiteralValues(partitionColumnPredicates)
+ val zoneId = configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, SQLConf.get.sessionLocalTimeZone)
+ val staticPartitionColumnValuesMap = extractEqualityPredicatesLiteralValues(partitionColumnPredicates, zoneId)
// NOTE: For our purposes we can only construct partition-path prefix if proper prefix of the
// partition-schema has been bound by the partition-predicates
partitionColumnNames.takeWhile(colName => staticPartitionColumnValuesMap.contains(colName))
- .map(colName => (colName, staticPartitionColumnValuesMap(colName).get))
+ .map(colName => (colName, (staticPartitionColumnValuesMap(colName)._1, staticPartitionColumnValuesMap(colName)._2.get)))
}
if (staticPartitionColumnNameValuePairs.isEmpty) {
@@ -301,7 +302,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
if (staticPartitionColumnNameValuePairs.length == partitionColumnNames.length) {
// In case composed partition path is complete, we can return it directly avoiding extra listing operation
- Seq(new PartitionPath(relativePartitionPathPrefix, staticPartitionColumnNameValuePairs.map(_._2.asInstanceOf[AnyRef]).toArray))
+ Seq(new PartitionPath(relativePartitionPathPrefix, staticPartitionColumnNameValuePairs.map(_._2._2.asInstanceOf[AnyRef]).toArray))
} else {
// Otherwise, compile extracted partition values (from query predicates) into a sub-path which is a prefix
// of the complete partition path, do listing for this prefix-path only
@@ -315,7 +316,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
*
* @return relative partition path and a flag to indicate if the path is complete (i.e., not a prefix)
*/
- private def composeRelativePartitionPath(staticPartitionColumnNameValuePairs: Seq[(String, Any)]): String = {
+ private def composeRelativePartitionPath(staticPartitionColumnNameValuePairs: Seq[(String, (String, Any))]): String = {
checkState(staticPartitionColumnNameValuePairs.nonEmpty)
// Since static partition values might not be available for all columns, we compile
@@ -331,7 +332,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
)
partitionPathFormatter.combine(staticPartitionColumnNames.asJava,
- staticPartitionColumnValues.map(_.asInstanceOf[AnyRef]): _*)
+ staticPartitionColumnValues.map(_._1): _*)
}
protected def doParsePartitionColumnValues(partitionColumns: Array[String], partitionPath: String): Array[Object] = {
@@ -407,24 +408,35 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
metaClient.getTableConfig.getUrlEncodePartitioning.toBoolean
}
-object SparkHoodieTableFileIndex {
+object SparkHoodieTableFileIndex extends SparkAdapterSupport {
private def haveProperPartitionValues(partitionPaths: Seq[PartitionPath]) = {
partitionPaths.forall(_.values.length > 0)
}
- private def extractEqualityPredicatesLiteralValues(predicates: Seq[Expression]): Map[String, Option[Any]] = {
+ private def extractEqualityPredicatesLiteralValues(predicates: Seq[Expression], zoneId: String): Map[String, (String, Option[Any])] = {
// TODO support coercible expressions (ie attr-references casted to particular type), similar
// to `MERGE INTO` statement
+
+ object ExtractableLiteral {
+ def unapply(exp: Expression): Option[String] = exp match {
+ case Literal(null, _) => None // `null`s can be cast as other types; we want to avoid NPEs.
+ case Literal(value, _: ByteType | IntegerType | LongType | ShortType) => Some(value.toString)
+ case Literal(value, _: StringType) => Some(value.toString)
+ case Literal(value, _: DateType) =>
+ Some(sparkAdapter.getDateFormatter(DateTimeUtils.getTimeZone(zoneId)).format(value.asInstanceOf[Int]))
+ case _ => None
+ }
+ }
+
// NOTE: To properly support predicates of the form `x = NULL`, we have to wrap result
// of the folded expression into [[Some]] (to distinguish it from the case when partition-column
// isn't bound to any value by the predicate)
predicates.flatMap {
- case EqualTo(attr: AttributeReference, e: Expression) if e.foldable =>
- Seq((attr.name, Some(e.eval(EmptyRow))))
- case EqualTo(e: Expression, attr: AttributeReference) if e.foldable =>
- Seq((attr.name, Some(e.eval(EmptyRow))))
-
+ case EqualTo(attr: AttributeReference, e @ ExtractableLiteral(valueString)) =>
+ Seq((attr.name, (valueString, Some(e.eval(EmptyRow)))))
+ case EqualTo(e @ ExtractableLiteral(valueString), attr: AttributeReference) =>
+ Seq((attr.name, (valueString, Some(e.eval(EmptyRow)))))
case _ => Seq.empty
}.toMap
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestLazyPartitionPathFetching.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestLazyPartitionPathFetching.scala
new file mode 100644
index 00000000000..0467b6664c6
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestLazyPartitionPathFetching.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+class TestLazyPartitionPathFetching extends HoodieSparkSqlTestBase {
+
+ test("Test querying with string column + partition pruning") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | date_par date
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey ='id',
+ | type = 'cow',
+ | preCombineField = 'ts'
+ | )
+ | PARTITIONED BY (date_par)
+ """.stripMargin)
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, date
'2023-02-27')")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, date
'2023-02-28')")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000, date
'2023-03-01')")
+
+ checkAnswer(s"select id, name, price, ts from $tableName where
date_par='2023-03-01' order by id")(
+ Seq(3, "a3", 10.0, 1000)
+ )
+
+ withSQLConf("spark.sql.session.timeZone" -> "UTC+2") {
+ checkAnswer(s"select id, name, price, ts from $tableName where
date_par='2023-03-01' order by id")(
+ Seq(3, "a3", 10.0, 1000)
+ )
+ }
+ }
+ }
+
+ test("Test querying with date column + partition pruning (multi-level
partitioning)") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | country string,
+ | date_par date
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey ='id',
+ | type = 'cow',
+ | preCombineField = 'ts'
+ | )
+ | PARTITIONED BY (country, date_par)
+ """.stripMargin)
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 'ID', date
'2023-02-27')")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, 'ID', date
'2023-02-28')")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000, 'ID', date
'2023-03-01')")
+
+ // for lazy fetching partition path & file slice to be enabled, filter must be applied on all partitions
+ checkAnswer(s"select id, name, price, ts from $tableName " +
+ s"where date_par='2023-03-01' and country='ID' order by id")(
+ Seq(3, "a3", 10.0, 1000)
+ )
+
+ withSQLConf("spark.sql.session.timeZone" -> "UTC+2") {
+ checkAnswer(s"select id, name, price, ts from $tableName " +
+ s"where date_par='2023-03-01' and country='ID' order by id")(
+ Seq(3, "a3", 10.0, 1000)
+ )
+ }
+ }
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index dd72282fec6..545419b148a 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable, LogicalPlan}
+import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24HoodieParquetFileFormat}
import org.apache.spark.sql.execution.vectorized.MutableColumnarRow
@@ -45,7 +46,11 @@ import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructTy
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel._
+import java.time.ZoneId
+import java.util.TimeZone
+import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters.mapAsScalaMapConverter
+import scala.collection.convert.Wrappers.JConcurrentMapWrapper
import scala.collection.mutable.ArrayBuffer
/**
@@ -64,6 +69,9 @@ class Spark2Adapter extends SparkAdapter {
// we simply produce an empty [[Metadata]] instance
new MetadataBuilder().build()
+ private val cache = JConcurrentMapWrapper(
+ new ConcurrentHashMap[ZoneId, DateFormatter](1))
+
override def getCatalogUtils: HoodieCatalogUtils = {
throw new UnsupportedOperationException("Catalog utilities are not supported in Spark 2.x");
}
@@ -90,6 +98,10 @@ class Spark2Adapter extends SparkAdapter {
override def getSparkParsePartitionUtil: SparkParsePartitionUtil = Spark2ParsePartitionUtil
+ override def getDateFormatter(tz: TimeZone): DateFormatter = {
+ cache.getOrElseUpdate(tz.toZoneId, DateFormatter())
+ }
+
/**
* Combine [[PartitionedFile]] to [[FilePartition]] according to `maxSplitBytes`.
*
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index 022a1d247e1..3880bda454d 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -21,6 +21,7 @@ import org.apache.avro.Schema
import org.apache.hadoop.fs.Path
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.spark3.internal.ReflectUtil
import org.apache.hudi.{AvroConversionUtils, DefaultSource, Spark3RowSerDe}
import org.apache.hudi.{AvroConversionUtils, DefaultSource, HoodieBaseRelation, Spark3RowSerDe}
import org.apache.spark.internal.Logging
@@ -32,6 +33,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Predicate}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -42,13 +44,20 @@ import org.apache.spark.sql.{HoodieSpark3CatalogUtils, SQLContext, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel._
+import java.time.ZoneId
+import java.util.TimeZone
+import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters.mapAsScalaMapConverter
+import scala.collection.convert.Wrappers.JConcurrentMapWrapper
/**
* Base implementation of [[SparkAdapter]] for Spark 3.x branch
*/
abstract class BaseSpark3Adapter extends SparkAdapter with Logging {
+ private val cache = JConcurrentMapWrapper(
+ new ConcurrentHashMap[ZoneId, DateFormatter](1))
+
def getCatalogUtils: HoodieSpark3CatalogUtils
override def createSparkRowSerDe(schema: StructType): SparkRowSerDe = {
@@ -74,6 +83,10 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging {
override def getSparkParsePartitionUtil: SparkParsePartitionUtil = Spark3ParsePartitionUtil
+ override def getDateFormatter(tz: TimeZone): DateFormatter = {
+ cache.getOrElseUpdate(tz.toZoneId, ReflectUtil.getDateFormatter(tz.toZoneId))
+ }
+
/**
* Combine [[PartitionedFile]] to [[FilePartition]] according to `maxSplitBytes`.
*/