This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d6ff3d6ba46 [HUDI-5989] Fix date conversion issue when performing 
partition pruning on Spark (#8298)
d6ff3d6ba46 is described below

commit d6ff3d6ba46b51f58ebb1db58f26423a211741f5
Author: voonhous <[email protected]>
AuthorDate: Mon Apr 10 10:40:16 2023 +0800

    [HUDI-5989] Fix date conversion issue when performing partition pruning on 
Spark (#8298)
    
    When lazy fetching of partition paths & file slices is used for HoodieFileIndex, 
date values cannot be converted to the correct string representation.
    
    This is the case because Spark stores dates as an integer value representing the 
number of days that have passed since 1970-01-01.
    
    When rebuilding the partition path, this FS path could be rebuilt incorrectly, 
causing a partition to be empty, and hence, the query result to be 
empty/incorrect.
    
    Co-authored-by: Rex An <[email protected]>
---
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |  8 +-
 .../apache/hudi/SparkHoodieTableFileIndex.scala    | 42 ++++++----
 .../sql/hudi/TestLazyPartitionPathFetching.scala   | 97 ++++++++++++++++++++++
 .../apache/spark/sql/adapter/Spark2Adapter.scala   | 12 +++
 .../spark/sql/adapter/BaseSpark3Adapter.scala      | 13 +++
 5 files changed, 156 insertions(+), 16 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index 4775af504bc..c38906bf428 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Command, 
LogicalPlan}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, 
SubqueryAlias}
+import org.apache.spark.sql.catalyst.util.DateFormatter
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.parser.HoodieExtendedParserInterface
@@ -42,7 +43,7 @@ import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.{DataType, Metadata, StructType}
 import org.apache.spark.storage.StorageLevel
 
-import java.util.Locale
+import java.util.{Locale, TimeZone}
 
 /**
  * Interface adapting discrepancies and incompatibilities between different 
Spark versions
@@ -115,6 +116,11 @@ trait SparkAdapter extends Serializable {
    */
   def getSparkParsePartitionUtil: SparkParsePartitionUtil
 
