This is an automated email from the ASF dual-hosted git repository.

jonvex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dad9139653 [HUDI-8197] Get rid of set sqlconf in Filegroupreader parquet file format (#11928)
7dad9139653 is described below

commit 7dad9139653472126d5ba99daf0c20abfd4d4d85
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Sep 12 15:25:37 2024 -0400

    [HUDI-8197] Get rid of set sqlconf in Filegroupreader parquet file format (#11928)
    
    Remove the sqlconf set call because it has session-wide side effects.
    Also use the file group reader for all of the schema evolution deltastreamer tests.
    
    ---------
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala  |  1 -
 .../org/apache/hudi/TestAvroSchemaResolutionSupport.scala      |  6 +++++-
 .../TestHoodieDeltaStreamerSchemaEvolutionBase.java            | 10 +++-------
 .../TestHoodieDeltaStreamerSchemaEvolutionExtensive.java       |  6 +++---
 .../TestHoodieDeltaStreamerSchemaEvolutionQuick.java           |  2 +-
 5 files changed, 12 insertions(+), 13 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 197af83d4e5..6422d39e976 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -87,7 +87,6 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
       supportBatchCalled = true
       supportBatchResult = !isMOR && !isIncremental && !isBootstrap && super.supportBatch(sparkSession, schema)
     }
-    sparkSession.conf.set(PARQUET_VECTORIZED_READER_ENABLED.key, supportBatchResult)
     supportBatchResult
   }
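
The removed line above is the heart of the fix: supportBatch was writing its per-query answer back into the shared SparkSession config, so a single MOR, incremental, or bootstrap read could disable vectorized reading for every later query in the same session. A minimal Scala sketch of that failure mode (the method name is hypothetical; it assumes a local SparkSession and the standard spark.sql.parquet.enableVectorizedReader key):

    import org.apache.spark.sql.SparkSession

    object ConfSideEffectSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[1]")
          .appName("conf-side-effect")
          .getOrCreate()

        // Mimics the removed line: compute a per-query answer, but persist it
        // in session-scoped config, where it outlives this query.
        def supportBatchWithSideEffect(isMor: Boolean): Boolean = {
          val result = !isMor
          spark.conf.set("spark.sql.parquet.enableVectorizedReader", result)
          result
        }

        supportBatchWithSideEffect(isMor = true)
        // Vectorized reading is now off for every subsequent read in this
        // session, including plain parquet scans that do support it.
        println(spark.conf.get("spark.sql.parquet.enableVectorizedReader")) // false

        spark.stop()
      }
    }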
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
index 7cc76bea198..06bb8c23d22 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
@@ -875,12 +875,16 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss
 
     // after implicit type change, read the table with vectorized read enabled
    //fg reader with mor does not support vectorized currently and will auto read by row
-    if (HoodieSparkUtils.gteqSpark3_3 && !useFileGroupReader) {
+    if (HoodieSparkUtils.gteqSpark3_3 && (isCow || !useFileGroupReader)) {
       assertThrows(classOf[SparkException]){
         withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> 
"true") {
           readTable(tempRecordPath, useFileGroupReader)
         }
       }
+    } else {
+      withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true") {
+        readTable(tempRecordPath, useFileGroupReader)
+      }
     }
 
    withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "false") {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
index 945f64ece6e..bc26578f8df 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
@@ -25,7 +25,6 @@ import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.TestHoodieSparkUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -103,12 +102,6 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase extends HoodieDeltaStrea
   protected boolean useTransformer;
   protected boolean userProvidedSchema;
 
-  protected Map<String, String> readOpts = new HashMap<String, String>() {
-    {
-      put(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
-    }
-  };
-
   @BeforeAll
   public static void initKafka() {
     defaultSchemaProviderClassName = TestSchemaProvider.class.getName();
@@ -125,6 +118,9 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase extends HoodieDeltaStrea
     sourceSchemaFile = "";
     targetSchemaFile = "";
     topicName = "topic" + testNum;
+    if (HoodieSparkUtils.gteqSpark3_3()) {
+      sparkSession.conf().set("spark.sql.parquet.enableNestedColumnVectorizedReader", "false");
+    }
   }
 
   @AfterEach
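
With the readOpts block gone, these tests stop forcing the legacy reader and exercise the file group reader default instead, matching the commit's goal of using it for all the schema evolution tests. If a single test ever needed the old path again, the switch could still be applied per read rather than session-wide; a hypothetical sketch (key taken from HoodieReaderConfig.FILE_GROUP_READER_ENABLED, assuming it is "hoodie.file.group.reader.enabled"):

    import org.apache.spark.sql.{DataFrame, SparkSession}

    object LegacyReaderReadSketch {
      // Per-read override: the option affects only this one DataFrame read,
      // unlike sparkSession.conf().set(...), which lingers for the session.
      def readWithLegacyReader(spark: SparkSession, tableBasePath: String): DataFrame =
        spark.read.format("hudi")
          .option("hoodie.file.group.reader.enabled", "false")
          .load(tableBasePath)
    }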
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
index d152acae2a2..b9925dfbf3d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
@@ -170,7 +170,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionExtensive extends TestHoodieD
     }
     assertRecordCount(numRecords);
 
-    Dataset<Row> df = sparkSession.read().format("hudi").options(readOpts).load(tableBasePath);
+    Dataset<Row> df = sparkSession.read().format("hudi").load(tableBasePath);
     df.show(9,false);
     df.select(updateColumn).show(9);
     for (String condition : conditions.keySet()) {
@@ -442,7 +442,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionExtensive extends TestHoodieD
   protected String typePromoUpdates;
 
   protected void assertDataType(String colName, DataType expectedType) {
-    assertEquals(expectedType, sparkSession.read().format("hudi").options(readOpts).load(tableBasePath).select(colName).schema().fields()[0].dataType());
+    assertEquals(expectedType, sparkSession.read().format("hudi").load(tableBasePath).select(colName).schema().fields()[0].dataType());
   }
 
  protected void testTypePromotionBase(String colName, DataType startType, DataType updateType) throws Exception {
@@ -499,7 +499,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionExtensive extends TestHoodieD
       assertFileNumber(numFiles, false);
     }
     assertRecordCount(numRecords);
-    sparkSession.read().format("hudi").options(readOpts).load(tableBasePath).select(colName).show(9);
+    sparkSession.read().format("hudi").load(tableBasePath).select(colName).show(9);
     assertDataType(colName, endType);
   }
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
index a108dec74b1..652ed0f5e5d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
@@ -241,7 +241,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta
     }
     assertRecordCount(numRecords);
 
-    df = sparkSession.read().format("hudi").options(readOpts).load(tableBasePath);
+    df = sparkSession.read().format("hudi").load(tableBasePath);
     df.show(100,false);
     df.cache();
    assertDataType(df, "tip_history", DataTypes.createArrayType(DataTypes.LongType));
