This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ae1ee05ab8c [HUDI-7709] ClassCastException while reading the data
using `TimestampBasedKeyGenerator` (#11501)
ae1ee05ab8c is described below
commit ae1ee05ab8c2bd732e57bee11c8748926b05ec4b
Author: Geser Dugarov <[email protected]>
AuthorDate: Wed Jun 26 16:17:18 2024 +0700
[HUDI-7709] ClassCastException while reading the data using
`TimestampBasedKeyGenerator` (#11501)
---
.../org/apache/hudi/BaseHoodieTableFileIndex.java | 24 +++-
.../hudi/common/table/HoodieTableConfig.java | 2 +
.../main/scala/org/apache/hudi/DefaultSource.scala | 3 -
.../TestSparkSqlWithTimestampKeyGenerator.scala | 147 +++++++++++++++++++++
4 files changed, 167 insertions(+), 9 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 636eb1faa40..bfa7cec717a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -19,6 +19,7 @@
package org.apache.hudi;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TimestampKeyGeneratorConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
@@ -26,6 +27,7 @@ import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableQueryType;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -40,6 +42,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.expression.Expression;
import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.storage.HoodieStorage;
@@ -360,13 +363,22 @@ public abstract class BaseHoodieTableFileIndex implements
AutoCloseable {
}
private Object[] parsePartitionColumnValues(String[] partitionColumns,
String partitionPath) {
- Object[] partitionColumnValues =
doParsePartitionColumnValues(partitionColumns, partitionPath);
- if (shouldListLazily && partitionColumnValues.length !=
partitionColumns.length) {
- throw new HoodieException("Failed to parse partition column values from
the partition-path:"
- + " likely non-encoded slashes being used in partition column's
values. You can try to"
- + " work this around by switching listing mode to eager");
+ HoodieTableConfig tableConfig = metaClient.getTableConfig();
+ Object[] partitionColumnValues;
+ if (null != tableConfig.getKeyGeneratorClassName()
+ &&
tableConfig.getKeyGeneratorClassName().equals(KeyGeneratorType.TIMESTAMP.getClassName())
+ &&
tableConfig.propsMap().get(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key()).matches("SCALAR|UNIX_TIMESTAMP|EPOCHMILLISECONDS"))
{
+ // For TIMESTAMP key generator when TYPE is SCALAR, UNIX_TIMESTAMP or
EPOCHMILLISECONDS,
+ // we cannot reconstruct the initial partition column values from
partition paths, because in most cases data is lost during formatting
+ partitionColumnValues = new Object[partitionColumns.length];
+ } else {
+ partitionColumnValues = doParsePartitionColumnValues(partitionColumns,
partitionPath);
+ if (shouldListLazily && partitionColumnValues.length !=
partitionColumns.length) {
+ throw new HoodieException("Failed to parse partition column values
from the partition-path:"
+ + " likely non-encoded slashes being used in partition column's
values. You can try to"
+ + " work this around by switching listing mode to eager");
+ }
}
-
return partitionColumnValues;
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 117b64ba29d..6053278d831 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -76,6 +76,7 @@ import static
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAM
import static
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_OUTPUT_DATE_FORMAT;
import static
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT;
import static
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_TIMEZONE_FORMAT;
+import static
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD;
import static org.apache.hudi.common.util.ConfigUtils.fetchConfigs;
import static org.apache.hudi.common.util.ConfigUtils.recoverIfNeeded;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
@@ -284,6 +285,7 @@ public class HoodieTableConfig extends HoodieConfig {
public static final ConfigProperty<String> HIVE_STYLE_PARTITIONING_ENABLE =
KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE;
public static final List<ConfigProperty<String>> PERSISTED_CONFIG_LIST =
Arrays.asList(
+ TIMESTAMP_TYPE_FIELD,
INPUT_TIME_UNIT,
TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX,
TIMESTAMP_INPUT_DATE_FORMAT,
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 246f20edda0..1593356b1e8 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -243,9 +243,6 @@ object DefaultSource {
val queryType = parameters(QUERY_TYPE.key)
val isCdcQuery = queryType == QUERY_TYPE_INCREMENTAL_OPT_VAL &&
parameters.get(INCREMENTAL_FORMAT.key).contains(INCREMENTAL_FORMAT_CDC_VAL)
- val isMultipleBaseFileFormatsEnabled =
metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled
-
-
val createTimeLineRln =
parameters.get(DataSourceReadOptions.CREATE_TIMELINE_RELATION.key())
val createFSRln =
parameters.get(DataSourceReadOptions.CREATE_FILESYSTEM_RELATION.key())
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
new file mode 100644
index 00000000000..92c3dac6832
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.functional.TestSparkSqlWithTimestampKeyGenerator._
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+import org.slf4j.LoggerFactory
+
+/**
+ * Tests of timestamp key generator using Spark SQL
+ */
+class TestSparkSqlWithTimestampKeyGenerator extends HoodieSparkSqlTestBase {
+ private val LOG = LoggerFactory.getLogger(getClass)
+
+ test("Test Spark SQL with timestamp key generator") {
+ withTempDir { tmp =>
+ Seq(
+ Seq("COPY_ON_WRITE", "true"),
+ Seq("COPY_ON_WRITE", "false"),
+ Seq("MERGE_ON_READ", "true"),
+ Seq("MERGE_ON_READ", "false")
+ ).foreach { testParams =>
+ val tableType = testParams(0)
+ // enables use of engine agnostic file group reader
+ val shouldUseFileGroupReader = testParams(1)
+
+ timestampKeyGeneratorSettings.foreach { keyGeneratorSettings =>
+ withTable(generateTableName) { tableName =>
+ // Warning level is used because CI runs with the warn-log profile,
allowing quick identification of failed cases
+ LOG.warn(s"Table '${tableName}' with parameters: ${testParams}.
Timestamp key generator settings: ${keyGeneratorSettings}")
+ val tablePath = tmp.getCanonicalPath + "/" + tableName
+ val tsType = if (keyGeneratorSettings.contains("DATE_STRING"))
"string" else "long"
+ spark.sql(
+ s"""
+ | CREATE TABLE $tableName (
+ | id int,
+ | name string,
+ | precomb long,
+ | ts ${tsType}
+ | ) USING HUDI
+ | PARTITIONED BY (ts)
+ | LOCATION '${tablePath}'
+ | TBLPROPERTIES (
+ | type = '${tableType}',
+ | primaryKey = 'id',
+ | preCombineField = 'precomb',
+ | hoodie.datasource.write.partitionpath.field = 'ts',
+ | hoodie.datasource.write.hive_style_partitioning = 'false',
+ | hoodie.file.group.reader.enabled =
'${shouldUseFileGroupReader}',
+ | hoodie.table.keygenerator.class =
'org.apache.hudi.keygen.TimestampBasedKeyGenerator',
+ | ${keyGeneratorSettings}
+ | )
+ |""".stripMargin)
+ // TODO: couldn't set `TIMESTAMP` for
`hoodie.table.keygenerator.type`, it's overwritten by `SIMPLE`, only
`hoodie.table.keygenerator.class` works
+
+ val (dataBatches, expectedQueryResult) = if
(keyGeneratorSettings.contains("DATE_STRING"))
+ (dataBatchesWithString, queryResultWithString)
+ else if (keyGeneratorSettings.contains("EPOCHMILLISECONDS"))
+ (dataBatchesWithLongOfMilliseconds,
queryResultWithLongOfMilliseconds)
+ else // UNIX_TIMESTAMP, and SCALAR with SECONDS
+ (dataBatchesWithLongOfSeconds, queryResultWithLongOfSeconds)
+
+ withSQLConf("hoodie.file.group.reader.enabled" ->
s"${shouldUseFileGroupReader}",
+ "hoodie.datasource.query.type" -> "snapshot") {
+ // two partitions, one contains parquet file only, the second
one contains parquet and log files for MOR, and two parquets for COW
+ spark.sql(s"INSERT INTO ${tableName} VALUES ${dataBatches(0)}")
+ spark.sql(s"INSERT INTO ${tableName} VALUES ${dataBatches(1)}")
+
+ val queryResult = spark.sql(s"SELECT id, name, precomb, ts FROM
${tableName} ORDER BY id").collect().mkString("; ")
+ LOG.warn(s"Query result: ${queryResult}")
+ if (!keyGeneratorSettings.contains("DATE_STRING"))
+ // TODO: use `shouldExtractPartitionValuesFromPartitionPath`
uniformly, and get `expectedQueryResult` for all cases instead of
`expectedQueryResultWithNull` for some cases
+ // Fix for [HUDI-3896] overwrites
`shouldExtractPartitionValuesFromPartitionPath` in `BaseFileOnlyRelation`,
therefore for COW we extract from partition paths and get nulls
+ // [HUDI-7925] Currently there is no logic for
`shouldExtractPartitionValuesFromPartitionPath` in
`HoodieBaseHadoopFsRelationFactory` (used when shouldUseFileGroupReader = true)
+ if (tableType == "COPY_ON_WRITE" ||
shouldUseFileGroupReader.toBoolean)
+ assertResult(expectedQueryResultWithNull)(queryResult)
+ else
+ assertResult(expectedQueryResult)(queryResult)
+ else {
+ // for DATE_STRING type, values are reconstructed from
partition path even when data is lost
+ if (!(tableType == "COPY_ON_WRITE" ||
shouldUseFileGroupReader.toBoolean))
+ assertResult(expectedQueryResult)(queryResult)
+ else
+ assertResult(expectedQueryResultWithLossyString)(queryResult)
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+object TestSparkSqlWithTimestampKeyGenerator {
+ val outputDateformat = "yyyy-MM-dd HH"
+ val timestampKeyGeneratorSettings: Array[String] = Array(
+ s"""
+ | hoodie.keygen.timebased.timestamp.type = 'UNIX_TIMESTAMP',
+ | hoodie.keygen.timebased.output.dateformat =
'${outputDateformat}'""",
+ s"""
+ | hoodie.keygen.timebased.timestamp.type = 'EPOCHMILLISECONDS',
+ | hoodie.keygen.timebased.output.dateformat =
'${outputDateformat}'""",
+ s"""
+ | hoodie.keygen.timebased.timestamp.type = 'SCALAR',
+ | hoodie.keygen.timebased.timestamp.scalar.time.unit = 'SECONDS',
+ | hoodie.keygen.timebased.output.dateformat =
'${outputDateformat}'""",
+ s"""
+ | hoodie.keygen.timebased.timestamp.type = 'DATE_STRING',
+ | hoodie.keygen.timebased.input.dateformat = 'yyyy-MM-dd HH:mm:ss',
+ | hoodie.keygen.timebased.output.dateformat = '${outputDateformat}'"""
+ )
+
+ // All data batches should correspond to 2004-02-29 01:02:03 and 2024-06-21
06:50:03
+ val dataBatchesWithLongOfSeconds: Array[String] = Array(
+ "(1, 'a1', 1, 1078016523), (2, 'a2', 1, 1718952603)",
+ "(2, 'a3', 1, 1718952603)"
+ )
+ val dataBatchesWithLongOfMilliseconds: Array[String] = Array(
+ "(1, 'a1', 1, 1078016523000), (2, 'a2', 1, 1718952603000)",
+ "(2, 'a3', 1, 1718952603000)"
+ )
+ val dataBatchesWithString: Array[String] = Array(
+ "(1, 'a1', 1, '2004-02-29 01:02:03'), (2, 'a2', 1, '2024-06-21 06:50:03')",
+ "(2, 'a3', 1, '2024-06-21 06:50:03')"
+ )
+ val queryResultWithLongOfSeconds: String = "[1,a1,1,1078016523];
[2,a3,1,1718952603]"
+ val queryResultWithLongOfMilliseconds: String = "[1,a1,1,1078016523000];
[2,a3,1,1718952603000]"
+ val queryResultWithString: String = "[1,a1,1,2004-02-29 01:02:03];
[2,a3,1,2024-06-21 06:50:03]"
+ val expectedQueryResultWithLossyString: String = "[1,a1,1,2004-02-29 01];
[2,a3,1,2024-06-21 06]"
+ val expectedQueryResultWithNull: String = "[1,a1,1,null]; [2,a3,1,null]"
+}