+  /**
+   * Get the [[DateFormatter]].
+   */
+  def getDateFormatter(tz: TimeZone): DateFormatter
+
   /**
    * Combine [[PartitionedFile]] to [[FilePartition]] according to 
`maxSplitBytes`.
    */
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index 677d5d27076..a9a20057795 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -35,12 +35,12 @@ import org.apache.hudi.util.JFunction
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, EmptyRow, EqualTo, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, EmptyRow, EqualTo, Expression, InterpretedPredicate, Literal}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.{InternalRow, expressions}
 import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types.{ByteType, DataType, DateType, IntegerType, 
LongType, ShortType, StringType, StructField, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
 import javax.annotation.concurrent.NotThreadSafe
@@ -281,14 +281,15 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
     // Static partition-path prefix is defined as a prefix of the full 
partition-path where only
     // first N partition columns (in-order) have proper (static) values bound 
in equality predicates,
     // allowing in turn to build such prefix to be used in subsequent filtering
-    val staticPartitionColumnNameValuePairs: Seq[(String, Any)] = {
+    val staticPartitionColumnNameValuePairs: Seq[(String, (String, Any))] = {
       // Extract from simple predicates of the form `date = '2022-01-01'` both
       // partition column and corresponding (literal) value
-      val staticPartitionColumnValuesMap = 
extractEqualityPredicatesLiteralValues(partitionColumnPredicates)
+      val zoneId = configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, 
SQLConf.get.sessionLocalTimeZone)
+      val staticPartitionColumnValuesMap = 
extractEqualityPredicatesLiteralValues(partitionColumnPredicates, zoneId)
       // NOTE: For our purposes we can only construct partition-path prefix if 
proper prefix of the
       //       partition-schema has been bound by the partition-predicates
       partitionColumnNames.takeWhile(colName => 
staticPartitionColumnValuesMap.contains(colName))
-        .map(colName => (colName, staticPartitionColumnValuesMap(colName).get))
+        .map(colName => (colName, (staticPartitionColumnValuesMap(colName)._1, 
staticPartitionColumnValuesMap(colName)._2.get)))
     }
 
     if (staticPartitionColumnNameValuePairs.isEmpty) {
@@ -301,7 +302,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
 
       if (staticPartitionColumnNameValuePairs.length == 
partitionColumnNames.length) {
         // In case composed partition path is complete, we can return it 
directly avoiding extra listing operation
-        Seq(new PartitionPath(relativePartitionPathPrefix, 
staticPartitionColumnNameValuePairs.map(_._2.asInstanceOf[AnyRef]).toArray))
+        Seq(new PartitionPath(relativePartitionPathPrefix, 
staticPartitionColumnNameValuePairs.map(_._2._2.asInstanceOf[AnyRef]).toArray))
       } else {
         // Otherwise, compile extracted partition values (from query 
predicates) into a sub-path which is a prefix
         // of the complete partition path, do listing for this prefix-path only
@@ -315,7 +316,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
    *
    * @return relative partition path and a flag to indicate if the path is 
complete (i.e., not a prefix)
    */
-  private def 
composeRelativePartitionPath(staticPartitionColumnNameValuePairs: Seq[(String, 
Any)]): String = {
+  private def 
composeRelativePartitionPath(staticPartitionColumnNameValuePairs: Seq[(String, 
(String, Any))]): String = {
     checkState(staticPartitionColumnNameValuePairs.nonEmpty)
 
     // Since static partition values might not be available for all columns, 
we compile
@@ -331,7 +332,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
     )
 
     partitionPathFormatter.combine(staticPartitionColumnNames.asJava,
-      staticPartitionColumnValues.map(_.asInstanceOf[AnyRef]): _*)
+      staticPartitionColumnValues.map(_._1): _*)
   }
 
   protected def doParsePartitionColumnValues(partitionColumns: Array[String], 
partitionPath: String): Array[Object] = {
@@ -407,24 +408,35 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
     metaClient.getTableConfig.getUrlEncodePartitioning.toBoolean
 }
 
-object SparkHoodieTableFileIndex {
+object SparkHoodieTableFileIndex extends SparkAdapterSupport {
 
   private def haveProperPartitionValues(partitionPaths: Seq[PartitionPath]) = {
     partitionPaths.forall(_.values.length > 0)
   }
 
-  private def extractEqualityPredicatesLiteralValues(predicates: 
Seq[Expression]): Map[String, Option[Any]] = {
+  private def extractEqualityPredicatesLiteralValues(predicates: 
Seq[Expression], zoneId: String): Map[String, (String, Option[Any])] = {
     // TODO support coercible expressions (ie attr-references casted to 
particular type), similar
     //      to `MERGE INTO` statement
+
+    object ExtractableLiteral {
+      def unapply(exp: Expression): Option[String] = exp match {
+        case Literal(null, _) => None // `null`s can be cast as other types; 
we want to avoid NPEs.
+        case Literal(value, _: ByteType | IntegerType | LongType | ShortType) 
=> Some(value.toString)
+        case Literal(value, _: StringType) => Some(value.toString)
+        case Literal(value, _: DateType) =>
+          
Some(sparkAdapter.getDateFormatter(DateTimeUtils.getTimeZone(zoneId)).format(value.asInstanceOf[Int]))
+        case _ => None
+      }
+    }
+
     // NOTE: To properly support predicates of the form `x = NULL`, we have to 
wrap result
     //       of the folded expression into [[Some]] (to distinguish it from 
the case when partition-column
     //       isn't bound to any value by the predicate)
     predicates.flatMap {
-      case EqualTo(attr: AttributeReference, e: Expression) if e.foldable =>
-        Seq((attr.name, Some(e.eval(EmptyRow))))
-      case EqualTo(e: Expression, attr: AttributeReference) if e.foldable =>
-        Seq((attr.name, Some(e.eval(EmptyRow))))
-
+      case EqualTo(attr: AttributeReference, e @ 
ExtractableLiteral(valueString)) =>
+        Seq((attr.name, (valueString, Some(e.eval(EmptyRow)))))
+      case EqualTo(e @ ExtractableLiteral(valueString), attr: 
AttributeReference) =>
+        Seq((attr.name, (valueString, Some(e.eval(EmptyRow)))))
       case _ => Seq.empty
     }.toMap
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestLazyPartitionPathFetching.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestLazyPartitionPathFetching.scala
new file mode 100644
index 00000000000..0467b6664c6
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestLazyPartitionPathFetching.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+class TestLazyPartitionPathFetching extends HoodieSparkSqlTestBase {
+
+  test("Test querying with string column + partition pruning") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long,
+           |  date_par date
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  primaryKey ='id',
+           |  type = 'cow',
+           |  preCombineField = 'ts'
+           | )
+           | PARTITIONED BY (date_par)
+       """.stripMargin)
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, date 
'2023-02-27')")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, date 
'2023-02-28')")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000, date 
'2023-03-01')")
+
+      checkAnswer(s"select id, name, price, ts from $tableName where 
date_par='2023-03-01' order by id")(
+        Seq(3, "a3", 10.0, 1000)
+      )
+
+      withSQLConf("spark.sql.session.timeZone" -> "UTC+2") {
+        checkAnswer(s"select id, name, price, ts from $tableName where 
date_par='2023-03-01' order by id")(
+          Seq(3, "a3", 10.0, 1000)
+        )
+      }
+    }
+  }
+
+  test("Test querying with date column + partition pruning (multi-level 
partitioning)") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long,
+           |  country string,
+           |  date_par date
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  primaryKey ='id',
+           |  type = 'cow',
+           |  preCombineField = 'ts'
+           | )
+           | PARTITIONED BY (country, date_par)
+         """.stripMargin)
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 'ID', date 
'2023-02-27')")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, 'ID', date 
'2023-02-28')")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000, 'ID', date 
'2023-03-01')")
+
+      // for lazy fetching partition path & file slice to be enabled, filter 
must be applied on all partitions
+      checkAnswer(s"select id, name, price, ts from $tableName " +
+        s"where date_par='2023-03-01' and country='ID' order by id")(
+        Seq(3, "a3", 10.0, 1000)
+      )
+
+      withSQLConf("spark.sql.session.timeZone" -> "UTC+2") {
+        checkAnswer(s"select id, name, price, ts from $tableName " +
+          s"where date_par='2023-03-01' and country='ID' order by id")(
+          Seq(3, "a3", 10.0, 1000)
+        )
+      }
+    }
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index dd72282fec6..545419b148a 100644
--- 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, InterpretedPredicate}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.util.DateFormatter
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
Spark24HoodieParquetFileFormat}
 import org.apache.spark.sql.execution.vectorized.MutableColumnarRow
