This is an automated email from the ASF dual-hosted git repository. xushiyan pushed a commit to branch release-0.11.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 40c80f6627956e4cdb7a931c40b7363e81272c58 Author: Alexey Kudinkin <[email protected]> AuthorDate: Tue Apr 19 10:40:20 2022 -0700 [HUDI-3902] Fallback to `HadoopFsRelation` in cases non-involving Schema Evolution (#5352) Co-authored-by: Raymond Xu <[email protected]> --- .../org/apache/hudi/BaseFileOnlyRelation.scala | 50 +++++++++++++++++++++- .../main/scala/org/apache/hudi/DefaultSource.scala | 36 ++++++++++++++-- .../scala/org/apache/hudi/HoodieBaseRelation.scala | 15 ++++--- .../scala/org/apache/hudi/SparkDatasetMixin.scala | 43 +++++++++++++++++++ .../apache/hudi/functional/TestCOWDataSource.scala | 19 ++++---- .../hudi/functional/TestCOWDataSourceStorage.scala | 3 +- .../apache/hudi/functional/TestMORDataSource.scala | 20 +++------ .../functional/TestParquetColumnProjection.scala | 9 ++-- 8 files changed, 156 insertions(+), 39 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala index f46b31b036..adf94fffde 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala @@ -20,13 +20,15 @@ package org.apache.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path - import org.apache.hudi.HoodieBaseRelation.createBaseFileReader +import org.apache.hudi.common.model.HoodieFileFormat import org.apache.hudi.common.table.HoodieTableMetaClient - +import org.apache.hudi.hadoop.HoodieROTablePathFilter import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.hive.orc.OrcFileFormat import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.StructType @@ -104,4 +106,48 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, sparkAdapter.getFilePartitions(sparkSession, fileSplits, maxSplitBytes) .map(HoodieBaseFileSplit.apply) } + + /** + * NOTE: We have to fallback to [[HadoopFsRelation]] to make sure that all of the Spark optimizations could be + * equally applied to Hudi tables, since some of those are predicated on the usage of [[HadoopFsRelation]], + * and won't be applicable in case of us using our own custom relations (one of such optimizations is [[SchemaPruning]] + * rule; you can find more details in HUDI-3896) + */ + def toHadoopFsRelation: HadoopFsRelation = { + val (tableFileFormat, formatClassName) = metaClient.getTableConfig.getBaseFileFormat match { + case HoodieFileFormat.PARQUET => (new ParquetFileFormat, "parquet") + case HoodieFileFormat.ORC => (new OrcFileFormat, "orc") + } + + if (globPaths.isEmpty) { + HadoopFsRelation( + location = fileIndex, + partitionSchema = fileIndex.partitionSchema, + dataSchema = fileIndex.dataSchema, + bucketSpec = None, + fileFormat = tableFileFormat, + optParams)(sparkSession) + } else { + val readPathsStr = optParams.get(DataSourceReadOptions.READ_PATHS.key) + val extraReadPaths = readPathsStr.map(p => p.split(",").toSeq).getOrElse(Seq()) + + DataSource.apply( + sparkSession = sparkSession, + paths = extraReadPaths, + userSpecifiedSchema = userSchema, + className = formatClassName, + // Since we're reading the table as just collection of files we have to make sure + // we only read the latest version of every Hudi's file-group, which might be compacted, clustered, etc. + // while keeping previous versions of the files around as well. + // + // We rely on [[HoodieROTablePathFilter]], to do proper filtering to assure that + options = optParams ++ Map( + "mapreduce.input.pathFilter.class" -> classOf[HoodieROTablePathFilter].getName + ), + partitionColumns = partitionColumns + ) + .resolveRelation() + .asInstanceOf[HadoopFsRelation] + } + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index 7550ff13fd..c1229d5500 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -21,11 +21,13 @@ import org.apache.hadoop.fs.Path import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION} import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord} +import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ} import org.apache.hudi.common.table.timeline.HoodieInstant import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.config.HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE import org.apache.hudi.exception.HoodieException +import org.apache.hudi.internal.schema.InternalSchema import org.apache.log4j.LogManager import org.apache.spark.sql.execution.streaming.{Sink, Source} import org.apache.spark.sql.hudi.streaming.HoodieStreamSource @@ -108,7 +110,7 @@ class DefaultSource extends RelationProvider case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) | (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) | (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) => - new BaseFileOnlyRelation(sqlContext, metaClient, parameters, userSchema, globPaths) + resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters) case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) => new IncrementalRelation(sqlContext, parameters, userSchema, metaClient) @@ -141,7 +143,7 @@ class DefaultSource extends RelationProvider * * TODO: Revisit to return a concrete relation here when we support CREATE TABLE AS for Hudi with DataSource API. * That is the only case where Spark seems to actually need a relation to be returned here - * [[DataSource.writeAndRead()]] + * [[org.apache.spark.sql.execution.datasources.DataSource.writeAndRead()]] * * @param sqlContext Spark SQL Context * @param mode Mode for saving the DataFrame at the destination @@ -206,4 +208,32 @@ class DefaultSource extends RelationProvider parameters: Map[String, String]): Source = { new HoodieStreamSource(sqlContext, metadataPath, schema, parameters) } + + private def resolveBaseFileOnlyRelation(sqlContext: SQLContext, + globPaths: Seq[Path], + userSchema: Option[StructType], + metaClient: HoodieTableMetaClient, + optParams: Map[String, String]) = { + val baseRelation = new BaseFileOnlyRelation(sqlContext, metaClient, optParams, userSchema, globPaths) + val enableSchemaOnRead: Boolean = !tryFetchInternalSchema(metaClient).isEmptySchema + + // NOTE: We fallback to [[HadoopFsRelation]] in all of the cases except ones requiring usage of + // [[BaseFileOnlyRelation]] to function correctly. This is necessary to maintain performance parity w/ + // vanilla Spark, since some of the Spark optimizations are predicated on the using of [[HadoopFsRelation]]. + // + // You can check out HUDI-3896 for more details + if (enableSchemaOnRead) { + baseRelation + } else { + baseRelation.toHadoopFsRelation + } + } + + private def tryFetchInternalSchema(metaClient: HoodieTableMetaClient) = + try { + new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata + .orElse(InternalSchema.getEmptyInternalSchema) + } catch { + case _: Exception => InternalSchema.getEmptyInternalSchema + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 769016ca8a..53667f3b88 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -19,12 +19,10 @@ package org.apache.hudi import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hbase.io.hfile.CacheConfig import org.apache.hadoop.mapred.JobConf - import org.apache.hudi.HoodieBaseRelation.getPartitionPath import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration} @@ -38,13 +36,13 @@ import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.internal.schema.InternalSchema import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter import org.apache.hudi.io.storage.HoodieHFileReader - import org.apache.spark.TaskContext import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression} +import org.apache.spark.sql.execution.FileRelation import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile, PartitioningUtils} import org.apache.spark.sql.hudi.HoodieSqlCommonUtils import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan} @@ -54,7 +52,6 @@ import org.apache.spark.unsafe.types.UTF8String import java.io.Closeable import java.net.URI - import scala.collection.JavaConverters._ import scala.util.Try import scala.util.control.NonFatal @@ -78,7 +75,11 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, val metaClient: HoodieTableMetaClient, val optParams: Map[String, String], userSchema: Option[StructType]) - extends BaseRelation with PrunedFilteredScan with Logging with SparkAdapterSupport { + extends BaseRelation + with FileRelation + with PrunedFilteredScan + with SparkAdapterSupport + with Logging { type FileSplit <: HoodieFileSplit @@ -198,6 +199,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, */ override final def needConversion: Boolean = false + override def inputFiles: Array[String] = fileIndex.allFiles.map(_.getPath.toUri.toString).toArray + /** * NOTE: DO NOT OVERRIDE THIS METHOD */ @@ -255,6 +258,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, sparkSession.sparkContext.emptyRDD } + + /** * Composes RDD provided file splits to read from, table and partition schemas, data filters to be applied * diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/SparkDatasetMixin.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/SparkDatasetMixin.scala new file mode 100644 index 0000000000..ee733a86a6 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/SparkDatasetMixin.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import org.apache.avro.generic.GenericRecord +import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordPayload} +import org.apache.hudi.common.testutils.HoodieTestDataGenerator +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SparkSession + +import scala.collection.JavaConversions.collectionAsScalaIterable + +trait SparkDatasetMixin { + + def toDataset(spark: SparkSession, records: java.util.List[HoodieRecord[_]]) = { + val avroRecords = records.map( + _.getData + .asInstanceOf[HoodieRecordPayload[_]] + .getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA) + .get + .asInstanceOf[GenericRecord] + ) + .toSeq + val rdd: RDD[GenericRecord] = spark.sparkContext.parallelize(avroRecords) + AvroConversionUtils.createDataFrame(rdd, HoodieTestDataGenerator.AVRO_SCHEMA.toString, spark) + } + +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 000004ace9..b232ef010f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -37,7 +37,7 @@ import org.joda.time.DateTime import org.joda.time.format.DateTimeFormat import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail} import org.junit.jupiter.api.function.Executable -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{CsvSource, ValueSource} @@ -897,6 +897,7 @@ class TestCOWDataSource extends HoodieClientTestBase { readResult.sort("_row_key").select("shortDecimal").collect().map(_.getDecimal(0).toPlainString).mkString(",")) } + @Disabled("HUDI-3204") @Test def testHoodieBaseFileOnlyViewRelation(): Unit = { val _spark = spark @@ -918,9 +919,9 @@ class TestCOWDataSource extends HoodieClientTestBase { .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts") .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, "org.apache.hudi.keygen.TimestampBasedKeyGenerator") .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "DATE_STRING") + .option(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "yyyy-MM-dd") .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyy/MM/dd") .option(Config.TIMESTAMP_TIMEZONE_FORMAT_PROP, "GMT+8:00") - .option(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "yyyy-MM-dd") .mode(org.apache.spark.sql.SaveMode.Append) .save(basePath) @@ -929,15 +930,13 @@ class TestCOWDataSource extends HoodieClientTestBase { assert(res.count() == 2) // data_date is the partition field. Persist to the parquet file using the origin values, and read it. - assertTrue( - res.select("data_date").map(_.get(0).toString).collect().sorted.sameElements( - Array("2018-09-23", "2018-09-24") - ) + assertEquals( + res.select("data_date").map(_.get(0).toString).collect().sorted, + Array("2018-09-23", "2018-09-24") ) - assertTrue( - res.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.sameElements( - Array("2018/09/23", "2018/09/24") - ) + assertEquals( + res.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted, + Array("2018/09/23", "2018/09/24") ) } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala index e7daf08d11..bd7edd4db5 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala @@ -33,7 +33,7 @@ import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDat import org.apache.spark.sql._ import org.apache.spark.sql.functions.{col, lit} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} -import org.junit.jupiter.api.Tag +import org.junit.jupiter.api.{Disabled, Tag} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{CsvSource, ValueSource} @@ -57,6 +57,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness { val verificationCol: String = "driver" val updatedVerificationVal: String = "driver_update" + @Disabled("HUDI-3896") @ParameterizedTest @CsvSource(Array( "true,org.apache.hudi.keygen.SimpleKeyGenerator", diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index d8ebe5cbcd..b57cbb09ed 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -30,7 +30,7 @@ import org.apache.hudi.index.HoodieIndex.IndexType import org.apache.hudi.keygen.NonpartitionedKeyGenerator import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase} -import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils} +import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils, SparkDatasetMixin} import org.apache.log4j.LogManager import org.apache.spark.rdd.RDD import org.apache.spark.sql._ @@ -48,7 +48,7 @@ import scala.collection.JavaConverters._ /** * Tests on Spark DataSource for MOR table. */ -class TestMORDataSource extends HoodieClientTestBase { +class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { var spark: SparkSession = null private val log = LogManager.getLogger(classOf[TestMORDataSource]) @@ -356,7 +356,7 @@ class TestMORDataSource extends HoodieClientTestBase { val hoodieRecords1 = dataGen.generateInserts("001", 100) - val inputDF1 = toDataset(hoodieRecords1) + val inputDF1 = toDataset(spark, hoodieRecords1) inputDF1.write.format("org.apache.hudi") .options(opts) .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same @@ -382,7 +382,7 @@ class TestMORDataSource extends HoodieClientTestBase { // Upsert 50 update records // Snopshot view should read 100 records val records2 = dataGen.generateUniqueUpdates("002", 50) - val inputDF2 = toDataset(records2) + val inputDF2 = toDataset(spark, records2) inputDF2.write.format("org.apache.hudi") .options(opts) .mode(SaveMode.Append) @@ -429,7 +429,7 @@ class TestMORDataSource extends HoodieClientTestBase { verifyShow(hudiIncDF1Skipmerge) val record3 = dataGen.generateUpdatesWithTS("003", hoodieRecords1, -1) - val inputDF3 = toDataset(record3) + val inputDF3 = toDataset(spark, record3) inputDF3.write.format("org.apache.hudi").options(opts) .mode(SaveMode.Append).save(basePath) @@ -443,16 +443,6 @@ class TestMORDataSource extends HoodieClientTestBase { assertEquals(0, hudiSnapshotDF3.filter("rider = 'rider-003'").count()) } - private def toDataset(records: util.List[HoodieRecord[_]]) = { - val avroRecords = records.map(_.getData - .asInstanceOf[HoodieRecordPayload[_]] - .getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA) - .get - .asInstanceOf[GenericRecord]) - val rdd: RDD[GenericRecord] = spark.sparkContext.parallelize(avroRecords, 2) - AvroConversionUtils.createDataFrame(rdd, HoodieTestDataGenerator.AVRO_SCHEMA.toString, spark) - } - @Test def testVectorizedReader() { spark.conf.set("spark.sql.parquet.enableVectorizedReader", true) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala index 2cdd7880bf..f670450c3e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala @@ -31,7 +31,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.{Dataset, HoodieUnsafeRDDUtils, Row, SaveMode} import org.junit.jupiter.api.Assertions.{assertEquals, fail} -import org.junit.jupiter.api.{Tag, Test} +import org.junit.jupiter.api.{Disabled, Tag, Test} import scala.collection.JavaConverters._ @@ -53,6 +53,7 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getName ) + @Disabled("HUDI-3896") @Test def testBaseFileOnlyViewRelation(): Unit = { val tablePath = s"$basePath/cow" @@ -129,7 +130,8 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with fail("Only Spark 3 and Spark 2 are currently supported") // Test MOR / Read Optimized - runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStatsReadOptimized) + // TODO(HUDI-3896) re-enable + //runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStatsReadOptimized) } @Test @@ -184,7 +186,8 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with fail("Only Spark 3 and Spark 2 are currently supported") // Test MOR / Read Optimized - runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStatsReadOptimized) + // TODO(HUDI-3896) re-enable + //runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStatsReadOptimized) } @Test
