This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e44ac12356f9 refactor(spark): Rework tests that disable FileGroup
Reader in Spark (#14061)
e44ac12356f9 is described below
commit e44ac12356f9a855d43b942829de0b2628ac083b
Author: Jon Vexler <[email protected]>
AuthorDate: Tue Dec 2 19:14:56 2025 -0500
refactor(spark): Rework tests that disable FileGroup Reader in Spark
(#14061)
Co-authored-by: Jonathan Vexler <=>
Co-authored-by: Y Ethan Guo <[email protected]>
Co-authored-by: Timothy Brown <[email protected]>
---
.../main/scala/org/apache/hudi/DefaultSource.scala | 54 ++-----
.../functional/TestFiltersInFileGroupReader.java | 14 +-
.../functional/TestNewHoodieParquetFileFormat.java | 158 -------------------
.../hudi/TestAvroSchemaResolutionSupport.scala | 173 ++++++++-------------
.../hudi/functional/TestCOWDataSourceStorage.scala | 25 ++-
.../TestMetadataTableWithSparkDataSource.scala | 2 +-
.../functional/TestPartialUpdateAvroPayload.scala | 18 +--
.../TestSparkSqlWithTimestampKeyGenerator.scala | 24 +--
.../TestVectorizedReadWithSchemaEvolution.scala | 3 +-
.../sql/hudi/dml/insert/TestInsertTable4.scala | 37 -----
10 files changed, 108 insertions(+), 400 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 67f8c321b747..1520cd1bd34d 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -294,9 +294,7 @@ object DefaultSource {
Option(schema)
}
- lazy val enableFileGroupReader = SparkConfigUtils
- .getStringWithAltKeys(parameters,
HoodieReaderConfig.FILE_GROUP_READER_ENABLED).toBoolean &&
- !metaClient.isMetadataTable
+ lazy val isNotMetadataTable = !metaClient.isMetadataTable
lazy val tableVersion = if
(SparkConfigUtils.containsConfigProperty(parameters,
INCREMENTAL_READ_TABLE_VERSION)) {
Integer.parseInt(parameters(INCREMENTAL_READ_TABLE_VERSION.key))
} else {
@@ -306,7 +304,7 @@ object DefaultSource {
if
(metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) {
new EmptyRelation(sqlContext, resolveSchema(metaClient, parameters,
Some(schema)))
} else if (isCdcQuery) {
- if (enableFileGroupReader) {
+ if (isNotMetadataTable) {
if (tableType == COPY_ON_WRITE) {
new HoodieCopyOnWriteCDCHadoopFsRelationFactory(
sqlContext, metaClient, parameters, userSchema, isBootstrap =
false).build()
@@ -323,24 +321,23 @@ object DefaultSource {
case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
(COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
(MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
- if (enableFileGroupReader) {
+ if (isNotMetadataTable) {
new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(
sqlContext, metaClient, parameters, userSchema, isBootstrap =
false).build()
} else {
resolveBaseFileOnlyRelation(sqlContext, userSchema, metaClient,
parameters)
}
case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
- (hoodieTableSupportsCompletionTime, enableFileGroupReader) match {
- case (true, true) => new
HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV2(
+ if (hoodieTableSupportsCompletionTime) {
+ new HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV2(
sqlContext, metaClient, parameters, userSchema,
isBootstrappedTable).build()
- case (true, false) => new IncrementalRelationV2(sqlContext,
parameters, userSchema, metaClient, RangeType.OPEN_CLOSED)
- case (false, true) => new
HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV1(
+ } else {
+ new HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV1(
sqlContext, metaClient, parameters, userSchema,
isBootstrappedTable).build()
- case (false, false) => new IncrementalRelationV1(sqlContext,
parameters, userSchema, metaClient)
}
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
- if (enableFileGroupReader) {
+ if (isNotMetadataTable) {
new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
sqlContext, metaClient, parameters, userSchema, isBootstrap =
false).build()
} else {
@@ -348,7 +345,7 @@ object DefaultSource {
}
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) =>
- if (enableFileGroupReader) {
+ if (isNotMetadataTable) {
new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
sqlContext, metaClient, parameters, userSchema, isBootstrap =
true).build()
} else {
@@ -356,22 +353,17 @@ object DefaultSource {
}
case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
- (hoodieTableSupportsCompletionTime, enableFileGroupReader) match {
- case (true, true) => new
HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV2(
+ if (hoodieTableSupportsCompletionTime) {
+ new HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV2(
sqlContext, metaClient, parameters, userSchema,
isBootstrappedTable).build()
- case (true, false) =>
MergeOnReadIncrementalRelationV2(sqlContext, parameters, metaClient, userSchema)
- case (false, true) => new
HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV1(
+ } else {
+ new HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV1(
sqlContext, metaClient, parameters, userSchema,
isBootstrappedTable).build()
- case (false, false) =>
MergeOnReadIncrementalRelationV1(sqlContext, parameters, metaClient, userSchema)
}
case (_, _, true) =>
- if (enableFileGroupReader) {
- new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(
- sqlContext, metaClient, parameters, userSchema, isBootstrap =
true).build()
- } else {
- resolveHoodieBootstrapRelation(sqlContext, userSchema,
metaClient, parameters)
- }
+ new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(
+ sqlContext, metaClient, parameters, userSchema, isBootstrap =
true).build()
case (_, _, _) =>
throw new HoodieException(s"Invalid query type : $queryType for
tableType: $tableType," +
@@ -381,22 +373,6 @@ object DefaultSource {
}
}
- private def resolveHoodieBootstrapRelation(sqlContext: SQLContext,
- userSchema: Option[StructType],
- metaClient: HoodieTableMetaClient,
- parameters: Map[String, String]):
BaseRelation = {
- val enableFileIndex = HoodieSparkConfUtils.getConfigValue(parameters,
sqlContext.sparkSession.sessionState.conf,
- ENABLE_HOODIE_FILE_INDEX.key,
ENABLE_HOODIE_FILE_INDEX.defaultValue.toString).toBoolean
- val isSchemaEvolutionEnabledOnRead =
HoodieSparkConfUtils.getConfigValue(parameters,
- sqlContext.sparkSession.sessionState.conf,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
-
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
- if (!enableFileIndex || isSchemaEvolutionEnabledOnRead ||
!parameters.getOrElse(DATA_QUERIES_ONLY.key,
DATA_QUERIES_ONLY.defaultValue).toBoolean) {
- HoodieBootstrapRelation(sqlContext, userSchema, metaClient, parameters +
(DATA_QUERIES_ONLY.key() -> "false"))
- } else {
- HoodieBootstrapRelation(sqlContext, userSchema, metaClient,
parameters).toHadoopFsRelation
- }
- }
-
private def resolveBaseFileOnlyRelation(sqlContext: SQLContext,
userSchema: Option[StructType],
metaClient: HoodieTableMetaClient,
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestFiltersInFileGroupReader.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestFiltersInFileGroupReader.java
index c1221c7b4b32..85bbad2e3233 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestFiltersInFileGroupReader.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestFiltersInFileGroupReader.java
@@ -77,30 +77,28 @@ class TestFiltersInFileGroupReader extends
TestBootstrapReadBase {
}
protected void runComparison(boolean mergeUseRecordPositions) {
- compareDf(createDf(hudiBasePath, true, mergeUseRecordPositions),
createDf(hudiBasePath, false, false));
- compareDf(createDf(bootstrapTargetPath, true, mergeUseRecordPositions),
createDf(bootstrapTargetPath, false, false));
- compareDf(createDf2(hudiBasePath, true, mergeUseRecordPositions),
createDf2(hudiBasePath, false, false));
- compareDf(createDf2(bootstrapTargetPath, true, mergeUseRecordPositions),
createDf2(bootstrapTargetPath, false, false));
+ compareDf(createDf(hudiBasePath, mergeUseRecordPositions),
createDf(hudiBasePath, false));
+ compareDf(createDf(bootstrapTargetPath, mergeUseRecordPositions),
createDf(bootstrapTargetPath, false));
+ compareDf(createDf2(hudiBasePath, mergeUseRecordPositions),
createDf2(hudiBasePath, false));
+ compareDf(createDf2(bootstrapTargetPath, mergeUseRecordPositions),
createDf2(bootstrapTargetPath, false));
}
- protected Dataset<Row> createDf(String tableBasePath, Boolean
fgReaderEnabled, Boolean mergeUseRecordPositions) {
+ protected Dataset<Row> createDf(String tableBasePath, boolean
mergeUseRecordPositions) {
//The chances of a uuid containing 00 with the 8-4-4-4-12 format is around 90%
//for bootstrap, _hoodie_record_key is in the skeleton while begin_lat is in the data
//We have a record key filter so that tests MORs filter pushdown with position based merging
return sparkSession.read().format("hudi")
- .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
fgReaderEnabled)
.option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(),
mergeUseRecordPositions)
.load(tableBasePath)
.drop("city_to_state")
.where("begin_lat > 0.5 and _hoodie_record_key LIKE '%00%'");
}
- protected Dataset<Row> createDf2(String tableBasePath, Boolean
fgReaderEnabled, Boolean mergeUseRecordPositions) {
+ protected Dataset<Row> createDf2(String tableBasePath, boolean
mergeUseRecordPositions) {
//The chances of a uuid containing 00 with the 8-4-4-4-12 format is around 90%
//for bootstrap, _hoodie_record_key is in the skeleton while begin_lat is in the data
//We have a record key filter so that tests MORs filter pushdown with position based merging
return sparkSession.read().format("hudi")
- .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
fgReaderEnabled)
.option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(),
mergeUseRecordPositions)
.load(tableBasePath)
.drop("city_to_state")
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
deleted file mode 100644
index f1297643c72f..000000000000
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.functional;
-
-import org.apache.hudi.DataSourceReadOptions;
-import org.apache.hudi.common.config.HoodieReaderConfig;
-import org.apache.hudi.common.model.HoodieTableType;
-
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SaveMode;
-import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Stream;
-
-import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
-import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-@Tag("functional-b")
-public class TestNewHoodieParquetFileFormat extends TestBootstrapReadBase {
-
- private static Stream<Arguments> testArgs() {
- Stream.Builder<Arguments> b = Stream.builder();
- b.add(Arguments.of(MERGE_ON_READ, 0));
- b.add(Arguments.of(COPY_ON_WRITE, 1));
- b.add(Arguments.of(MERGE_ON_READ, 2));
- return b.build();
- }
-
- @ParameterizedTest
- @MethodSource("testArgs")
- void testNewParquetFileFormat(HoodieTableType tableType, Integer
nPartitions) {
- this.bootstrapType = nPartitions == 0 ? "metadata" : "mixed";
- this.dashPartitions = true;
- this.tableType = tableType;
- this.nPartitions = nPartitions;
- setupDirs();
-
- //do bootstrap
- Map<String, String> options = setBootstrapOptions();
- Dataset<Row> bootstrapDf = sparkSession.emptyDataFrame();
- bootstrapDf.write().format("hudi")
- .options(options)
- .mode(SaveMode.Overwrite)
- .save(bootstrapTargetPath);
- runComparisons();
-
- options = basicOptions();
- doUpdate(options, "001");
- runComparisons();
-
- doInsert(options, "002");
- runComparisons();
-
- doDelete(options, "003");
- runComparisons();
- }
-
- protected void runComparisons() {
- if (tableType.equals(MERGE_ON_READ)) {
- runComparison(hudiBasePath);
- }
- runComparison(bootstrapTargetPath);
- }
-
- protected void runComparison(String tableBasePath) {
- testCount(tableBasePath);
- runIndividualComparison(tableBasePath);
- runIndividualComparison(tableBasePath, "partition_path");
- runIndividualComparison(tableBasePath, "_hoodie_record_key",
"_hoodie_commit_time", "_hoodie_partition_path");
- runIndividualComparison(tableBasePath, "_hoodie_commit_time",
"_hoodie_commit_seqno");
- runIndividualComparison(tableBasePath, "_hoodie_commit_time",
"_hoodie_commit_seqno", "partition_path");
- runIndividualComparison(tableBasePath, "_row_key", "_hoodie_commit_seqno",
"_hoodie_record_key", "_hoodie_partition_path");
- runIndividualComparison(tableBasePath, "_row_key", "partition_path",
"_hoodie_is_deleted", "begin_lon");
- }
-
- protected void testCount(String tableBasePath) {
- Dataset<Row> legacyDf = sparkSession.read().format("hudi")
- .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
- .load(tableBasePath);
- Dataset<Row> fileFormatDf = sparkSession.read().format("hudi")
- .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true")
- .load(tableBasePath);
- assertEquals(legacyDf.count(), fileFormatDf.count());
- }
-
- protected scala.collection.Seq<String> seq(String... a) {
- return
scala.collection.JavaConverters.asScalaBufferConverter(Arrays.asList(a)).asScala().toSeq();
- }
-
- protected void runIndividualComparison(String tableBasePath) {
- runIndividualComparison(tableBasePath, "");
- }
-
- protected void runIndividualComparison(String tableBasePath, String
firstColumn, String... columns) {
- List<String> queryTypes = new ArrayList<>();
- queryTypes.add(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL());
- if (tableType.equals(MERGE_ON_READ)) {
-
queryTypes.add(DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL());
- }
- for (String queryType : queryTypes) {
- Dataset<Row> legacyDf = sparkSession.read().format("hudi")
- .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
- .option(DataSourceReadOptions.QUERY_TYPE().key(), queryType)
- .load(tableBasePath);
- Dataset<Row> fileFormatDf = sparkSession.read().format("hudi")
- .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true")
- .option(DataSourceReadOptions.QUERY_TYPE().key(), queryType)
- .load(tableBasePath);
- if (firstColumn.isEmpty()) {
- //df.except(df) does not work with map type cols
- legacyDf = legacyDf.drop("city_to_state");
- fileFormatDf = fileFormatDf.drop("city_to_state");
-
- //TODO: [HUDI-3204] for toHadoopFs in BaseFileOnlyRelation, the partition columns will be at the end
- //so just drop column that is out of order here for now
- if
(queryType.equals(DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL())
- && tableType.equals(MERGE_ON_READ) && nPartitions > 0) {
- legacyDf = legacyDf.drop("partition_path");
- fileFormatDf = fileFormatDf.drop("partition_path");
- }
- } else {
- if (columns.length > 0) {
- legacyDf = legacyDf.select(firstColumn, columns);
- fileFormatDf = fileFormatDf.select(firstColumn, columns);
- } else {
- legacyDf = legacyDf.select(firstColumn);
- fileFormatDf = fileFormatDf.select(firstColumn);
- }
- }
- compareDf(legacyDf, fileFormatDf);
- }
- }
-}
\ No newline at end of file
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
index 60acf4aaa03c..56eccc0a1376 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
@@ -18,8 +18,7 @@
*/
package org.apache.hudi
-import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieReaderConfig}
-import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.SchemaCompatibilityException
@@ -30,7 +29,7 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._
import org.junit.jupiter.api.{AfterEach, BeforeEach}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, CsvSource, MethodSource}
+import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
import scala.language.postfixOps
@@ -74,28 +73,18 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
case 4 => df.withColumn(colToCast, df.col(colToCast).cast("string"))
}
- def initialiseTable(df: DataFrame, saveDir: String, isCow: Boolean = true):
Unit = {
- val opts = if (isCow) {
- commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key ->
"COPY_ON_WRITE")
- } else {
- commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key ->
"MERGE_ON_READ")
- }
-
+ def initialiseTable(df: DataFrame, saveDir: String, tableType: String =
"COPY_ON_WRITE"): Unit = {
+ val opts = commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key ->
tableType)
df.write.format("hudi")
.options(opts)
.mode("overwrite")
.save(saveDir)
}
- def upsertData(df: DataFrame, saveDir: String, isCow: Boolean = true,
shouldAllowDroppedColumns: Boolean = false,
+ def upsertData(df: DataFrame, saveDir: String, tableType: String =
"COPY_ON_WRITE", shouldAllowDroppedColumns: Boolean = false,
enableSchemaValidation: Boolean =
HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue().toBoolean): Unit =
{
- var opts = if (isCow) {
- commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key ->
"COPY_ON_WRITE")
- } else {
- commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key ->
"MERGE_ON_READ")
- }
- opts = opts ++
Map(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key ->
shouldAllowDroppedColumns.toString)
-
+ val opts = commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key ->
tableType,
+ HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key ->
shouldAllowDroppedColumns.toString)
df.write.format("hudi")
.options(opts)
.mode("append")
@@ -103,8 +92,8 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
}
@ParameterizedTest
- @MethodSource(Array("testArgs"))
- def testDataTypePromotions(isCow: Boolean, useFileGroupReader: Boolean):
Unit = {
+ @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+ def testDataTypePromotions(tableType: String): Unit = {
// test to read tables with columns that are promoted via avro schema resolution
val tempRecordPath = basePath + "/record_tbl/"
val _spark = spark
@@ -134,7 +123,7 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
initDF.show(false)
// recreate table
- initialiseTable(initDF, tempRecordPath, isCow)
+ initialiseTable(initDF, tempRecordPath, tableType)
// perform avro supported casting
var upsertDf = prepDataFrame(df2, colInitType)
@@ -143,11 +132,10 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
upsertDf.show(false)
// upsert
- upsertData(upsertDf, tempRecordPath, isCow)
+ upsertData(upsertDf, tempRecordPath, tableType)
// read out the table
val readDf = spark.read.format("hudi")
- .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
useFileGroupReader)
.load(tempRecordPath)
readDf.printSchema()
readDf.show(false)
@@ -177,8 +165,8 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
}
@ParameterizedTest
- @MethodSource(Array("testArgs"))
- def testAddNewColumn(isCow: Boolean, useFileGroupReader: Boolean): Unit = {
+ @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+ def testAddNewColumn(tableType: String): Unit = {
// test to add a column
val tempRecordPath = basePath + "/record_tbl/"
val _spark = spark
@@ -193,7 +181,7 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
initDF.show(false)
// recreate table
- initialiseTable(initDF, tempRecordPath, isCow)
+ initialiseTable(initDF, tempRecordPath, tableType)
// perform avro supported operation of adding a new column at the end of the table
val upsertDf = df2
@@ -201,11 +189,10 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
upsertDf.show(false)
// upsert
- upsertData(upsertDf, tempRecordPath, isCow)
+ upsertData(upsertDf, tempRecordPath, tableType)
// read out the table
val readDf = spark.read.format("hudi")
- .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
useFileGroupReader)
.load(tempRecordPath)
readDf.printSchema()
readDf.show(false)
@@ -224,7 +211,6 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
val tempRecordPath = basePath + "/record_tbl/"
val _spark = spark
import _spark.implicits._
- val isCow = tableType.equals(HoodieTableType.COPY_ON_WRITE.name())
val df1 = Seq((1, 100, "aaa")).toDF("id", "userid", "name")
val df2 = Seq((2, "bbb")).toDF("id", "name")
@@ -235,7 +221,7 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
initDF.show(false)
// recreate table
- initialiseTable(initDF, tempRecordPath, isCow)
+ initialiseTable(initDF, tempRecordPath, tableType)
// perform avro supported operation of deleting a column
val upsertDf = df2
@@ -244,16 +230,15 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
// upsert
assertThrows(classOf[SchemaCompatibilityException]) {
- upsertData(upsertDf, tempRecordPath, isCow, enableSchemaValidation =
schemaValidationEnabled)
+ upsertData(upsertDf, tempRecordPath, tableType, enableSchemaValidation =
schemaValidationEnabled)
}
- upsertData(upsertDf, tempRecordPath, isCow, shouldAllowDroppedColumns =
true, enableSchemaValidation = schemaValidationEnabled)
+ upsertData(upsertDf, tempRecordPath, tableType, shouldAllowDroppedColumns
= true, enableSchemaValidation = schemaValidationEnabled)
// read out the table
//schemaValidationEnabled is a writer config, so we will also test the fg reader with the
//same param since that will only affect the reader
val readDf = spark.read.format("hudi")
- .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
schemaValidationEnabled)
.load(tempRecordPath)
readDf.printSchema()
readDf.show(false)
@@ -261,8 +246,8 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
}
@ParameterizedTest
- @MethodSource(Array("testArgs"))
- def testColumnPositionChange(isCow: Boolean, useFileGroupReader: Boolean):
Unit = {
+ @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+ def testColumnPositionChange(tableType: String): Unit = {
// test to change column positions
val tempRecordPath = basePath + "/record_tbl/"
val _spark = spark
@@ -277,7 +262,7 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
initDF.show(false)
// recreate table
- initialiseTable(initDF, tempRecordPath, isCow)
+ initialiseTable(initDF, tempRecordPath, tableType)
// perform avro supported operation of deleting a column
val upsertDf = df2
@@ -285,11 +270,10 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
upsertDf.show(false)
// upsert
- upsertData(upsertDf, tempRecordPath, isCow)
+ upsertData(upsertDf, tempRecordPath, tableType)
// read out the table
val readDf = spark.read.format("hudi")
- .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
useFileGroupReader)
.load(tempRecordPath)
readDf.printSchema()
readDf.show(false)
@@ -297,8 +281,8 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
}
@ParameterizedTest
- @MethodSource(Array("testArgs"))
- def testArrayOfStructsAddNewColumn(isCow: Boolean, useFileGroupReader:
Boolean): Unit = {
+ @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+ def testArrayOfStructsAddNewColumn(tableType: String): Unit = {
// test to add a field to a STRUCT in a column of ARRAY< STRUCT<..> > type
val tempRecordPath = basePath + "/record_tbl/"
val arrayStructData = Seq(
@@ -317,7 +301,7 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
df1.show(false)
// recreate table
- initialiseTable(df1, tempRecordPath, isCow)
+ initialiseTable(df1, tempRecordPath, tableType)
// add a column to array of struct
val newArrayStructData = Seq(
@@ -337,11 +321,10 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
df2.printSchema()
df2.show(false)
// upsert
- upsertData(df2, tempRecordPath, isCow)
+ upsertData(df2, tempRecordPath, tableType)
// read out the table
val readDf = spark.read.format("hudi")
- .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
useFileGroupReader)
.load(tempRecordPath)
readDf.printSchema()
readDf.show(false)
@@ -349,8 +332,8 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
}
@ParameterizedTest
- @MethodSource(Array("testArgs"))
- def testArrayOfStructsChangeColumnType(isCow: Boolean, useFileGroupReader:
Boolean): Unit = {
+ @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+ def testArrayOfStructsChangeColumnType(tableType: String): Unit = {
// test to change the type of a field from a STRUCT in a column of ARRAY< STRUCT<..> > type
val tempRecordPath = basePath + "/record_tbl/"
val arrayStructData = Seq(
@@ -369,7 +352,7 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
df1.show(false)
// recreate table
- initialiseTable(df1, tempRecordPath, isCow)
+ initialiseTable(df1, tempRecordPath, tableType)
// add a column to array of struct
val newArrayStructData = Seq(
@@ -387,12 +370,11 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
df2.printSchema()
df2.show(false)
// upsert
- upsertData(df2, tempRecordPath, isCow)
+ upsertData(df2, tempRecordPath, tableType)
withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" ->
"false") {
// read out the table
val readDf = spark.read.format("hudi")
- .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
useFileGroupReader)
.load(tempRecordPath)
readDf.printSchema()
readDf.show(false)
@@ -401,8 +383,8 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
}
@ParameterizedTest
- @MethodSource(Array("testArgs"))
- def testArrayOfStructsChangeColumnPosition(isCow: Boolean,
useFileGroupReader: Boolean): Unit = {
+ @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+ def testArrayOfStructsChangeColumnPosition(tableType: String): Unit = {
// test to change the position of a field from a STRUCT in a column of ARRAY< STRUCT<..> > type
val tempRecordPath = basePath + "/record_tbl/"
val arrayStructData = Seq(
@@ -421,7 +403,7 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
df1.show(false)
// recreate table
- initialiseTable(df1, tempRecordPath, isCow)
+ initialiseTable(df1, tempRecordPath, tableType)
// add a column to array of struct
val newArrayStructData = Seq(
@@ -439,11 +421,10 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
df2.printSchema()
df2.show(false)
// upsert
- upsertData(df2, tempRecordPath, isCow)
+ upsertData(df2, tempRecordPath, tableType)
// read out the table
val readDf = spark.read.format("hudi")
- .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
useFileGroupReader)
.load(tempRecordPath)
readDf.printSchema()
readDf.show(false)
@@ -451,8 +432,8 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
}
@ParameterizedTest
- @MethodSource(Array("testArgs"))
- def testArrayOfMapsChangeValueType(isCow: Boolean, useFileGroupReader:
Boolean): Unit = {
+ @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+ def testArrayOfMapsChangeValueType(tableType: String): Unit = {
// test to change the value type of a MAP in a column of ARRAY< MAP<k,v> > type
val tempRecordPath = basePath + "/record_tbl/"
val arrayMapData = Seq(
@@ -469,7 +450,7 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
df1.show(false)
// recreate table
- initialiseTable(df1, tempRecordPath, isCow)
+ initialiseTable(df1, tempRecordPath, tableType)
// change value type from integer to long
val newArrayMapData = Seq(
@@ -485,12 +466,11 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
df2.printSchema()
df2.show(false)
// upsert
- upsertData(df2, tempRecordPath, isCow)
+ upsertData(df2, tempRecordPath, tableType)
withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" ->
"false") {
// read out the table
val readDf = spark.read.format("hudi")
- .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
useFileGroupReader)
.load(tempRecordPath)
readDf.printSchema()
readDf.show(false)
@@ -499,8 +479,8 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
}
@ParameterizedTest
- @MethodSource(Array("testArgs"))
- def testArrayOfMapsStructChangeFieldType(isCow: Boolean, useFileGroupReader:
Boolean): Unit = {
+ @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+ def testArrayOfMapsStructChangeFieldType(tableType: String): Unit = {
// test to change a field type of a STRUCT in a column of ARRAY< MAP< k,STRUCT<..> > > type
val tempRecordPath = basePath + "/record_tbl/"
val arrayMapData = Seq(
@@ -526,7 +506,7 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
df1.show(false)
// recreate table
- initialiseTable(df1, tempRecordPath, isCow)
+ initialiseTable(df1, tempRecordPath, tableType)
// change inner struct's type from integer to long
val newArrayMapData = Seq(
@@ -551,12 +531,11 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
df2.printSchema()
df2.show(false)
// upsert
- upsertData(df2, tempRecordPath, isCow)
+ upsertData(df2, tempRecordPath, tableType)
withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" ->
"false") {
// read out the table
val readDf = spark.read.format("hudi")
- .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
useFileGroupReader)
.load(tempRecordPath)
readDf.printSchema()
readDf.show(false)
@@ -565,8 +544,8 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
}
@ParameterizedTest
- @MethodSource(Array("testArgs"))
- def testArrayOfMapsStructAddField(isCow: Boolean, useFileGroupReader:
Boolean): Unit = {
+ @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+ def testArrayOfMapsStructAddField(tableType: String): Unit = {
// test to add a field to a STRUCT in a column of ARRAY< MAP< k,STRUCT<..> > > type
val tempRecordPath = basePath + "/record_tbl/"
val arrayMapData = Seq(
@@ -592,7 +571,7 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
df1.show(false)
// recreate table
- initialiseTable(df1, tempRecordPath, isCow)
+ initialiseTable(df1, tempRecordPath, tableType)
// add a new column
val newArrayMapData = Seq(
@@ -618,11 +597,10 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
df2.printSchema()
df2.show(false)
// upsert
- upsertData(df2, tempRecordPath, isCow)
+ upsertData(df2, tempRecordPath, tableType)
// read out the table
val readDf = spark.read.format("hudi")
- .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
useFileGroupReader)
.load(tempRecordPath)
readDf.printSchema()
readDf.show(false)
@@ -630,8 +608,8 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
}
@ParameterizedTest
- @MethodSource(Array("testArgs"))
- def testArrayOfMapsStructChangeFieldPosition(isCow: Boolean,
useFileGroupReader: Boolean): Unit = {
+ @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+ def testArrayOfMapsStructChangeFieldPosition(tableType: String): Unit = {
// test to change the position of fields of a STRUCT in a column of ARRAY< MAP< k,STRUCT<..> > > type
val tempRecordPath = basePath + "/record_tbl/"
val arrayMapData = Seq(
@@ -657,7 +635,7 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
df1.show(false)
// recreate table
- initialiseTable(df1, tempRecordPath, isCow)
+ initialiseTable(df1, tempRecordPath, tableType)
// change column position
val newArrayMapData = Seq(
@@ -682,11 +660,10 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
df2.printSchema()
df2.show(false)
// upsert
- upsertData(df2, tempRecordPath, isCow)
+ upsertData(df2, tempRecordPath, tableType)
// read out the table
val readDf = spark.read.format("hudi")
- .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
useFileGroupReader)
.load(tempRecordPath)
readDf.printSchema()
readDf.show(false)
@@ -694,8 +671,8 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
}
@ParameterizedTest
- @MethodSource(Array("testArgs"))
- def testArrayOfMapsStructDeleteField(isCow: Boolean, useFileGroupReader:
Boolean): Unit = {
+ @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+ def testArrayOfMapsStructDeleteField(tableType: String): Unit = {
// test to delete a field of a STRUCT in a column of ARRAY< MAP< k,STRUCT<..> > > type
val tempRecordPath = basePath + "/record_tbl/"
@@ -722,7 +699,7 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
df1.show(false)
// recreate table
- initialiseTable(df1, tempRecordPath, isCow)
+ initialiseTable(df1, tempRecordPath, tableType)
// change column position
val newArrayMapData = Seq(
@@ -746,11 +723,10 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
df2.printSchema()
df2.show(false)
// upsert
- upsertData(df2, tempRecordPath, isCow, true)
+ upsertData(df2, tempRecordPath, tableType, true)
// read out the table
val readDf = spark.read.format("hudi")
- .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
useFileGroupReader)
.load(tempRecordPath)
readDf.printSchema()
readDf.show(false)
@@ -758,8 +734,8 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
}
@ParameterizedTest
- @MethodSource(Array("testArgs"))
- def testComplexOperationsOnTable(isCow: Boolean, useFileGroupReader:
Boolean): Unit = {
+ @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+ def testComplexOperationsOnTable(tableType: String): Unit = {
// test a series of changes on a Hudi table
var defaultPartitionIdx = 0
@@ -777,13 +753,13 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
val df1 = Seq((1, 100, newPartition)).toDF("id", "userid", "name")
df1.printSchema()
df1.show(false)
- initialiseTable(df1, tempRecordPath, isCow)
+ initialiseTable(df1, tempRecordPath, tableType)
// 2. Promote INT type to LONG into a different partition
val df2 = Seq((2, 200L, newPartition)).toDF("id", "userid", "name")
df2.printSchema()
df2.show(false)
- upsertData(df2, tempRecordPath, isCow)
+ upsertData(df2, tempRecordPath, tableType)
// 3. Promote LONG to FLOAT
var df3 = Seq((3, 300, newPartition)).toDF("id", "userid", "name")
@@ -825,7 +801,6 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
// read out the table
val readDf = spark.read.format("hudi")
- .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
useFileGroupReader)
.load(tempRecordPath)
readDf.printSchema()
readDf.show(false)
@@ -833,8 +808,8 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
}
@ParameterizedTest
- @MethodSource(Array("testArgs"))
- def testNestedTypeVectorizedReadWithTypeChange(isCow: Boolean,
useFileGroupReader: Boolean): Unit = {
+ @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+ def testNestedTypeVectorizedReadWithTypeChange(tableType: String): Unit = {
// test to change the value type of a MAP in a column of ARRAY< MAP<k,v> > type
val tempRecordPath = basePath + "/record_tbl/"
val arrayMapData = Seq(
@@ -851,10 +826,10 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
df1.show(false)
// recreate table
- initialiseTable(df1, tempRecordPath, isCow)
+ initialiseTable(df1, tempRecordPath, tableType)
// read out the table, will not throw any exception
- readTable(tempRecordPath, useFileGroupReader)
+ readTable(tempRecordPath)
// change value type from integer to long
val newArrayMapData = Seq(
@@ -870,31 +845,30 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
df2.printSchema()
df2.show(false)
// upsert
- upsertData(df2, tempRecordPath, isCow)
+ upsertData(df2, tempRecordPath, tableType)
// after implicit type change, read the table with vectorized read enabled
- if (HoodieSparkUtils.gteqSpark3_4 || !useFileGroupReader) {
+ if (HoodieSparkUtils.gteqSpark3_4) {
assertThrows(classOf[SparkException]) {
withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" ->
"true") {
- readTable(tempRecordPath, useFileGroupReader)
+ readTable(tempRecordPath)
}
}
} else {
withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" ->
"true") {
- readTable(tempRecordPath, useFileGroupReader)
+ readTable(tempRecordPath)
}
}
withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" ->
"false") {
- readTable(tempRecordPath, useFileGroupReader)
+ readTable(tempRecordPath)
}
}
- private def readTable(path: String, useFileGroupReader: Boolean): Unit = {
+ private def readTable(path: String): Unit = {
// read out the table
val readDf = spark.read.format("hudi")
- .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
useFileGroupReader)
.load(path)
readDf.printSchema()
readDf.show(false)
@@ -918,14 +892,3 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
}
}
-
-object TestAvroSchemaResolutionSupport {
- def testArgs: java.util.stream.Stream[Arguments] = {
- val scenarios = Array(
- Seq(true, true),
- Seq(false, true),
- Seq(false, false)
- )
- java.util.Arrays.stream(scenarios.map(as =>
Arguments.arguments(as.map(_.asInstanceOf[AnyRef]):_*)))
- }
-}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
index 8986a8ff55f6..59edb399259a 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
@@ -21,7 +21,7 @@ package org.apache.hudi.functional
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions,
HoodieDataSourceHelpers}
import org.apache.hudi.client.validator.{SqlQueryEqualityPreCommitValidator,
SqlQueryInequalityPreCommitValidator}
-import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieReaderConfig}
+import org.apache.hudi.common.config.HoodieMetadataConfig
import
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT,
TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TYPE_FIELD}
import org.apache.hudi.common.model.WriteOperationType
import org.apache.hudi.common.table.HoodieTableConfig
@@ -72,28 +72,21 @@ class TestCOWDataSourceStorage extends
SparkClientFunctionalTestHarness {
@ParameterizedTest
@CsvSource(value = Array(
- "true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|true",
-
"true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|true",
- "true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|true",
- "false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|true",
-
"false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|true",
- "false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|true",
- "true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|false",
-
"true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|false",
- "true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|false",
- "false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|false",
-
"false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|false",
- "false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|false"
+ "true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key",
+ "true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency",
+ "true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key",
+ "false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key",
+ "false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency",
+ "false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key"
), delimiter = '|')
- def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String,
recordKeys: String, fileGroupReaderEnabled: Boolean): Unit = {
+ def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String,
recordKeys: String): Unit = {
var options: Map[String, String] = commonOpts ++ Map(
HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled),
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> keyGenClass,
DataSourceWriteOptions.RECORDKEY_FIELD.key -> recordKeys,
HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> "true")
- val readOptions = Map(HoodieMetadataConfig.ENABLE.key() ->
String.valueOf(isMetadataEnabled),
- HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() ->
String.valueOf(fileGroupReaderEnabled))
+ val readOptions = Map(HoodieMetadataConfig.ENABLE.key() ->
String.valueOf(isMetadataEnabled))
val isTimestampBasedKeyGen: Boolean =
classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)
if (isTimestampBasedKeyGen) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
index 207cb9018eb2..feccc6b4377b 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
@@ -41,7 +41,7 @@ import
org.apache.hudi.util.JavaScalaConverters.convertJavaListToScalaSeq
import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, explode}
-import org.junit.jupiter.api.{Tag, Test}
+import org.junit.jupiter.api.{Disabled, Tag, Test}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
index 2f59db9a5107..9ed6ed282ef7 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
@@ -21,7 +21,7 @@ package org.apache.hudi.functional
import org.apache.hudi.{DataSourceWriteOptions, QuickstartUtils}
import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.QuickstartUtils.{convertToStringList,
getQuickstartWriteConfigs}
-import org.apache.hudi.common.config.{HoodieReaderConfig, RecordMergeMode}
+import org.apache.hudi.common.config.RecordMergeMode
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.Option
@@ -37,7 +37,7 @@ import org.apache.spark.sql.types.{DoubleType, StringType}
import org.junit.jupiter.api.{AfterEach, BeforeEach}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.CsvSource
+import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
import java.util.function.Consumer
@@ -69,14 +69,8 @@ class TestPartialUpdateAvroPayload extends
HoodieClientTestBase {
}
@ParameterizedTest
- @CsvSource(Array(
- "COPY_ON_WRITE,false",
- "MERGE_ON_READ,false",
- "COPY_ON_WRITE,true",
- "MERGE_ON_READ,true"
- ))
- def testPartialUpdatesAvroPayloadPrecombine(tableType: String,
useFileGroupReader: Boolean): Unit = {
- val hoodieTableType = HoodieTableType.valueOf(tableType)
+ @EnumSource(value = classOf[HoodieTableType])
+ def testPartialUpdatesAvroPayloadPrecombine(hoodieTableType:
HoodieTableType): Unit = {
val dataGenerator = new QuickstartUtils.DataGenerator()
val records = convertToStringList(dataGenerator.generateInserts(1))
val recordsRDD = spark.sparkContext.parallelize(records.asScala.toSeq, 2)
@@ -125,9 +119,7 @@ class TestPartialUpdateAvroPayload extends
HoodieClientTestBase {
.mode(SaveMode.Append)
.save(basePath)
- val finalDF = spark.read.format("hudi")
- .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
String.valueOf(useFileGroupReader))
- .load(basePath)
+ val finalDF = spark.read.format("hudi").load(basePath)
assertEquals(finalDF.select("rider").collectAsList().get(0).getString(0),
upsert1DF.select("rider").collectAsList().get(0).getString(0))
assertEquals(finalDF.select("driver").collectAsList().get(0).getString(0),
upsert2DF.select("driver").collectAsList().get(0).getString(0))
assertEquals(finalDF.select("fare").collectAsList().get(0).getDouble(0),
upsert3DF.select("fare").collectAsList().get(0).getDouble(0))
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
index e54df91b0712..2ba74abbf8b4 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
@@ -33,20 +33,13 @@ class TestSparkSqlWithTimestampKeyGenerator extends
HoodieSparkSqlTestBase {
test("Test Spark SQL with timestamp key generator") {
withTempDir { tmp =>
- Seq(
- Seq("COPY_ON_WRITE", "true"),
- Seq("COPY_ON_WRITE", "false"),
- Seq("MERGE_ON_READ", "true"),
- Seq("MERGE_ON_READ", "false")
- ).foreach { testParams =>
- val tableType = testParams(0)
+ Seq("COPY_ON_WRITE", "MERGE_ON_READ").foreach { tableType =>
// enables use of engine agnostic file group reader
- val shouldUseFileGroupReader = testParams(1)
timestampKeyGeneratorSettings.foreach { keyGeneratorSettings =>
withTable(generateTableName) { tableName =>
// Warning level is used due to CI run with warn-log profile for quick failed cases identification
- LOG.warn(s"Table '$tableName' with parameters: $testParams.
Timestamp key generator settings: $keyGeneratorSettings")
+ LOG.warn(s"Table '$tableName'. Timestamp key generator settings:
$keyGeneratorSettings")
val tablePath = tmp.getCanonicalPath + "/" + tableName
val tsType = if (keyGeneratorSettings.contains("DATE_STRING"))
"string" else "long"
spark.sql(
@@ -65,7 +58,6 @@ class TestSparkSqlWithTimestampKeyGenerator extends
HoodieSparkSqlTestBase {
| orderingFields = 'precomb',
| hoodie.datasource.write.partitionpath.field = 'ts',
| hoodie.datasource.write.hive_style_partitioning = 'false',
- | hoodie.file.group.reader.enabled =
'$shouldUseFileGroupReader',
| hoodie.table.keygenerator.class =
'org.apache.hudi.keygen.TimestampBasedKeyGenerator',
| $keyGeneratorSettings
| )
@@ -80,7 +72,6 @@ class TestSparkSqlWithTimestampKeyGenerator extends
HoodieSparkSqlTestBase {
(dataBatchesWithLongOfSeconds, queryResultWithLongOfSeconds)
withSQLConf(SPARK_SQL_INSERT_INTO_OPERATION.key -> "upsert",
- "hoodie.file.group.reader.enabled" ->
s"$shouldUseFileGroupReader",
"hoodie.datasource.query.type" -> "snapshot") {
// two partitions, one contains parquet file only, the second one contains parquet and log files for MOR, and two parquets for COW
spark.sql(s"INSERT INTO $tableName VALUES ${dataBatches(0)}")
@@ -88,15 +79,7 @@ class TestSparkSqlWithTimestampKeyGenerator extends
HoodieSparkSqlTestBase {
val queryResult = spark.sql(s"SELECT id, name, precomb, ts FROM
$tableName ORDER BY id").collect().mkString("; ")
LOG.warn(s"Query result: $queryResult")
- // TODO: use `shouldExtractPartitionValuesFromPartitionPath`
uniformly, and get `expectedQueryResult` for all cases instead of
`expectedQueryResultWithLossyString` for some cases
- // After it we could properly process filters like "WHERE ts
BETWEEN 1078016000 and 1718953003" and add tests with partition pruning.
- // COW: Fix for [HUDI-3896] overwrites
`shouldExtractPartitionValuesFromPartitionPath` in `BaseFileOnlyRelation`,
therefore for COW we extracting from partition paths and get nulls
- // shouldUseFileGroupReader: [HUDI-7925] Currently there is no
logic for `shouldExtractPartitionValuesFromPartitionPath` in
`HoodieBaseHadoopFsRelationFactory`
- // UPDATE: with [HUDI-5807] we now have fg reader support.
However partition pruning is still not fixed.
- if (tableType == "COPY_ON_WRITE" &&
!shouldUseFileGroupReader.toBoolean)
- assertResult(expectedQueryResultWithLossyString)(queryResult)
- else
- assertResult(expectedQueryResult)(queryResult)
+ assertResult(expectedQueryResult)(queryResult)
}
}
}
@@ -202,5 +185,4 @@ object TestSparkSqlWithTimestampKeyGenerator {
val queryResultWithLongOfSeconds: String = "[1,a1,1,1078016523];
[2,a3,1,1718952603]"
val queryResultWithLongOfMilliseconds: String = "[1,a1,1,1078016523000];
[2,a3,1,1718952603000]"
val queryResultWithString: String = "[1,a1,1,2004-02-29 01:02:03];
[2,a3,1,2024-06-21 06:50:03]"
- val expectedQueryResultWithLossyString: String = "[1,a1,1,2004-02-29 01];
[2,a3,1,2024-06-21 06]"
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestVectorizedReadWithSchemaEvolution.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestVectorizedReadWithSchemaEvolution.scala
index 244b579fc587..5738621bea18 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestVectorizedReadWithSchemaEvolution.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestVectorizedReadWithSchemaEvolution.scala
@@ -27,8 +27,7 @@ class TestVectorizedReadWithSchemaEvolution extends
HoodieSparkSqlTestBase {
"hoodie.schema.on.read.enable" -> "true",
"spark.sql.parquet.enableVectorizedReader" -> "true",
"spark.sql.codegen.maxFields" -> "1",
- "hoodie.parquet.small.file.limit" -> "0",
- "hoodie.file.group.reader.enabled" -> "false"
+ "hoodie.parquet.small.file.limit" -> "0"
) {
withTempDir { tmp =>
val tableName = generateTableName
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable4.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable4.scala
index 1e8e5151a7f1..8d786fde8c8c 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable4.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable4.scala
@@ -439,43 +439,6 @@ class TestInsertTable4 extends HoodieSparkSqlTestBase {
}
}
- test("Test vectorized read nested columns for
LegacyHoodieParquetFileFormat") {
- withSQLConf(
- "hoodie.datasource.read.use.new.parquet.file.format" -> "false",
- "hoodie.file.group.reader.enabled" -> "false",
- "spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true",
- "spark.sql.parquet.enableVectorizedReader" -> "true") {
- withTempDir { tmp =>
- val tableName = generateTableName
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | attributes map<string, string>,
- | price double,
- | ts long,
- | dt string
- |) using hudi
- | tblproperties (primaryKey = 'id')
- | partitioned by (dt)
- | location '${tmp.getCanonicalPath}'
- """.stripMargin)
- spark.sql(
- s"""
- | insert into $tableName values
- | (1, 'a1', map('color', 'red', 'size', 'M'), 10, 1000,
'2021-01-05'),
- | (2, 'a2', map('color', 'blue', 'size', 'L'), 20, 2000,
'2021-01-06'),
- | (3, 'a3', map('color', 'green', 'size', 'S'), 30, 3000,
'2021-01-07')
- """.stripMargin)
- // Check the inserted records with map type attributes
- checkAnswer(s"select id, name, price, ts, dt from $tableName where
attributes.color = 'red'")(
- Seq(1, "a1", 10.0, 1000, "2021-01-05")
- )
- }
- }
- }
-
def ingestAndValidateDataNoPrecombine(tableType: String, tableName: String,
tmp: File,
expectedOperationtype:
WriteOperationType,
setOptions: (String, String)*): Unit =
{