@@ -45,7 +46,11 @@ import org.apache.spark.sql.types.{DataType, Metadata, 
MetadataBuilder, StructTy
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel._
 
+import java.time.ZoneId
+import java.util.TimeZone
+import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConverters.mapAsScalaMapConverter
+import scala.collection.convert.Wrappers.JConcurrentMapWrapper
 import scala.collection.mutable.ArrayBuffer
 
 /**
@@ -64,6 +69,9 @@ class Spark2Adapter extends SparkAdapter {
     //       we simply produce an empty [[Metadata]] instance
     new MetadataBuilder().build()
 
+  private val cache = JConcurrentMapWrapper(
+    new ConcurrentHashMap[ZoneId, DateFormatter](1))
+
   override def getCatalogUtils: HoodieCatalogUtils = {
     throw new UnsupportedOperationException("Catalog utilities are not 
supported in Spark 2.x");
   }
@@ -90,6 +98,10 @@ class Spark2Adapter extends SparkAdapter {
 
   override def getSparkParsePartitionUtil: SparkParsePartitionUtil = 
Spark2ParsePartitionUtil
 
+  override def getDateFormatter(tz: TimeZone): DateFormatter = {
+    cache.getOrElseUpdate(tz.toZoneId, DateFormatter())
+  }
+
   /**
    * Combine [[PartitionedFile]] to [[FilePartition]] according to 
`maxSplitBytes`.
    *
diff --git 
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index 022a1d247e1..3880bda454d 100644
--- 
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -21,6 +21,7 @@ import org.apache.avro.Schema
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.spark3.internal.ReflectUtil
 import org.apache.hudi.{AvroConversionUtils, DefaultSource, Spark3RowSerDe}
 import org.apache.hudi.{AvroConversionUtils, DefaultSource, 
HoodieBaseRelation, Spark3RowSerDe}
 import org.apache.spark.internal.Logging
@@ -32,6 +33,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{Expression, 
InterpretedPredicate, Predicate}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util.DateFormatter
 import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -42,13 +44,20 @@ import org.apache.spark.sql.{HoodieSpark3CatalogUtils, 
SQLContext, SparkSession}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel._
 
+import java.time.ZoneId
+import java.util.TimeZone
+import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConverters.mapAsScalaMapConverter
+import scala.collection.convert.Wrappers.JConcurrentMapWrapper
 
 /**
  * Base implementation of [[SparkAdapter]] for Spark 3.x branch
  */
 abstract class BaseSpark3Adapter extends SparkAdapter with Logging {
 
+  private val cache = JConcurrentMapWrapper(
+    new ConcurrentHashMap[ZoneId, DateFormatter](1))
+
   def getCatalogUtils: HoodieSpark3CatalogUtils
 
   override def createSparkRowSerDe(schema: StructType): SparkRowSerDe = {
@@ -74,6 +83,10 @@ abstract class BaseSpark3Adapter extends SparkAdapter with 
Logging {
 
   override def getSparkParsePartitionUtil: SparkParsePartitionUtil = 
Spark3ParsePartitionUtil
 
+  override def getDateFormatter(tz: TimeZone): DateFormatter = {
+    cache.getOrElseUpdate(tz.toZoneId, 
ReflectUtil.getDateFormatter(tz.toZoneId))
+  }
+
   /**
    * Combine [[PartitionedFile]] to [[FilePartition]] according to 
`maxSplitBytes`.
    */

Reply via email to