yihua commented on code in PR #14198:
URL: https://github.com/apache/hudi/pull/14198#discussion_r2496100013
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/TestIncrementalQueryWithArchivedInstants.scala:
##########
@@ -34,7 +34,7 @@ import org.junit.jupiter.api.Assertions.{assertDoesNotThrow,
assertFalse}
import org.junit.jupiter.api.function.Executable
@Tag("functional")
-class TestIncrementalQueryWithArchivedInstants extends
SparkClientFunctionalTestHarness {
+class TestIncrementalQueryWithArchivedInstants extends
SparkClientFunctionalTestHarnessScala {
Review Comment:
Is it necessary to change the base class here if `withSQLConf` is not used in
this test? (Same question for a few other classes.)
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -251,36 +247,39 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase
{
isMetadataEnabledOnWrite: Boolean, count: Int): Unit = {
inputDf.select("timestamp", "_row_key", "rider", "driver", "begin_lat",
"begin_lon", "end_lat", "end_lon", "fare",
"_hoodie_is_deleted",
"partition_path").createOrReplaceTempView("insert_temp_table")
- try {
- spark.sql(s"set
hoodie.metadata.enable=${String.valueOf(isMetadataEnabledOnWrite)}")
- if (writeOp.equals(UPSERT)) {
- spark.sql("set hoodie.sql.bulk.insert.enable=false")
- spark.sql("set hoodie.sql.insert.mode=upsert")
- spark.sql(
- s"""
- | merge into $tableName as target
- | using insert_temp_table as source
- | on target._row_key = source._row_key and
- | target.partition_path = source.partition_path
- | when matched then update set *
- | when not matched then insert *
- | """.stripMargin)
- } else if (writeOp.equals(BULK_INSERT)) {
- spark.sql("set hoodie.sql.bulk.insert.enable=true")
- spark.sql("set hoodie.sql.insert.mode=non-strict")
- spark.sql(s"insert into $tableName select * from insert_temp_table")
- } else if (writeOp.equals(INSERT)) {
- spark.sql("set hoodie.sql.bulk.insert.enable=false")
- spark.sql("set hoodie.sql.insert.mode=non-strict")
- spark.sql(s"insert into $tableName select * from insert_temp_table")
+ withSQLConf(HoodieMetadataConfig.ENABLE.key() ->
isMetadataEnabledOnWrite.toString) {
+ if (writeOp.equals(UPSERT)) {
+ withSQLConf(
+ "hoodie.sql.bulk.insert.enable" -> "false",
+ "hoodie.sql.insert.mode" -> "upsert"
+ ) {
+ spark.sql(
+ s"""
+ | merge into $tableName as target
+ | using insert_temp_table as source
+ | on target._row_key = source._row_key and
+ | target.partition_path = source.partition_path
+ | when matched then update set *
+ | when not matched then insert *
+ | """.stripMargin)
+ }
+ } else if (writeOp.equals(BULK_INSERT)) {
+ withSQLConf(
+ "hoodie.sql.bulk.insert.enable" -> "true",
+ "hoodie.sql.insert.mode" -> "non-strict"
+ ) {
+ spark.sql(s"insert into $tableName select * from
insert_temp_table")
+ }
+ } else if (writeOp.equals(INSERT)) {
+ withSQLConf(
+ "hoodie.sql.bulk.insert.enable" -> "false",
+ "hoodie.sql.insert.mode" -> "non-strict"
+ ) {
+ spark.sql(s"insert into $tableName select * from
insert_temp_table")
+ }
+ }
Review Comment:
Could `HoodieMetadataConfig.ENABLE.key() -> isMetadataEnabledOnWrite.toString`
be passed to the `withSQLConf` call in each `if`/`else if` branch instead, to
remove one level of nesting?
##########
hudi-client/hudi-spark-client/src/test/scala/org/apache/hudi/testutils/SparkClientFunctionalTestHarnessScala.scala:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.testutils
+
+class SparkClientFunctionalTestHarnessScala extends
SparkClientFunctionalTestHarness {
+
+ /**
+ * Please use this method to set SQL conf in a block and restore them after
the block.
+ * WARN: Please don't set the SQL conf like `spark.sql("set xxx = yyy")`,
replace it with this method.
+ */
+ protected def withSQLConf[T](pairs: (String, String)*)(f: => T): T = {
+ val conf = spark.sessionState.conf
+ val currentValues = pairs.unzip._1.map { k =>
+ if (conf.contains(k)) {
+ Some(conf.getConfString(k))
+ } else None
+ }
+ pairs.foreach { case (k, v) => conf.setConfString(k, v) }
+ try f finally {
+ pairs.unzip._1.zip(currentValues).foreach {
+ case (key, Some(value)) => conf.setConfString(key, value)
+ case (key, None) => conf.unsetConf(key)
+ }
+ }
+ }
Review Comment:
`HoodieSparkSqlTestBase` already has a `withSQLConf` method with the same
functionality. Could the two be consolidated with
`SparkClientFunctionalTestHarnessScala.withSQLConf` to avoid duplicate
code?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]