This is an automated email from the ASF dual-hosted git repository.
vhs pushed a commit to branch release-1.0.2
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/release-1.0.2 by this push:
new 9a02bdb2df0 [HUDI-8902] Fix the incorrect data read after changing the
column type from float to double (#13188)
9a02bdb2df0 is described below
commit 9a02bdb2df04087df95fa43cb00899dcd1c632b5
Author: TheR1sing3un <[email protected]>
AuthorDate: Mon Apr 21 18:44:31 2025 +0800
[HUDI-8902] Fix the incorrect data read after changing the column type from
float to double (#13188)
Fix the incorrect data read after changing the column type from float to
double
There are two reasons for the above problems:
1. `InternalSchemaCache` cannot obtain the path of the commit file
correctly.
Because after version 1.x, we placed the commit file separately in the
`timeline` directory.
However, `InternalSchemaCache` still retrieves files from
`basePath/.hoodie`, thus failing to obtain the correct schema.
2. When we read a parquet file without `enableVectorizedReader`, we use
Spark's `CAST` for type conversion when we need to do schema evolution,
but Spark loses precision when evaluating a `CAST` that
converts `float` to `double`.
Change logs:
1. pass the correct timeline path for `InternalSchemaCache`
2. for schema evolution without `enableVectorizedReader`, for the case
`CAST(FLOAT as DOUBLE)` we turn it into `CAST(CAST(FLOAT as STRING) as DOUBLE)`.
Signed-off-by: TheR1sing3un <[email protected]>
(cherry picked from commit e43841f754b9aaa263583c13396a22dda48b53fc)
---
.../parquet/HoodieParquetFileFormatHelper.scala | 12 ++++++--
.../hudi/common/util/InternalSchemaCache.java | 25 ++++++++++++-----
.../hudi/table/format/InternalSchemaManager.java | 12 ++++----
.../apache/spark/sql/hudi/ddl/TestSpark3DDL.scala | 32 ++++++++--------------
.../Spark3ParquetSchemaEvolutionUtils.scala | 2 +-
.../Spark33LegacyHoodieParquetFileFormat.scala | 2 +-
.../Spark34LegacyHoodieParquetFileFormat.scala | 2 +-
.../Spark35LegacyHoodieParquetFileFormat.scala | 2 +-
8 files changed, 50 insertions(+), 39 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
index f69b04a91ba..48efe56ed22 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
@@ -24,7 +24,7 @@ import org.apache.parquet.hadoop.metadata.FileMetaData
import org.apache.spark.sql.HoodieSchemaUtils
import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast,
UnsafeProjection}
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField,
StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, DoubleType, FloatType,
MapType, StringType, StructField, StructType}
object HoodieParquetFileFormatHelper {
@@ -120,7 +120,15 @@ object HoodieParquetFileFormatHelper {
val srcType = typeChangeInfos.get(i).getRight
val dstType = typeChangeInfos.get(i).getLeft
val needTimeZone = Cast.needsTimeZone(srcType, dstType)
- Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
+
+ // work around for the case when cast float to double
+ if (srcType == FloatType && dstType == DoubleType) {
+ // first cast to string and then to double
+ val toStringAttr = Cast(attr, StringType, if (needTimeZone)
timeZoneId else None)
+ Cast(toStringAttr, dstType, if (needTimeZone) timeZoneId else None)
+ } else {
+ Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
+ }
} else attr
}
GenerateUnsafeProjection.generate(castSchema, newFullSchema)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
index ef046389e19..4ac5b65a96f 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.util;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.CommitMetadataSerDe;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -27,6 +28,8 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
import org.apache.hudi.common.table.timeline.InstantFileNameParser;
import org.apache.hudi.common.table.timeline.InstantGenerator;
+import org.apache.hudi.common.table.timeline.TimelineLayout;
+import org.apache.hudi.common.table.timeline.TimelinePathProvider;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.internal.schema.InternalSchema;
@@ -176,23 +179,27 @@ public class InternalSchemaCache {
* @param tablePath table path
* @param storage {@link HoodieStorage} instance.
* @param validCommits current validate commits, use to make up the
commit file path/verify the validity of the history schema files
- * @param fileNameParser InstantFileNameParser
- * @param commitMetadataSerDe CommitMetadataSerDe
- * @param instantGenerator InstantGenerator
+ * @param timelineLayout {@link TimelineLayout} instance, used to get
{@link InstantFileNameParser}/{@link CommitMetadataSerDe}/{@link
InstantGenerator}/{@link TimelinePathProvider} instance.
+ * @param tableConfig {@link HoodieTableConfig} instance, used to
get the timeline path.
* @return a internalSchema.
*/
public static InternalSchema getInternalSchemaByVersionId(long versionId,
String tablePath, HoodieStorage storage, String validCommits,
-
InstantFileNameParser fileNameParser, CommitMetadataSerDe commitMetadataSerDe,
InstantGenerator instantGenerator) {
+ TimelineLayout
timelineLayout, HoodieTableConfig tableConfig) {
+ InstantFileNameParser fileNameParser =
timelineLayout.getInstantFileNameParser();
+ CommitMetadataSerDe commitMetadataSerDe =
timelineLayout.getCommitMetadataSerDe();
+ InstantGenerator instantGenerator = timelineLayout.getInstantGenerator();
+ TimelinePathProvider timelinePathProvider =
timelineLayout.getTimelinePathProvider();
+ StoragePath timelinePath =
timelinePathProvider.getTimelinePath(tableConfig, new StoragePath(tablePath));
+
String avroSchema = "";
Set<String> commitSet =
Arrays.stream(validCommits.split(",")).collect(Collectors.toSet());
List<String> validateCommitList =
commitSet.stream().map(fileNameParser::extractTimestamp).collect(Collectors.toList());
- StoragePath hoodieMetaPath = new StoragePath(tablePath,
HoodieTableMetaClient.METAFOLDER_NAME);
//step1:
StoragePath candidateCommitFile = commitSet.stream()
.filter(fileName ->
fileNameParser.extractTimestamp(fileName).equals(versionId + ""))
- .findFirst().map(f -> new StoragePath(hoodieMetaPath, f)).orElse(null);
+ .findFirst().map(f -> new StoragePath(timelinePath, f)).orElse(null);
if (candidateCommitFile != null) {
try {
HoodieCommitMetadata metadata;
@@ -231,12 +238,16 @@ public class InternalSchemaCache {
: fileSchema;
}
+ public static InternalSchema getInternalSchemaByVersionId(long versionId,
String tablePath, HoodieStorage storage, String validCommits, TimelineLayout
timelineLayout) {
+ return getInternalSchemaByVersionId(versionId, tablePath, storage,
validCommits, timelineLayout, HoodieTableConfig.loadFromHoodieProps(storage,
tablePath));
+ }
+
public static InternalSchema getInternalSchemaByVersionId(long versionId,
HoodieTableMetaClient metaClient) {
InstantFileNameGenerator factory =
metaClient.getInstantFileNameGenerator();
String validCommitLists = metaClient
.getCommitsAndCompactionTimeline().filterCompletedInstants().getInstantsAsStream().map(factory::getFileName).collect(Collectors.joining(","));
return getInternalSchemaByVersionId(versionId,
metaClient.getBasePath().toString(), metaClient.getStorage(),
- validCommitLists, metaClient.getInstantFileNameParser(),
metaClient.getCommitMetadataSerDe(), metaClient.getInstantGenerator());
+ validCommitLists, metaClient.getTimelineLayout(),
metaClient.getTableConfig());
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
index cdc489e938c..35d0b49cf1a 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
@@ -19,6 +19,7 @@
package org.apache.hudi.table.format;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
@@ -61,13 +62,14 @@ public class InternalSchemaManager implements Serializable {
private static final long serialVersionUID = 1L;
public static final InternalSchemaManager DISABLED = new
InternalSchemaManager(null, InternalSchema.getEmptyInternalSchema(), null, null,
- TimelineLayout.fromVersion(TimelineLayoutVersion.CURR_LAYOUT_VERSION));
+ TimelineLayout.fromVersion(TimelineLayoutVersion.CURR_LAYOUT_VERSION),
null);
private final Configuration conf;
private final InternalSchema querySchema;
private final String validCommits;
private final String tablePath;
private final TimelineLayout layout;
+ private final HoodieTableConfig tableConfig;
private transient org.apache.hadoop.conf.Configuration hadoopConf;
public static InternalSchemaManager get(Configuration conf,
HoodieTableMetaClient metaClient) {
@@ -86,16 +88,17 @@ public class InternalSchemaManager implements Serializable {
.getInstantsAsStream()
.map(factory::getFileName)
.collect(Collectors.joining(","));
- return new InternalSchemaManager(conf, internalSchema.get(), validCommits,
metaClient.getBasePath().toString(), metaClient.getTimelineLayout());
+ return new InternalSchemaManager(conf, internalSchema.get(), validCommits,
metaClient.getBasePath().toString(), metaClient.getTimelineLayout(),
metaClient.getTableConfig());
}
public InternalSchemaManager(Configuration conf, InternalSchema querySchema,
String validCommits, String tablePath,
- TimelineLayout layout) {
+ TimelineLayout layout, HoodieTableConfig
tableConfig) {
this.conf = conf;
this.querySchema = querySchema;
this.validCommits = validCommits;
this.tablePath = tablePath;
this.layout = layout;
+ this.tableConfig = tableConfig;
}
public InternalSchema getQuerySchema() {
@@ -121,8 +124,7 @@ public class InternalSchemaManager implements Serializable {
InternalSchema fileSchema =
InternalSchemaCache.getInternalSchemaByVersionId(
commitInstantTime, tablePath,
new HoodieHadoopStorage(tablePath, getHadoopConf()),
- validCommits, layout.getInstantFileNameParser(),
- layout.getCommitMetadataSerDe(), layout.getInstantGenerator());
+ validCommits, layout, tableConfig);
if (querySchema.equals(fileSchema)) {
return InternalSchema.getEmptyInternalSchema();
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
index eba2e62b521..16f20b74323 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
@@ -294,8 +294,8 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
// test change column type float to double
spark.sql(s"alter table $tableName alter column col2 type double")
checkAnswer(s"select id, col1_new, col2 from $tableName where id = 1
or id = 2 order by id")(
- Seq(1, null, getDouble("101.01", isMor)),
- Seq(2, null, getDouble("102.02", isMor)))
+ Seq(1, null, 101.01),
+ Seq(2, null, 102.02))
spark.sql(
s"""
| insert into $tableName values
@@ -308,11 +308,11 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
new java.math.BigDecimal("100001.0001"), "a000001",
java.sql.Date.valueOf("2021-12-25"),
java.sql.Timestamp.valueOf("2021-12-25 12:01:01"), true,
java.sql.Date.valueOf("2021-12-25")),
- Seq(2, null, 2, 12, 100002L, getDouble("102.02", isMor), 1002.0002,
+ Seq(2, null, 2, 12, 100002L, 102.02, 1002.0002,
new java.math.BigDecimal("100002.0002"), "a000002",
java.sql.Date.valueOf("2021-12-25"),
java.sql.Timestamp.valueOf("2021-12-25 12:02:02"), true,
java.sql.Date.valueOf("2021-12-25")),
- Seq(3, null, 3, 13, 100003L, getDouble("103.03", isMor), 1003.0003,
+ Seq(3, null, 3, 13, 100003L, 103.03, 1003.0003,
new java.math.BigDecimal("100003.0003"), "a000003",
java.sql.Date.valueOf("2021-12-25"),
java.sql.Timestamp.valueOf("2021-12-25 12:03:03"), false,
java.sql.Date.valueOf("2021-12-25")),
@@ -366,7 +366,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
spark.sql(s"alter table $tableName alter column col2 type string")
checkAnswer(s"select id, col1_new, col2 from $tableName where id = 1
or id = 2 order by id")(
Seq(1, 3, "101.01"),
- Seq(2, null, getDouble("102.02", isMor && runClustering).toString))
+ Seq(2, null, "102.02"))
spark.sql(
s"""
| insert into $tableName values
@@ -375,7 +375,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
|""".stripMargin)
checkAnswer(s"select id, col1_new, comb, col0, col1, col2, col3, col4,
col5, "
- + s"col6, col7, col8, par from
$tableName")(getExpectedRowsSecondTime(isMor && runClustering): _*)
+ + s"col6, col7, col8, par from
$tableName")(getExpectedRowsSecondTime(): _*)
if (runCompaction) {
// try schedule compact
if (tableType == "mor") spark.sql(s"schedule compaction on
$tableName")
@@ -398,7 +398,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
}
// Data should not change after scheduling or running table services
checkAnswer(s"select id, col1_new, comb, col0, col1, col2, col3, col4,
col5, "
- + s"col6, col7, col8, par from
$tableName")(getExpectedRowsSecondTime(isMor): _*)
+ + s"col6, col7, col8, par from
$tableName")(getExpectedRowsSecondTime(): _*)
spark.sql(
s"""
| insert into $tableName values
@@ -410,7 +410,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
+ s"where id = 1 or id = 6 or id = 2 or id = 11 order by id")(
Seq(1, 3, "101.01"),
Seq(11, 3, "101.01"),
- Seq(2, null, getDouble("102.02", isMor).toString),
+ Seq(2, null, "102.02"),
Seq(6, 6, "105.05"))
}
spark.sessionState.conf.unsetConf("spark.sql.storeAssignmentPolicy")
@@ -419,18 +419,18 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
}
}
- private def getExpectedRowsSecondTime(floatToDouble: Boolean): Seq[Seq[Any]]
= {
+ private def getExpectedRowsSecondTime(): Seq[Seq[Any]] = {
Seq(
Seq(1, 3, 1, 11, 100001L, "101.01", 1001.0001, new
java.math.BigDecimal("100001.00010000"),
"a000001", java.sql.Date.valueOf("2021-12-25"),
java.sql.Timestamp.valueOf("2021-12-25 12:01:01"), true,
java.sql.Date.valueOf("2021-12-25")),
- Seq(2, null, 2, 12, 100002L, getDouble("102.02", floatToDouble).toString,
+ Seq(2, null, 2, 12, 100002L, "102.02",
1002.0002, new java.math.BigDecimal("100002.00020000"),
"a000002", java.sql.Date.valueOf("2021-12-25"),
java.sql.Timestamp.valueOf("2021-12-25 12:02:02"), true,
java.sql.Date.valueOf("2021-12-25")),
- Seq(3, null, 3, 13, 100003L, getDouble("103.03", floatToDouble).toString,
+ Seq(3, null, 3, 13, 100003L, "103.03",
1003.0003, new java.math.BigDecimal("100003.00030000"),
"a000003", java.sql.Date.valueOf("2021-12-25"),
java.sql.Timestamp.valueOf("2021-12-25 12:03:03"), false,
@@ -449,16 +449,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
java.sql.Date.valueOf("2021-12-26")))
}
- private def getDouble(value: String, convertFromFloat: Boolean): Double = {
- // TODO(HUDI-8902): Investigate different read behavior on a field after
promotion
- // from float to double
- if (convertFromFloat) {
- value.toFloat.toDouble
- } else {
- value.toDouble
- }
- }
-
test("Test Chinese table ") {
withRecordType()(withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
diff --git
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark3ParquetSchemaEvolutionUtils.scala
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark3ParquetSchemaEvolutionUtils.scala
index 47678214607..912879a6028 100644
---
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark3ParquetSchemaEvolutionUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark3ParquetSchemaEvolutionUtils.scala
@@ -63,7 +63,7 @@ class Spark3ParquetSchemaEvolutionUtils(sharedConf:
Configuration,
val layout =
TimelineLayout.fromVersion(TimelineLayoutVersion.CURR_LAYOUT_VERSION)
InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime,
tablePath,
new HoodieHadoopStorage(tablePath, sharedConf), if (validCommits ==
null) "" else validCommits,
- layout.getInstantFileNameParser, layout.getCommitMetadataSerDe,
layout.getInstantGenerator)
+ layout)
} else {
null
}
diff --git
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
index 02b2338d26d..db493c4629a 100644
---
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
@@ -168,7 +168,7 @@ class Spark33LegacyHoodieParquetFileFormat(private val
shouldAppendPartitionValu
val storage = new HoodieHadoopStorage(tablePath, sharedConf)
InternalSchemaCache.getInternalSchemaByVersionId(
commitInstantTime, tablePath, storage, if (validCommits == null) ""
else validCommits,
- layout.getInstantFileNameParser, layout.getCommitMetadataSerDe,
layout.getInstantGenerator)
+ layout)
} else {
null
}
diff --git
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
index 09c9962fe3a..4a9067e7486 100644
---
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
@@ -179,7 +179,7 @@ class Spark34LegacyHoodieParquetFileFormat(private val
shouldAppendPartitionValu
val storage = new HoodieHadoopStorage(tablePath, sharedConf)
InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime,
tablePath, storage,
if (validCommits == null) "" else validCommits,
- layout.getInstantFileNameParser, layout.getCommitMetadataSerDe,
layout.getInstantGenerator)
+ layout)
} else {
null
}
diff --git
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
index d482d258b61..f72acc1179b 100644
---
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
@@ -180,7 +180,7 @@ class Spark35LegacyHoodieParquetFileFormat(private val
shouldAppendPartitionValu
val layout =
TimelineLayout.fromVersion(TimelineLayoutVersion.CURR_LAYOUT_VERSION)
InternalSchemaCache.getInternalSchemaByVersionId(
commitInstantTime, tablePath, storage, if (validCommits == null) ""
else validCommits,
- layout.getInstantFileNameParser, layout.getCommitMetadataSerDe,
layout.getInstantGenerator)
+ layout)
} else {
null
}