This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 62a9279d666 [MINOR] Pass prepped boolean correctly in sql writer (#9320)
62a9279d666 is described below
commit 62a9279d666646fd7abe1872857ea2f94fdedd46
Author: Sagar Sumit <[email protected]>
AuthorDate: Thu Aug 3 08:22:59 2023 +0530
[MINOR] Pass prepped boolean correctly in sql writer (#9320)
---
.../scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 3 +--
.../sql/hudi/command/MergeIntoHoodieTableCommand.scala | 16 ++++++++--------
.../hudi/TestMergeIntoTableWithNonRecordKeyField.scala | 3 ---
3 files changed, 9 insertions(+), 13 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index fcee3fdab49..07b16e1e47d 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -404,8 +404,7 @@ object HoodieSparkSqlWriter {
hoodieRecords
}
client.startCommitWithTime(instantTime, commitActionType)
-      val writeResult = DataSourceUtils.doWriteOperation(client, dedupedHoodieRecords, instantTime, operation,
-        isPrepped)
+      val writeResult = DataSourceUtils.doWriteOperation(client, dedupedHoodieRecords, instantTime, operation, isPrepped)
(writeResult, client)
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index eba75c95452..f830c552bc8 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -24,8 +24,8 @@ import
org.apache.hudi.HoodieSparkSqlWriter.CANONICALIZE_NULLABLE
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.model.HoodieAvroRecordMerger
import org.apache.hudi.common.util.StringUtils
-import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME}
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.sync.common.HoodieSyncConfig
@@ -342,7 +342,9 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
val tableMetaCols = mergeInto.targetTable.output.filter(a =>
isMetaField(a.name))
val joinData =
sparkAdapter.getCatalystPlanUtils.createMITJoin(mergeInto.sourceTable,
mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE")
val incomingDataCols =
joinData.output.filterNot(mergeInto.targetTable.outputSet.contains)
-    val projectedJoinPlan = if (sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(), SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) == "true") {
+    // for pkless table, we need to project the meta columns
+    val hasPrimaryKey = hoodieCatalogTable.tableConfig.getRecordKeyFields.isPresent
+    val projectedJoinPlan = if (!hasPrimaryKey || sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(), "false") == "true") {
Project(tableMetaCols ++ incomingDataCols, joinData)
} else {
Project(incomingDataCols, joinData)
@@ -619,12 +621,10 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
// default value ("ts")
// TODO(HUDI-3456) clean up
val preCombineField = hoodieCatalogTable.preCombineKey.getOrElse("")
-
     val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig)
-
-    val enableOptimizedMerge = sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(),
-      SPARK_SQL_OPTIMIZED_WRITES.defaultValue())
-
+    // for pkless tables, we need to enable optimized merge
+    val hasPrimaryKey = tableConfig.getRecordKeyFields.isPresent
+    val enableOptimizedMerge = if (!hasPrimaryKey) "true" else sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(), "false")
val keyGeneratorClassName = if (enableOptimizedMerge == "true") {
classOf[MergeIntoKeyGenerator].getCanonicalName
} else {
@@ -653,7 +653,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,
RECORD_MERGER_IMPLS.key -> classOf[HoodieAvroRecordMerger].getName,
- // NOTE: We have to explicitly override following configs to make sure
no schema validation is performed
+ // NOTE: We have to explicitly override following configs to make sure
no schema validation is performed
// as schema of the incoming dataset might be diverging from the
table's schema (full schemas'
// compatibility b/w table's schema and incoming one is not
necessary in this case since we can
// be cherry-picking only selected columns from the incoming
dataset to be inserted/updated in the
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
index dd1d00580dc..48964b37323 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
@@ -172,7 +172,6 @@ class TestMergeIntoTableWithNonRecordKeyField extends
HoodieSparkSqlTestBase wit
| (6, 'a6', 60, 100)
|""".stripMargin)
- // First merge with a extra input field 'flag' (insert a new record)
spark.sql(
s"""
| merge into $tableName
@@ -219,7 +218,6 @@ class TestMergeIntoTableWithNonRecordKeyField extends
HoodieSparkSqlTestBase wit
for (withPrecombine <- Seq(true, false)) {
withRecordType()(withTempDir { tmp =>
spark.sql("set hoodie.payload.combined.schema.validate = true")
- spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")
val tableName = generateTableName
val prekstr = if (withPrecombine) "tblproperties (preCombineField =
'ts')" else ""
@@ -242,7 +240,6 @@ class TestMergeIntoTableWithNonRecordKeyField extends
HoodieSparkSqlTestBase wit
| (1, 'a1', 10, 100)
|""".stripMargin)
- // First merge with a extra input field 'flag' (insert a new record)
spark.sql(
s"""
| merge into $tableName