This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e2860cddf54 [HUDI-7709] Pass partition paths as partition column 
values if `TimestampBasedKeyGenerator` is used (#11615)
e2860cddf54 is described below

commit e2860cddf5494578c64b0c3eeb2dc0160154ceda
Author: Geser Dugarov <[email protected]>
AuthorDate: Fri Jul 12 16:30:30 2024 +0700

    [HUDI-7709] Pass partition paths as partition column values if 
`TimestampBasedKeyGenerator` is used (#11615)
    
    * [HUDI-7709] Pass partition paths as partition column values if 
`TimestampBasedKeyGenerator` is used
    
    Fixes a ClassCastException thrown when Spark reads a table that uses 
`TimestampBasedKeyGenerator`.
    The previous fix, ae1ee05ab8c2bd732e57bee11c8748926b05ec4b, was reverted by 
26ac119ee25f03ff079bb396b5f397ee1264c406.
    
    * Added a test checking that partitioning is mandatory for the timestamp key generator
---
 .../hudi/common/table/HoodieTableConfig.java       |   2 +
 .../main/scala/org/apache/hudi/DefaultSource.scala |   2 -
 .../apache/hudi/SparkHoodieTableFileIndex.scala    |  24 ++-
 .../TestSparkSqlWithTimestampKeyGenerator.scala    | 167 +++++++++++++++++++++
 4 files changed, 186 insertions(+), 9 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 117b64ba29d..6053278d831 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -76,6 +76,7 @@ import static 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAM
 import static 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_OUTPUT_DATE_FORMAT;
 import static 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT;
 import static 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_TIMEZONE_FORMAT;
+import static 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD;
 import static org.apache.hudi.common.util.ConfigUtils.fetchConfigs;
 import static org.apache.hudi.common.util.ConfigUtils.recoverIfNeeded;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
@@ -284,6 +285,7 @@ public class HoodieTableConfig extends HoodieConfig {
   public static final ConfigProperty<String> HIVE_STYLE_PARTITIONING_ENABLE = 
KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE;
 
   public static final List<ConfigProperty<String>> PERSISTED_CONFIG_LIST = 
Arrays.asList(
+      TIMESTAMP_TYPE_FIELD,
       INPUT_TIME_UNIT,
       TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX,
       TIMESTAMP_INPUT_DATE_FORMAT,
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 246f20edda0..bcf12613b80 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -243,8 +243,6 @@ object DefaultSource {
     val queryType = parameters(QUERY_TYPE.key)
     val isCdcQuery = queryType == QUERY_TYPE_INCREMENTAL_OPT_VAL &&
       
parameters.get(INCREMENTAL_FORMAT.key).contains(INCREMENTAL_FORMAT_CDC_VAL)
-    val isMultipleBaseFileFormatsEnabled = 
metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled
-
 
     val createTimeLineRln = 
parameters.get(DataSourceReadOptions.CREATE_TIMELINE_RELATION.key())
     val createFSRln = 
parameters.get(DataSourceReadOptions.CREATE_FILESYSTEM_RELATION.key())
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index c5581f116be..d070898899a 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, 
extractEqualityPredicatesLiteralValues, generateFieldMap, 
haveProperPartitionValues, shouldListLazily, 
shouldUsePartitionPathPrefixAnalysis, shouldValidatePartitionColumns}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.config.{TimestampKeyGeneratorConfig, 
TypedProperties}
 import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
 import 
org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
@@ -32,23 +32,23 @@ import 
org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.internal.schema.Types.RecordType
 import org.apache.hudi.internal.schema.utils.Conversions
+import org.apache.hudi.keygen.constant.KeyGeneratorType
 import org.apache.hudi.keygen.{StringPartitionPathFormatter, 
TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
 import org.apache.hudi.util.JFunction
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.{expressions, InternalRow}
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, EmptyRow, EqualTo, Expression, InterpretedPredicate, Literal}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ByteType, DateType, IntegerType, LongType, 
ShortType, StringType, StructField, StructType}
+import org.apache.spark.unsafe.types.UTF8String
 
 import javax.annotation.concurrent.NotThreadSafe
-
 import java.util.Collections
-
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 import scala.util.{Success, Try}
@@ -400,9 +400,19 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
   }
 
   protected def doParsePartitionColumnValues(partitionColumns: Array[String], 
partitionPath: String): Array[Object] = {
-    HoodieSparkUtils.parsePartitionColumnValues(partitionColumns, 
partitionPath, getBasePath, schema,
-      configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, 
SQLConf.get.sessionLocalTimeZone),
-      sparkParsePartitionUtil, shouldValidatePartitionColumns(spark))
+    val tableConfig = metaClient.getTableConfig
+    if (null != tableConfig.getKeyGeneratorClassName
+      && 
tableConfig.getKeyGeneratorClassName.equals(KeyGeneratorType.TIMESTAMP.getClassName)
+      && 
tableConfig.propsMap.get(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key()).matches("SCALAR|UNIX_TIMESTAMP|EPOCHMILLISECONDS"))
 {
+      // For TIMESTAMP key generator when TYPE is SCALAR, UNIX_TIMESTAMP or 
EPOCHMILLISECONDS,
+      // we couldn't reconstruct initial partition column values from 
partition paths due to lost data after formatting in most cases.
+      // But the output for these cases is in a string format, so we can pass 
partitionPath as UTF8String
+      Array.fill(partitionColumns.length)(UTF8String.fromString(partitionPath))
+    } else {
+      HoodieSparkUtils.parsePartitionColumnValues(partitionColumns, 
partitionPath, getBasePath, schema,
+        configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, 
SQLConf.get.sessionLocalTimeZone),
+        sparkParsePartitionUtil, shouldValidatePartitionColumns(spark))
+    }
   }
 
   private def arePartitionPathsUrlEncoded: Boolean =
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
new file mode 100644
index 00000000000..cf95b5c42d4
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.functional.TestSparkSqlWithTimestampKeyGenerator._
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+import org.slf4j.LoggerFactory
+
+/**
+ * Tests of timestamp key generator using Spark SQL
+ */
+class TestSparkSqlWithTimestampKeyGenerator extends HoodieSparkSqlTestBase {
+  private val LOG = LoggerFactory.getLogger(getClass)
+
+  test("Test Spark SQL with timestamp key generator") {
+    withTempDir { tmp =>
+      Seq(
+        Seq("COPY_ON_WRITE", "true"),
+        Seq("COPY_ON_WRITE", "false"),
+        Seq("MERGE_ON_READ", "true"),
+        Seq("MERGE_ON_READ", "false")
+      ).foreach { testParams =>
+        val tableType = testParams(0)
+        // enables use of engine agnostic file group reader
+        val shouldUseFileGroupReader = testParams(1)
+
+        timestampKeyGeneratorSettings.foreach { keyGeneratorSettings =>
+          withTable(generateTableName) { tableName =>
+            // Warning level is used due to CI run with warn-log profile for 
quick failed cases identification
+            LOG.warn(s"Table '${tableName}' with parameters: ${testParams}. 
Timestamp key generator settings: ${keyGeneratorSettings}")
+            val tablePath = tmp.getCanonicalPath + "/" + tableName
+            val tsType = if (keyGeneratorSettings.contains("DATE_STRING")) 
"string" else "long"
+            spark.sql(
+              s"""
+                 | CREATE TABLE $tableName (
+                 |   id int,
+                 |   name string,
+                 |   precomb long,
+                 |   ts ${tsType}
+                 | ) USING HUDI
+                 | PARTITIONED BY (ts)
+                 | LOCATION '${tablePath}'
+                 | TBLPROPERTIES (
+                 |   type = '${tableType}',
+                 |   primaryKey = 'id',
+                 |   preCombineField = 'precomb',
+                 |   hoodie.datasource.write.partitionpath.field = 'ts',
+                 |   hoodie.datasource.write.hive_style_partitioning = 'false',
+                 |   hoodie.file.group.reader.enabled = 
'${shouldUseFileGroupReader}',
+                 |   hoodie.table.keygenerator.class = 
'org.apache.hudi.keygen.TimestampBasedKeyGenerator',
+                 |   ${keyGeneratorSettings}
+                 | )
+                 |""".stripMargin)
+            // TODO: couldn't set `TIMESTAMP` for 
`hoodie.table.keygenerator.type`, it's overwritten by `SIMPLE`, only 
`hoodie.table.keygenerator.class` works
+
+            val (dataBatches, expectedQueryResult) = if 
(keyGeneratorSettings.contains("DATE_STRING"))
+              (dataBatchesWithString, queryResultWithString)
+            else if (keyGeneratorSettings.contains("EPOCHMILLISECONDS"))
+              (dataBatchesWithLongOfMilliseconds, 
queryResultWithLongOfMilliseconds)
+            else // UNIX_TIMESTAMP, and SCALAR with SECONDS
+              (dataBatchesWithLongOfSeconds, queryResultWithLongOfSeconds)
+
+            withSQLConf("hoodie.file.group.reader.enabled" -> 
s"${shouldUseFileGroupReader}",
+              "hoodie.datasource.query.type" -> "snapshot") {
+              // two partitions, one contains parquet file only, the second 
one contains parquet and log files for MOR, and two parquets for COW
+              spark.sql(s"INSERT INTO ${tableName} VALUES ${dataBatches(0)}")
+              spark.sql(s"INSERT INTO ${tableName} VALUES ${dataBatches(1)}")
+
+              val queryResult = spark.sql(s"SELECT id, name, precomb, ts FROM 
${tableName} ORDER BY id").collect().mkString("; ")
+              LOG.warn(s"Query result: ${queryResult}")
+              // TODO: use `shouldExtractPartitionValuesFromPartitionPath` 
uniformly, and get `expectedQueryResult` for all cases instead of 
`expectedQueryResultWithLossyString` for some cases
+              //   After it we could properly process filters like "WHERE ts 
BETWEEN 1078016000 and 1718953003" and add tests with partition pruning.
+              //   COW: Fix for [HUDI-3896] overwrites 
`shouldExtractPartitionValuesFromPartitionPath` in `BaseFileOnlyRelation`, 
therefore for COW we extracting from partition paths and get nulls
+              //   shouldUseFileGroupReader: [HUDI-7925] Currently there is no 
logic for `shouldExtractPartitionValuesFromPartitionPath` in 
`HoodieBaseHadoopFsRelationFactory`
+              if (tableType == "COPY_ON_WRITE" || 
shouldUseFileGroupReader.toBoolean)
+                assertResult(expectedQueryResultWithLossyString)(queryResult)
+              else
+                assertResult(expectedQueryResult)(queryResult)
+            }
+          }
+        }
+      }
+    }
+  }
+
+  test("Test mandatory partitioning for timestamp key generator") {
+    withTempDir { tmp =>
+      spark.sql(
+        s"""
+           | CREATE TABLE should_fail (
+           |   id int,
+           |   name string,
+           |   precomb long,
+           |   ts long
+           | ) USING HUDI
+           | LOCATION '${tmp.getCanonicalPath + "/should_fail"}'
+           | TBLPROPERTIES (
+           |   type = 'COPY_ON_WRITE',
+           |   primaryKey = 'id',
+           |   preCombineField = 'precomb',
+           |   hoodie.table.keygenerator.class = 
'org.apache.hudi.keygen.TimestampBasedKeyGenerator',
+           |   ${timestampKeyGeneratorSettings.head}
+           | )
+           |""".stripMargin)
+      // should fail due to absent partitioning
+      assertThrows[HoodieException] {
+        spark.sql(s"INSERT INTO should_fail VALUES 
${dataBatchesWithLongOfSeconds(0)}")
+      }
+
+    }
+  }
+}
+
+object TestSparkSqlWithTimestampKeyGenerator {
+  val outputDateformat = "yyyy-MM-dd HH"
+  val timestampKeyGeneratorSettings: Array[String] = Array(
+    s"""
+       |   hoodie.keygen.timebased.timestamp.type = 'UNIX_TIMESTAMP',
+       |   hoodie.keygen.timebased.output.dateformat = 
'${outputDateformat}'""",
+    s"""
+       |   hoodie.keygen.timebased.timestamp.type = 'EPOCHMILLISECONDS',
+       |   hoodie.keygen.timebased.output.dateformat = 
'${outputDateformat}'""",
+    s"""
+       |   hoodie.keygen.timebased.timestamp.type = 'SCALAR',
+       |   hoodie.keygen.timebased.timestamp.scalar.time.unit = 'SECONDS',
+       |   hoodie.keygen.timebased.output.dateformat = 
'${outputDateformat}'""",
+    s"""
+       |   hoodie.keygen.timebased.timestamp.type = 'DATE_STRING',
+       |   hoodie.keygen.timebased.input.dateformat = 'yyyy-MM-dd HH:mm:ss',
+       |   hoodie.keygen.timebased.output.dateformat = '${outputDateformat}'"""
+  )
+
+  // All data batches should correspond to 2004-02-29 01:02:03 and 2024-06-21 
06:50:03
+  val dataBatchesWithLongOfSeconds: Array[String] = Array(
+    "(1, 'a1', 1, 1078016523), (2, 'a2', 1, 1718952603)",
+    "(2, 'a3', 1, 1718952603)"
+  )
+  val dataBatchesWithLongOfMilliseconds: Array[String] = Array(
+    "(1, 'a1', 1, 1078016523000), (2, 'a2', 1, 1718952603000)",
+    "(2, 'a3', 1, 1718952603000)"
+  )
+  val dataBatchesWithString: Array[String] = Array(
+    "(1, 'a1', 1, '2004-02-29 01:02:03'), (2, 'a2', 1, '2024-06-21 06:50:03')",
+    "(2, 'a3', 1, '2024-06-21 06:50:03')"
+  )
+  val queryResultWithLongOfSeconds: String = "[1,a1,1,1078016523]; 
[2,a3,1,1718952603]"
+  val queryResultWithLongOfMilliseconds: String = "[1,a1,1,1078016523000]; 
[2,a3,1,1718952603000]"
+  val queryResultWithString: String = "[1,a1,1,2004-02-29 01:02:03]; 
[2,a3,1,2024-06-21 06:50:03]"
+  val expectedQueryResultWithLossyString: String = "[1,a1,1,2004-02-29 01]; 
[2,a3,1,2024-06-21 06]"
+}

Reply via email to