This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0ca0999420c [HUDI-6464] Spark SQL Merge Into for pkless tables (#9083)
0ca0999420c is described below
commit 0ca0999420ca36b777b23fc9cf3998ce8f31481f
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Jul 6 23:16:51 2023 -0400
[HUDI-6464] Spark SQL Merge Into for pkless tables (#9083)
Tables with a primary key now must join on all primary key columns.
Additionally, they can join on the partition path columns, which is
recommended if the table is not using a global index.
Tables without a primary key can join on any columns in the table. If
multiple source table columns match a single target table column, the
precombine field will be used if set; otherwise, the behavior is
nondeterministic. To improve performance, the Hudi meta columns are
retained after the join, so that index lookup and key generation can be
skipped.
---
.../apache/hudi/config/HoodieInternalConfig.java | 8 +
.../hudi/index/HoodieInternalProxyIndex.java | 71 +++++
.../apache/hudi/index/SparkHoodieIndexFactory.java | 6 +
.../factory/HoodieSparkKeyGeneratorFactory.java | 6 +-
.../org/apache/spark/sql/hudi/SparkAdapter.scala | 13 +-
.../main/scala/org/apache/hudi/DefaultSource.scala | 5 +-
.../org/apache/hudi/HoodieCreateRecordUtils.scala | 72 +++--
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 24 +-
.../scala/org/apache/hudi/HoodieWriterUtils.scala | 4 +-
.../sql/hudi/command/MergeIntoKeyGenerator.scala | 94 ++++++
.../spark/sql/hudi/analysis/HoodieAnalysis.scala | 43 ++-
.../hudi/command/MergeIntoHoodieTableCommand.scala | 168 ++++++----
.../apache/spark/sql/hudi/TestMergeIntoTable.scala | 12 +-
.../spark/sql/hudi/TestMergeIntoTable2.scala | 13 +-
.../spark/sql/hudi/TestMergeIntoTable3.scala | 353 +++++++++++++++++++++
.../apache/spark/sql/adapter/Spark2Adapter.scala | 7 +-
.../catalyst/analysis/HoodieSpark2Analysis.scala | 3 +-
.../spark/sql/adapter/BaseSpark3Adapter.scala | 6 +
.../catalyst/analysis/HoodieSpark30Analysis.scala} | 29 +-
.../catalyst/analysis/HoodieSpark31Analysis.scala} | 29 +-
.../apache/spark/sql/adapter/Spark3_2Adapter.scala | 8 +-
.../hudi/analysis/HoodieSpark32PlusAnalysis.scala | 133 +++++++-
.../apache/spark/sql/adapter/Spark3_3Adapter.scala | 8 +-
.../apache/spark/sql/adapter/Spark3_4Adapter.scala | 12 +-
24 files changed, 966 insertions(+), 161 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java
index 797df196441..a1c575c4f41 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java
@@ -46,6 +46,14 @@ public class HoodieInternalConfig extends HoodieConfig {
.withDocumentation("For SQL operations, if enables bulk_insert
operation, "
+ "this configure will take effect to decide overwrite whole table
or partitions specified");
+ public static final ConfigProperty<Boolean> SQL_MERGE_INTO_WRITES =
ConfigProperty
+ .key("hoodie.internal.sql.merge.into.writes")
+ .defaultValue(false)
+ .markAdvanced()
+ .sinceVersion("0.14.0")
+ .withDocumentation("This internal config is used by Merge Into SQL logic
only to mark such use case "
+ + "and let the core components know it should handle the write
differently");
+
/**
* Returns if partition records are sorted or not.
*
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieInternalProxyIndex.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieInternalProxyIndex.java
new file mode 100644
index 00000000000..a0d53f6c2ed
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieInternalProxyIndex.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+
+public class HoodieInternalProxyIndex extends HoodieIndex<Object, Object> {
+
+ /**
+ * Index that does not do tagging. Its purpose is to be used for Spark sql
Merge into command
+ * Merge into does not need to use index lookup because we get the location
from the meta columns
+ * from the join. The reason why we can't use the prepped path is because we
sometimes need to do
+ * a mix of updates and inserts and prepped only handles existing records
+ */
+ public HoodieInternalProxyIndex(HoodieWriteConfig config) {
+ super(config);
+ }
+
+ @Override
+ public <R> HoodieData<HoodieRecord<R>>
tagLocation(HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException {
+ return records;
+ }
+
+ @Override
+ public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus>
writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws
HoodieIndexException {
+ return writeStatuses;
+ }
+
+ @Override
+ public boolean rollbackCommit(String instantTime) {
+ return true;
+ }
+
+ @Override
+ public boolean isGlobal() {
+ return false;
+ }
+
+ @Override
+ public boolean canIndexLogFiles() {
+ return false;
+ }
+
+ @Override
+ public boolean isImplicitWithStorage() {
+ return false;
+ }
+}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
index 53218709259..c908f40e4f0 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
@@ -21,6 +21,7 @@ package org.apache.hudi.index;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieInternalConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
@@ -43,6 +44,11 @@ import java.io.IOException;
*/
public final class SparkHoodieIndexFactory {
public static HoodieIndex createIndex(HoodieWriteConfig config) {
+ boolean mergeIntoWrites =
config.getProps().getBoolean(HoodieInternalConfig.SQL_MERGE_INTO_WRITES.key(),
+ HoodieInternalConfig.SQL_MERGE_INTO_WRITES.defaultValue());
+ if (mergeIntoWrites) {
+ return new HoodieInternalProxyIndex(config);
+ }
// first use index class config to create index.
if (!StringUtils.isNullOrEmpty(config.getIndexClass())) {
return HoodieIndexUtils.createUserDefinedIndex(config);
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
index ae961b4dcf5..235bd63b99f 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
@@ -45,6 +45,7 @@ import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
+import static
org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES;
import static org.apache.hudi.config.HoodieWriteConfig.KEYGENERATOR_TYPE;
import static org.apache.hudi.keygen.KeyGenUtils.inferKeyGeneratorType;
@@ -76,7 +77,10 @@ public class HoodieSparkKeyGeneratorFactory {
public static KeyGenerator createKeyGenerator(TypedProperties props) throws
IOException {
String keyGeneratorClass = getKeyGeneratorClassName(props);
- boolean autoRecordKeyGen = KeyGenUtils.enableAutoGenerateRecordKeys(props);
+ boolean autoRecordKeyGen = KeyGenUtils.enableAutoGenerateRecordKeys(props)
+ //Need to prevent overwriting the keygen for spark sql merge into
because we need to extract
+ //the recordkey from the meta cols if it exists. Sql keygen will use
pkless keygen if needed.
+ && !props.getBoolean(SQL_MERGE_INTO_WRITES.key(),
SQL_MERGE_INTO_WRITES.defaultValue());
try {
KeyGenerator keyGenerator = (KeyGenerator)
ReflectionUtils.loadClass(keyGeneratorClass, props);
if (autoRecordKeyGen) {
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index 073b5916ef3..1a72228f99c 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -26,10 +26,11 @@ import org.apache.spark.sql._
import org.apache.spark.sql.avro.{HoodieAvroDeserializer,
HoodieAvroSchemaConverters, HoodieAvroSerializer}
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.logical.{Command, Join, LogicalPlan}
import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.datasources._
@@ -202,4 +203,12 @@ trait SparkAdapter extends Serializable {
* Converts instance of [[StorageLevel]] to a corresponding string
*/
def convertStorageLevelToString(level: StorageLevel): String
+
+ /**
+ * Calls fail analysis on
+ *
+ */
+ def failAnalysisForMIT(a: Attribute, cols: String): Unit = {}
+
+ def createMITJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType,
condition: Option[Expression], hint: String): LogicalPlan
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 2ceaa1606fd..1b961ba411a 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -29,6 +29,8 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.util.ConfigUtils
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
+import org.apache.hudi.config.HoodieInternalConfig
+import org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES
import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.util.PathUtils
@@ -144,7 +146,8 @@ class DefaultSource extends RelationProvider
mode: SaveMode,
optParams: Map[String, String],
rawDf: DataFrame): BaseRelation = {
- val df = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY, "false")
+ val df = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY,
+ optParams.getOrDefault(SQL_MERGE_INTO_WRITES.key(),
SQL_MERGE_INTO_WRITES.defaultValue().toString))
.equalsIgnoreCase("true")) {
rawDf // Don't remove meta columns for prepped write.
} else {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
index f55027ee1c2..bd90563abfd 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
@@ -26,6 +26,7 @@ import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieKey, HoodieRecord,
HoodieRecordLocation, HoodieSparkRecord, WriteOperationType}
import
org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS
+import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
@@ -46,16 +47,31 @@ import scala.collection.JavaConversions.mapAsJavaMap
object HoodieCreateRecordUtils {
private val log = LoggerFactory.getLogger(getClass)
- def createHoodieRecordRdd(df: DataFrame,
- config: HoodieWriteConfig,
- parameters: Map[String, String],
- recordName: String,
- recordNameSpace: String,
- writerSchema: Schema,
- dataFileSchema: Schema,
- operation: WriteOperationType,
- instantTime: String,
- isPrepped: Boolean) = {
+ case class createHoodieRecordRddArgs(df: DataFrame,
+ config: HoodieWriteConfig,
+ parameters: Map[String, String],
+ recordName: String,
+ recordNameSpace: String,
+ writerSchema: Schema,
+ dataFileSchema: Schema,
+ operation: WriteOperationType,
+ instantTime: String,
+ isPrepped: Boolean,
+ mergeIntoWrites: Boolean)
+
+ def createHoodieRecordRdd(args: createHoodieRecordRddArgs) = {
+ val df = args.df
+ val config = args.config
+ val parameters = args.parameters
+ val recordName = args.recordName
+ val recordNameSpace = args.recordNameSpace
+ val writerSchema = args.writerSchema
+ val dataFileSchema = args.dataFileSchema
+ val operation = args.operation
+ val instantTime = args.instantTime
+ val isPrepped = args.isPrepped
+ val mergeIntoWrites = args.mergeIntoWrites
+
val shouldDropPartitionColumns =
config.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
val recordType = config.getRecordMerger.getRecordType
val autoGenerateRecordKeys: Boolean =
!parameters.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
@@ -112,8 +128,8 @@ object HoodieCreateRecordUtils {
}
val (hoodieKey: HoodieKey, recordLocation:
Option[HoodieRecordLocation]) =
HoodieCreateRecordUtils.getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator,
avroRec,
- isPrepped)
- val avroRecWithoutMeta: GenericRecord = if (isPrepped) {
+ isPrepped, mergeIntoWrites)
+ val avroRecWithoutMeta: GenericRecord = if (isPrepped ||
mergeIntoWrites) {
HoodieAvroUtils.rewriteRecord(avroRec,
HoodieAvroUtils.removeMetadataFields(dataFileSchema))
} else {
avroRec
@@ -170,7 +186,7 @@ object HoodieCreateRecordUtils {
// Do validation only once.
validatePreppedRecord = false
}
- val (key: HoodieKey, recordLocation: Option[HoodieRecordLocation])
=
HoodieCreateRecordUtils.getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator,
sourceRow, sourceStructType, isPrepped)
+ val (key: HoodieKey, recordLocation: Option[HoodieRecordLocation])
=
HoodieCreateRecordUtils.getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator,
sourceRow, sourceStructType, isPrepped, mergeIntoWrites)
val targetRow = finalStructTypeRowWriter(sourceRow)
var hoodieSparkRecord = new HoodieSparkRecord(key, targetRow,
dataFileStructType, false)
@@ -206,27 +222,29 @@ object HoodieCreateRecordUtils {
}
def getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator:
Option[BaseKeyGenerator], avroRec: GenericRecord,
- isPrepped: Boolean):
(HoodieKey, Option[HoodieRecordLocation]) = {
+ isPrepped: Boolean,
mergeIntoWrites: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = {
+ //use keygen for mergeIntoWrites recordKey and partitionPath because the
keygenerator handles
+ //fetching from the meta fields if they are populated and otherwise doing
keygen
val recordKey = if (isPrepped) {
avroRec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
} else {
keyGenerator.get.getRecordKey(avroRec)
- };
+ }
val partitionPath = if (isPrepped) {
avroRec.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString
} else {
keyGenerator.get.getPartitionPath(avroRec)
- };
+ }
val hoodieKey = new HoodieKey(recordKey, partitionPath)
- val instantTime: Option[String] = if (isPrepped) {
+ val instantTime: Option[String] = if (isPrepped || mergeIntoWrites) {
Option(avroRec.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).map(_.toString)
}
else {
None
}
- val fileName: Option[String] = if (isPrepped) {
+ val fileName: Option[String] = if (isPrepped || mergeIntoWrites) {
Option(avroRec.get(HoodieRecord.FILENAME_METADATA_FIELD)).map(_.toString)
}
else {
@@ -243,27 +261,29 @@ object HoodieCreateRecordUtils {
def getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator:
Option[SparkKeyGeneratorInterface],
sourceRow: InternalRow,
schema: StructType,
- isPrepped: Boolean):
(HoodieKey, Option[HoodieRecordLocation]) = {
+ isPrepped: Boolean,
mergeIntoWrites: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = {
+ //use keygen for mergeIntoWrites recordKey and partitionPath because the
keygenerator handles
+ //fetching from the meta fields if they are populated and otherwise doing
keygen
val recordKey = if (isPrepped) {
-
sourceRow.getString(HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+ sourceRow.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD)
} else {
sparkKeyGenerator.get.getRecordKey(sourceRow, schema).toString
}
val partitionPath = if (isPrepped) {
-
sourceRow.getString(HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD))
+ sourceRow.getString(HoodieRecord.PARTITION_PATH_META_FIELD_ORD)
} else {
sparkKeyGenerator.get.getPartitionPath(sourceRow, schema).toString
- };
+ }
- val instantTime: Option[String] = if (isPrepped) {
-
Option(sourceRow.getString(HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)))
+ val instantTime: Option[String] = if (isPrepped || mergeIntoWrites) {
+ Option(sourceRow.getString(HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD))
} else {
None
}
- val fileName: Option[String] = if (isPrepped) {
-
Option(sourceRow.getString(HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD)))
+ val fileName: Option[String] = if (isPrepped || mergeIntoWrites) {
+ Option(sourceRow.getString(HoodieRecord.FILENAME_META_FIELD_ORD))
} else {
None
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 2c0e95eabf2..39a8d94c9b5 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -45,6 +45,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig,
HoodieTableMetaClient, T
import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option =>
HOption}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH,
INDEX_CLASS_NAME}
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
+import org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES
import org.apache.hudi.exception.{HoodieException,
SchemaCompatibilityException}
import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
import org.apache.hudi.internal.schema.InternalSchema
@@ -88,14 +89,6 @@ object HoodieSparkSqlWriter {
ConfigProperty.key("hoodie.internal.write.schema.canonicalize.nullable")
.defaultValue(true)
- /**
- * For merge into from spark-sql, we need some special handling. for eg,
schema validation should be disabled
- * for writes from merge into. This config is used for internal purposes.
- */
- val SQL_MERGE_INTO_WRITES: ConfigProperty[Boolean] =
- ConfigProperty.key("hoodie.internal.sql.merge.into.writes")
- .defaultValue(false)
-
/**
* For spark streaming use-cases, holds the batch Id.
*/
@@ -337,10 +330,12 @@ object HoodieSparkSqlWriter {
// Remove meta columns from writerSchema if isPrepped is true.
val isPrepped =
hoodieConfig.getBooleanOrDefault(DATASOURCE_WRITE_PREPPED_KEY, false)
- val processedDataSchema = if (isPrepped) {
- HoodieAvroUtils.removeMetadataFields(writerSchema);
+ val mergeIntoWrites =
parameters.getOrDefault(SQL_MERGE_INTO_WRITES.key(),
+ SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean
+ val processedDataSchema = if (isPrepped || mergeIntoWrites) {
+ HoodieAvroUtils.removeMetadataFields(dataFileSchema)
} else {
- dataFileSchema;
+ dataFileSchema
}
// Create a HoodieWriteClient & issue the write.
@@ -372,8 +367,9 @@ object HoodieSparkSqlWriter {
}
// Convert to RDD[HoodieRecord]
val hoodieRecords =
- HoodieCreateRecordUtils.createHoodieRecordRdd(df, writeConfig,
parameters, avroRecordName, avroRecordNamespace, writerSchema,
- processedDataSchema, operation, instantTime, isPrepped)
+
HoodieCreateRecordUtils.createHoodieRecordRdd(HoodieCreateRecordUtils.createHoodieRecordRddArgs(df,
+ writeConfig, parameters, avroRecordName, avroRecordNamespace,
writerSchema,
+ processedDataSchema, operation, instantTime, isPrepped,
mergeIntoWrites))
val dedupedHoodieRecords =
if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {
@@ -438,7 +434,7 @@ object HoodieSparkSqlWriter {
// in the table's one we want to proceed aligning nullability
constraints w/ the table's schema
val shouldCanonicalizeNullable =
opts.getOrDefault(CANONICALIZE_NULLABLE.key,
CANONICALIZE_NULLABLE.defaultValue.toString).toBoolean
- val mergeIntoWrites = opts.getOrDefault(SQL_MERGE_INTO_WRITES.key(),
+ val mergeIntoWrites =
opts.getOrDefault(HoodieInternalConfig.SQL_MERGE_INTO_WRITES.key(),
SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean
val canonicalizedSourceSchema = if (shouldCanonicalizeNullable) {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 09a6d873e8b..405e761635a 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -28,7 +28,7 @@ import org.apache.hudi.keygen.{NonpartitionedKeyGenerator,
SimpleKeyGenerator}
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.util.SparkKeyGenUtils
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.hudi.command.SqlKeyGenerator
+import org.apache.spark.sql.hudi.command.{MergeIntoKeyGenerator,
SqlKeyGenerator}
import java.util.Properties
import scala.collection.JavaConversions.mapAsJavaMap
@@ -115,7 +115,7 @@ object HoodieWriterUtils {
def getOriginKeyGenerator(parameters: Map[String, String]): String = {
val kg = parameters.getOrElse(KEYGENERATOR_CLASS_NAME.key(), null)
- if (classOf[SqlKeyGenerator].getCanonicalName == kg) {
+ if (classOf[SqlKeyGenerator].getCanonicalName == kg ||
classOf[MergeIntoKeyGenerator].getCanonicalName == kg) {
parameters.getOrElse(SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME, null)
} else {
kg
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoKeyGenerator.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoKeyGenerator.scala
new file mode 100644
index 00000000000..4399b25727b
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoKeyGenerator.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.common.config.TypedProperties
+import
org.apache.hudi.common.model.HoodieRecord.{PARTITION_PATH_META_FIELD_ORD,
RECORD_KEY_META_FIELD_ORD}
+import org.apache.hudi.common.util.StringUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+
+/**
+ * NOTE TO USERS: YOU SHOULD NOT SET THIS AS YOUR KEYGENERATOR
+ *
+ * Keygenerator that is meant to be used internally for the spark sql merge
into command
+ * It will attempt to get the partition path and recordkey from the
metafields, but will
+ * fallback to the sql keygenerator if the meta field is not populated
+ *
+ */
+class MergeIntoKeyGenerator(props: TypedProperties) extends
SqlKeyGenerator(props) {
+
+ override def getRecordKey(record: GenericRecord): String = {
+ val recordKey = record.get(RECORD_KEY_META_FIELD_ORD)
+ if (recordKey != null) {
+ recordKey.toString
+ } else {
+ super.getRecordKey(record)
+ }
+ }
+
+ override def getRecordKey(row: Row): String = {
+ val recordKey = row.getString(RECORD_KEY_META_FIELD_ORD)
+ if (!StringUtils.isNullOrEmpty(recordKey)) {
+ recordKey
+ } else {
+ super.getRecordKey(row)
+ }
+ }
+
+ override def getRecordKey(internalRow: InternalRow, schema: StructType):
UTF8String = {
+ val recordKey = internalRow.getUTF8String(RECORD_KEY_META_FIELD_ORD)
+ if (recordKey != null) {
+ recordKey
+ } else {
+ super.getRecordKey(internalRow, schema)
+ }
+ }
+
+ override def getPartitionPath(record: GenericRecord): String = {
+ val partitionPath = record.get(PARTITION_PATH_META_FIELD_ORD)
+ if (partitionPath != null) {
+ partitionPath.toString
+ } else {
+ super.getPartitionPath(record)
+ }
+ }
+
+ override def getPartitionPath(row: Row): String = {
+ val partitionPath = row.getString(PARTITION_PATH_META_FIELD_ORD)
+ if (!StringUtils.isNullOrEmpty(partitionPath)) {
+ partitionPath
+ } else {
+ super.getPartitionPath(row)
+ }
+ }
+
+ override def getPartitionPath(internalRow: InternalRow, schema: StructType):
UTF8String = {
+ val partitionPath =
internalRow.getUTF8String(PARTITION_PATH_META_FIELD_ORD)
+ if (partitionPath != null) {
+ partitionPath
+ } else {
+ super.getPartitionPath(internalRow, schema)
+ }
+ }
+
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index ad302962019..ddc2372b9a3 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -43,23 +43,39 @@ object HoodieAnalysis extends SparkAdapterSupport {
val rules: ListBuffer[RuleBuilder] = ListBuffer()
// NOTE: This rule adjusts [[LogicalRelation]]s resolving into Hudi tables
such that
- // meta-fields are not affecting the resolution of the target
columns to be updated by Spark.
+ // meta-fields are not affecting the resolution of the target
columns to be updated by Spark (Except in the
+ // case of MergeInto. We leave the meta columns on the target table,
and use other means to ensure resolution)
// For more details please check out the scala-doc of the rule
- // TODO limit adapters to only Spark < 3.2
val adaptIngestionTargetLogicalRelations: RuleBuilder = session =>
AdaptIngestionTargetLogicalRelations(session)
- if (HoodieSparkUtils.isSpark2) {
- val spark2ResolveReferencesClass =
"org.apache.spark.sql.catalyst.analysis.HoodieSpark2Analysis$ResolveReferences"
- val spark2ResolveReferences: RuleBuilder =
- session => ReflectionUtils.loadClass(spark2ResolveReferencesClass,
session).asInstanceOf[Rule[LogicalPlan]]
+ if (!HoodieSparkUtils.gteqSpark3_2) {
+ //Add or correct resolution of MergeInto
+ // the way we load the class via reflection is diff across spark2 and
spark3 and hence had to split it out.
+ if (HoodieSparkUtils.isSpark2) {
+ val resolveReferencesClass =
"org.apache.spark.sql.catalyst.analysis.HoodieSpark2Analysis$ResolveReferences"
+ val sparkResolveReferences: RuleBuilder =
+ session => ReflectionUtils.loadClass(resolveReferencesClass,
session).asInstanceOf[Rule[LogicalPlan]]
+ // TODO elaborate on the ordering
+ rules += (adaptIngestionTargetLogicalRelations, sparkResolveReferences)
+ } else if (HoodieSparkUtils.isSpark3_0) {
+ val resolveReferencesClass =
"org.apache.spark.sql.catalyst.analysis.HoodieSpark30Analysis$ResolveReferences"
+ val sparkResolveReferences: RuleBuilder = {
+ session => instantiateKlass(resolveReferencesClass, session)
+ }
+ // TODO elaborate on the ordering
+ rules += (adaptIngestionTargetLogicalRelations, sparkResolveReferences)
+ } else if (HoodieSparkUtils.isSpark3_1) {
+ val resolveReferencesClass =
"org.apache.spark.sql.catalyst.analysis.HoodieSpark31Analysis$ResolveReferences"
+ val sparkResolveReferences: RuleBuilder =
+ session => instantiateKlass(resolveReferencesClass, session)
+ // TODO elaborate on the ordering
+ rules += (adaptIngestionTargetLogicalRelations, sparkResolveReferences)
+ } else {
+ throw new IllegalStateException("Impossible to be here")
+ }
- // TODO elaborate on the ordering
- rules += (adaptIngestionTargetLogicalRelations, spark2ResolveReferences)
} else {
rules += adaptIngestionTargetLogicalRelations
- }
-
- if (HoodieSparkUtils.gteqSpark3_2) {
val dataSourceV2ToV1FallbackClass =
"org.apache.spark.sql.hudi.analysis.HoodieDataSourceV2ToV1Fallback"
val dataSourceV2ToV1Fallback: RuleBuilder =
session => instantiateKlass(dataSourceV2ToV1FallbackClass, session)
@@ -192,9 +208,8 @@ object HoodieAnalysis extends SparkAdapterSupport {
// the data, as such we have to make sure that we handle both
of these cases
case mit@MatchMergeIntoTable(targetTable, query, _) =>
val updatedTargetTable = targetTable match {
- // In the receiving side of the MIT, we can't project meta-field
attributes out,
- // and instead have to explicitly remove them
- case ResolvesToHudiTable(_) =>
Some(stripMetaFieldsAttributes(targetTable))
+ //Do not remove the meta cols here anymore
+ case ResolvesToHudiTable(_) => Some(targetTable)
case _ => None
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 2b0aabacebe..d610758fd8c 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -20,21 +20,23 @@ package org.apache.spark.sql.hudi.command
import org.apache.avro.Schema
import org.apache.hudi.AvroConversionUtils.convertStructTypeToAvroSchema
import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.HoodieSparkSqlWriter.{CANONICALIZE_NULLABLE,
SQL_MERGE_INTO_WRITES}
+import org.apache.hudi.HoodieSparkSqlWriter.CANONICALIZE_NULLABLE
+import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.model.HoodieAvroRecordMerger
import org.apache.hudi.common.util.StringUtils
-import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE,
SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME}
+import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.util.JFunction.scalaFunction1Noop
-import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions,
HoodieSparkSqlWriter, SparkAdapterSupport}
+import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions,
HoodieSparkSqlWriter, HoodieSparkUtils, SparkAdapterSupport}
import org.apache.spark.sql.HoodieCatalystExpressionUtils.{MatchCast,
attributeEquals}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReference
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
AttributeReference, BoundReference, EqualTo, Expression, Literal,
NamedExpression, PredicateHelper}
+import org.apache.spark.sql.catalyst.plans.LeftOuter
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
@@ -125,18 +127,68 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
* expression involving [[source]] column(s), we will have to add "phony"
column matching the
* primary-key one of the target table.
*/
- private lazy val primaryKeyAttributeToConditionExpression: Seq[(Attribute,
Expression)] = {
+ private lazy val recordKeyAttributeToConditionExpression: Seq[(Attribute,
Expression)] = {
+ val primaryKeyFields = hoodieCatalogTable.tableConfig.getRecordKeyFields
val conditions = splitConjunctivePredicates(mergeInto.mergeCondition)
- if (!conditions.forall(p => p.isInstanceOf[EqualTo])) {
- throw new AnalysisException(s"Currently only equality predicates are
supported in MERGE INTO statement " +
- s"(provided ${mergeInto.mergeCondition.sql}")
+ if (primaryKeyFields.isPresent) {
+ //pkless tables can have more complex conditions
+ if (!conditions.forall(p => p.isInstanceOf[EqualTo])) {
+ throw new AnalysisException(s"Currently only equality predicates are
supported in MERGE INTO statement on primary key table" +
+ s"(provided ${mergeInto.mergeCondition.sql}")
+ }
}
-
val resolver = sparkSession.sessionState.analyzer.resolver
- val primaryKeyField = hoodieCatalogTable.tableConfig.getRecordKeyFieldProp
+ val partitionPathFields = hoodieCatalogTable.tableConfig.getPartitionFields
+ //ensure all primary key fields are part of the merge condition
+ //allow partition path to be part of the merge condition but not required
+ val targetAttr2ConditionExpressions = doCasting(conditions,
primaryKeyFields.isPresent)
+ val expressionSet = scala.collection.mutable.Set[(Attribute,
Expression)](targetAttr2ConditionExpressions:_*)
+ var partitionAndKeyFields: Seq[(String,String)] = Seq.empty
+ if (primaryKeyFields.isPresent) {
+ partitionAndKeyFields = partitionAndKeyFields ++
primaryKeyFields.get().map(pk => ("primaryKey", pk)).toSeq
+ }
+ if (partitionPathFields.isPresent) {
+ partitionAndKeyFields = partitionAndKeyFields ++
partitionPathFields.get().map(pp => ("partitionPath", pp)).toSeq
+ }
+ val resolvedCols = partitionAndKeyFields.map(rk => {
+ val resolving = expressionSet.collectFirst {
+ case (attr, expr) if resolver(attr.name, rk._2) =>
+ // NOTE: Here we validate that condition expression involving
record-key column(s) is a simple
+ // attribute-reference expression (possibly wrapped into a
cast). This is necessary to disallow
+ // statements like following
+ //
+ // MERGE INTO ... AS t USING (
+ // SELECT ... FROM ... AS s
+ // )
+ // ON t.id = s.id + 1
+ // WHEN MATCHED THEN UPDATE *
+ //
+ // Which (in the current design) could result in a primary key
of the record being modified,
+ // which is not allowed.
+ if (!resolvesToSourceAttribute(expr)) {
+ throw new AnalysisException("Only simple conditions of the form
`t.id = s.id` are allowed on the " +
+ s"primary-key and partition path column. Found `${attr.sql} =
${expr.sql}`")
+ }
+ expressionSet.remove((attr, expr))
+ (attr, expr)
+ }
+ if (resolving.isEmpty && rk._1.equals("primaryKey")) {
+ throw new AnalysisException(s"Hudi tables with primary key are required to match on all primary key columns. Column: '${rk._2}' not found")
+ }
+ resolving
+ }).filter(_.nonEmpty).map(_.get)
+
+ if (expressionSet.nonEmpty && primaryKeyFields.isPresent) {
+ //if pkless additional expressions are allowed
+ throw new AnalysisException(s"Only simple conditions of the form `t.id =
s.id` using primary key or partition path columns are allowed on tables with
primary key. " +
+ s"(illegal column(s) used: `${expressionSet.map(x =>
x._1.name).mkString("`,`")}`")
+ }
+ resolvedCols
+ }
- val targetAttrs = mergeInto.targetTable.outputSet
+ private def doCasting(conditions: Seq[Expression], pkTable: Boolean):
Seq[(Attribute, Expression)] = {
+ val targetAttrs = mergeInto.targetTable.outputSet
val exprUtils = sparkAdapter.getCatalystExpressionUtils
// Here we're unraveling superfluous casting of expressions on both sides
of the matched-on condition,
// in case both of them are casted to the same type (which might be result
of either explicit casting
@@ -146,7 +198,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
// as they are w/o the ability of transforming them w/ custom expressions
(unlike in vanilla Spark flow).
//
// Check out HUDI-4861 for more details
- val cleanedConditions =
conditions.map(_.asInstanceOf[EqualTo]).map(stripCasting)
+ val cleanedConditions = conditions.map(stripCasting)
// Expressions of the following forms are supported:
// `target.id = <expr>` (or `<expr> = target.id`)
@@ -156,7 +208,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
// target table side (since we're gonna be matching against primary-key
column as is) expression
// on the opposite side of the comparison should be cast-able to the
primary-key column's data-type
// t/h "up-cast" (ie w/o any loss in precision)
- val targetAttr2ConditionExpressions = cleanedConditions.map {
+ cleanedConditions.collect {
case EqualTo(CoercedAttributeReference(attr), expr) if
targetAttrs.exists(f => attributeEquals(f, attr)) =>
if (exprUtils.canUpCast(expr.dataType, attr.dataType)) {
// NOTE: It's critical we reference output attribute here and not
the one from condition
@@ -177,32 +229,10 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
+ s"can't cast ${expr.sql} (of ${expr.dataType}) to
${attr.dataType}")
}
- case expr =>
+ case expr if pkTable =>
throw new AnalysisException(s"Invalid MERGE INTO matching condition:
`${expr.sql}`: "
+ "expected condition should be 'target.id = <source-column-expr>',
e.g. "
- + "`t.id = s.id` or `t.id = cast(s.id, ...)`")
- }
-
- targetAttr2ConditionExpressions.collect {
- case (attr, expr) if resolver(attr.name, primaryKeyField) =>
- // NOTE: Here we validate that condition expression involving
primary-key column(s) is a simple
- // attribute-reference expression (possibly wrapped into a
cast). This is necessary to disallow
- // statements like following
- //
- // MERGE INTO ... AS t USING (
- // SELECT ... FROM ... AS s
- // )
- // ON t.id = s.id + 1
- // WHEN MATCHED THEN UPDATE *
- //
- // Which (in the current design) could result in a primary key
of the record being modified,
- // which is not allowed.
- if (!resolvesToSourceAttribute(expr)) {
- throw new AnalysisException("Only simple conditions of the form
`t.id = s.id` are allowed on the " +
- s"primary-key column. Found `${attr.sql} = ${expr.sql}`")
- }
-
- (attr, expr)
+ + "`t.id = s.id` or `t.id = cast(s.id, ...)`")
}
}
@@ -245,11 +275,16 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
// TODO move to analysis phase
validate
- val sourceDF: DataFrame = sourceDataset
+ if (HoodieSparkUtils.isSpark2) {
+ //already enabled by default for spark 3+
+ sparkSession.conf.set("spark.sql.crossJoin.enabled","true")
+ }
+
+ val projectedJoinedDF: DataFrame = projectedJoinedDataset
// Create the write parameters
val props = buildMergeIntoConfig(hoodieCatalogTable)
// Do the upsert
- executeUpsert(sourceDF, props)
+ executeUpsert(projectedJoinedDF, props)
// Refresh the table in the catalog
sparkSession.catalog.refreshTable(hoodieCatalogTable.table.qualifiedName)
@@ -298,18 +333,26 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
* <li>{@code ts = source.sts}</li>
* </ul>
*/
- def sourceDataset: DataFrame = {
+ def projectedJoinedDataset: DataFrame = {
val resolver = sparkSession.sessionState.analyzer.resolver
- val sourceTablePlan = mergeInto.sourceTable
- val sourceTableOutput = sourceTablePlan.output
+ // We want to join the source and target tables.
+ // Then we want to project the output so that we have the meta columns
from the target table
+ // followed by the data columns of the source table
+ val tableMetaCols = mergeInto.targetTable.output.filter(a =>
isMetaField(a.name))
+ val joinData = sparkAdapter.createMITJoin(mergeInto.sourceTable,
mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE")
+ val incomingDataCols =
joinData.output.filterNot(mergeInto.targetTable.outputSet.contains)
+ val projectedJoinPlan = Project(tableMetaCols ++ incomingDataCols,
joinData)
+ val projectedJoinOutput = projectedJoinPlan.output
- val requiredAttributesMap = primaryKeyAttributeToConditionExpression ++
preCombineAttributeAssociatedExpression
+ val requiredAttributesMap = recordKeyAttributeToConditionExpression ++
preCombineAttributeAssociatedExpression
val (existingAttributesMap, missingAttributesMap) =
requiredAttributesMap.partition {
- case (keyAttr, _) => sourceTableOutput.exists(attr =>
resolver(keyAttr.name, attr.name))
+ case (keyAttr, _) => projectedJoinOutput.exists(attr =>
resolver(keyAttr.name, attr.name))
}
+ // This is to handle the situation where the condition is something like "s0.s_id = t0.id", so in the source table
+ // we add an additional column that is an alias of "s0.s_id" named "id"
// NOTE: Primary key attribute (required) as well as Pre-combine one
(optional) defined
// in the [[targetTable]] schema has to be present in the incoming
[[sourceTable]] dataset.
// In cases when [[sourceTable]] doesn't bear such attributes
(which, for ex, could happen
@@ -317,7 +360,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
// them according to aforementioned heuristic) to meet Hudi's
requirements
val additionalColumns: Seq[NamedExpression] =
missingAttributesMap.flatMap {
- case (keyAttr, sourceExpression) if !sourceTableOutput.exists(attr =>
resolver(attr.name, keyAttr.name)) =>
+ case (keyAttr, sourceExpression) if !projectedJoinOutput.exists(attr
=> resolver(attr.name, keyAttr.name)) =>
Seq(Alias(sourceExpression, keyAttr.name)())
case _ => Seq()
@@ -327,7 +370,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
// matches to that one of the target table. This is necessary b/c unlike
Spark, Avro is case-sensitive
// and therefore would fail downstream if case of corresponding columns
don't match
val existingAttributes = existingAttributesMap.map(_._1)
- val adjustedSourceTableOutput = sourceTableOutput.map { attr =>
+ val adjustedSourceTableOutput = projectedJoinOutput.map { attr =>
existingAttributes.find(keyAttr => resolver(keyAttr.name, attr.name))
match {
// To align the casing we just rename the attribute to match that one
of the
// target table
@@ -336,7 +379,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
}
}
- val amendedPlan = Project(adjustedSourceTableOutput ++ additionalColumns,
sourceTablePlan)
+ val amendedPlan = Project(adjustedSourceTableOutput ++ additionalColumns,
projectedJoinPlan)
Dataset.ofRows(sparkSession, amendedPlan)
}
@@ -348,13 +391,13 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
* expressions to the ExpressionPayload#getInsertValue.
*/
private def executeUpsert(sourceDF: DataFrame, parameters: Map[String,
String]): Unit = {
- val operation = if
(StringUtils.isNullOrEmpty(parameters.getOrElse(PRECOMBINE_FIELD.key, ""))) {
+ val operation = if
(StringUtils.isNullOrEmpty(parameters.getOrElse(PRECOMBINE_FIELD.key, "")) &&
updatingActions.isEmpty) {
INSERT_OPERATION_OPT_VAL
} else {
UPSERT_OPERATION_OPT_VAL
}
- // Append the table schema to the parameters. In the case of merge into,
the schema of sourceDF
+ // Append the table schema to the parameters. In the case of merge into,
the schema of projectedJoinedDF
// may be different from the target table, because the are transform
logical in the update or
// insert actions.
var writeParams = parameters +
@@ -387,7 +430,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
// - Schema of the expected "joined" output of the [[sourceTable]] and
[[targetTable]]
writeParams ++= Seq(
PAYLOAD_RECORD_AVRO_SCHEMA ->
- convertStructTypeToAvroSchema(sourceDF.schema, "record", "").toString,
+
HoodieAvroUtils.removeMetadataFields(convertStructTypeToAvroSchema(sourceDF.schema,
"record", "")).toString,
PAYLOAD_EXPECTED_COMBINED_SCHEMA ->
encodeAsBase64String(toStructType(joinedExpectedOutput))
)
@@ -483,7 +526,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
}
case _ =>
throw new AnalysisException(s"Assignment expressions have to
assign every attribute of target table " +
- s"(provided: `${assignments.map(_.sql).mkString(",")}`")
+ s"(provided: `${assignments.map(_.sql).mkString(",")}`)")
}
}
}
@@ -530,7 +573,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
// NOTE: We're relying on [[sourceDataset]] here instead of
[[mergeInto.sourceTable]],
// as it could be amended to add missing primary-key and/or
pre-combine columns.
// Please check [[sourceDataset]] scala-doc for more details
- sourceDataset.queryExecution.analyzed.output ++
mergeInto.targetTable.output
+ (projectedJoinedDataset.queryExecution.analyzed.output ++
mergeInto.targetTable.output).filterNot(a => isMetaField(a.name))
}
private def resolvesToSourceAttribute(expr: Expression): Boolean = {
@@ -576,13 +619,13 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
val overridingOpts = Map(
"path" -> path,
- RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp,
+ RECORDKEY_FIELD.key -> tableConfig.getRawRecordKeyFieldProp,
PRECOMBINE_FIELD.key -> preCombineField,
TBL_NAME.key -> hoodieCatalogTable.tableName,
PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
HIVE_STYLE_PARTITIONING.key ->
tableConfig.getHiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
- KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+ KEYGENERATOR_CLASS_NAME.key ->
classOf[MergeIntoKeyGenerator].getCanonicalName,
SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME ->
tableConfig.getKeyGeneratorClassName,
HoodieSyncConfig.META_SYNC_ENABLED.key ->
hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key ->
hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
@@ -605,7 +648,8 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
RECONCILE_SCHEMA.key -> "false",
CANONICALIZE_NULLABLE.key -> "false",
SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> "true",
- SQL_MERGE_INTO_WRITES.key -> "true"
+ HoodieInternalConfig.SQL_MERGE_INTO_WRITES.key -> "true",
+ HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key() ->
(!StringUtils.isNullOrEmpty(preCombineField)).toString
)
combineOptions(hoodieCatalogTable, tableConfig,
sparkSession.sqlContext.conf,
@@ -628,17 +672,19 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
private def checkInsertingActions(insertActions: Seq[InsertAction]): Unit = {
insertActions.foreach(insert =>
assert(insert.assignments.length <= targetTableSchema.length,
- s"The number of insert assignments[${insert.assignments.length}] must
less than or equal to the " +
+ s"The number of insert assignments[${insert.assignments.length}] must
be less than or equal to the " +
s"targetTable field size[${targetTableSchema.length}]"))
}
private def checkUpdatingActions(updateActions: Seq[UpdateAction]): Unit = {
-
- //updateActions.foreach(update =>
- // assert(update.assignments.length == targetTableSchema.length,
- // s"The number of update assignments[${update.assignments.length}]
must equal to the " +
- // s"targetTable field size[${targetTableSchema.length}]"))
+ if (hoodieCatalogTable.preCombineKey.isEmpty && updateActions.nonEmpty) {
+ logWarning(s"Updates without precombine can have nondeterministic
behavior")
+ }
+ updateActions.foreach(update =>
+ assert(update.assignments.length <= targetTableSchema.length,
+ s"The number of update assignments[${update.assignments.length}] must be less than or equal to the " +
+ s"targetTable field size[${targetTableSchema.length}]"))
// For MOR table, the target table field cannot be the right-value in the
update action.
if (targetTableType == MOR_TABLE_TYPE_OPT_VAL) {
@@ -666,7 +712,7 @@ object MergeIntoHoodieTableCommand {
}
}
- def stripCasting(expr: EqualTo): EqualTo = expr match {
+ def stripCasting(expr: Expression): Expression = expr match {
case EqualTo(MatchCast(leftExpr, leftTargetType, _, _),
MatchCast(rightExpr, rightTargetType, _, _))
if leftTargetType.sameType(rightTargetType) => EqualTo(leftExpr,
rightExpr)
case _ => expr
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
index a96fe669b57..7ee3e838a2e 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -352,11 +352,11 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
// Delete with condition expression.
val errorMessage = if (HoodieSparkUtils.gteqSpark3_2) {
- "Only simple conditions of the form `t.id = s.id` are allowed on the
primary-key column. Found `t0.id = (s0.s_id + 1)`"
+ "Only simple conditions of the form `t.id = s.id` are allowed on the
primary-key and partition path column. Found `t0.id = (s0.s_id + 1)`"
} else if (HoodieSparkUtils.gteqSpark3_1) {
- "Only simple conditions of the form `t.id = s.id` are allowed on the
primary-key column. Found `t0.`id` = (s0.`s_id` + 1)`"
+ "Only simple conditions of the form `t.id = s.id` are allowed on the
primary-key and partition path column. Found `t0.`id` = (s0.`s_id` + 1)`"
} else {
- "Only simple conditions of the form `t.id = s.id` are allowed on the
primary-key column. Found `t0.`id` = (s0.`s_id` + 1)`;"
+ "Only simple conditions of the form `t.id = s.id` are allowed on the
primary-key and partition path column. Found `t0.`id` = (s0.`s_id` + 1)`;"
}
checkException(
@@ -628,11 +628,11 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
// 1) set source column name to be same as target column
//
val complexConditionsErrorMessage = if (HoodieSparkUtils.gteqSpark3_2) {
- "Only simple conditions of the form `t.id = s.id` are allowed on the
primary-key column. Found `t0.id = (s0.id + 1)`"
+ "Only simple conditions of the form `t.id = s.id` are allowed on the
primary-key and partition path column. Found `t0.id = (s0.id + 1)`"
} else if (HoodieSparkUtils.gteqSpark3_1) {
- "Only simple conditions of the form `t.id = s.id` are allowed on the
primary-key column. Found `t0.`id` = (s0.`id` + 1)`"
+ "Only simple conditions of the form `t.id = s.id` are allowed on the
primary-key and partition path column. Found `t0.`id` = (s0.`id` + 1)`"
} else {
- "Only simple conditions of the form `t.id = s.id` are allowed on the
primary-key column. Found `t0.`id` = (s0.`id` + 1)`;"
+ "Only simple conditions of the form `t.id = s.id` are allowed on the
primary-key and partition path column. Found `t0.`id` = (s0.`id` + 1)`;"
}
checkException(
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
index 48ba4ea0ca0..84f161444b0 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
@@ -373,6 +373,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
})
}
+ /* TODO [HUDI-6472]
test("Test MergeInto When PrimaryKey And PreCombineField Of Source Table And
Target Table Differ In Case Only") {
withRecordType()(withTempDir { tmp =>
spark.sql("set hoodie.payload.combined.schema.validate = true")
@@ -552,6 +553,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
)
})
}
+*/
test("Test only insert when source table contains history") {
withRecordType()(withTempDir { tmp =>
@@ -634,7 +636,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| select 1 as id1, 1 as id2, 'a1' as name, 11 as price, 110 as ts,
'2022-08-19' as dt union all
| select 1 as id1, 2 as id2, 'a2' as name, 10 as price, 100 as ts,
'2022-08-18' as dt
| ) as s0
- | on t0.id1 = s0.id1
+ | on t0.id1 = s0.id1 and t0.id2 = s0.id2
| when not matched then insert *
""".stripMargin
)
@@ -876,7 +878,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| using (
| select 3 as id, 'a3' as name, 10.3 as price, 1003 as ts,
'2021-03-21' as dt
| union all
- | select 1 as id, 'a2' as name, 10.2 as price, 1002 as ts,
'2021-03-21' as dt
+ | select 1 as id, 'a2' as name, 10.4 as price, 1004 as ts,
'2021-03-21' as dt
| ) as s0
| on t0.id = s0.id
| when matched then update set *
@@ -884,10 +886,9 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
""".stripMargin
)
checkAnswer(s"select id, name, price, ts, dt from $tableName")(
- Seq(1, "a1", 10.1, 1000, "2021-03-21"),
- Seq(1, "a2", 10.2, 1002, "2021-03-21"),
- Seq(3, "a3", 10.3, 1003, "2021-03-21"),
- Seq(1, "a2", 10.2, 1002, "2021-03-21")
+ Seq(1, "a2", 10.4, 1004, "2021-03-21"),
+ Seq(1, "a2", 10.4, 1004, "2021-03-21"),
+ Seq(3, "a3", 10.3, 1003, "2021-03-21")
)
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable3.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable3.scala
new file mode 100644
index 00000000000..075fb332327
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable3.scala
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.hudi.{HoodieSparkUtils, ScalaAssertionSupport}
+
+class TestMergeIntoTable3 extends HoodieSparkSqlTestBase with
ScalaAssertionSupport {
+
+ test("Test Merge into extra cond") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | primaryKey ='id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+ val tableName2 = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName2 (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName2'
+ | tblproperties (
+ | primaryKey ='id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+
+ spark.sql(
+ s"""
+ |insert into $tableName values
+ | (1, 'a1', 10, 100),
+ | (2, 'a2', 20, 200),
+ | (3, 'a3', 20, 100)
+ |""".stripMargin)
+ spark.sql(
+ s"""
+ |insert into $tableName2 values
+ | (1, 'u1', 10, 999),
+ | (3, 'u3', 30, 9999),
+ | (4, 'u4', 40, 99999)
+ |""".stripMargin)
+
+ spark.sql(
+ s"""
+ |merge into $tableName as oldData
+ |using $tableName2
+ |on oldData.id = $tableName2.id
+ |when matched and oldData.price = $tableName2.price then update set
oldData.name = $tableName2.name
+ |
+ |""".stripMargin)
+
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "u1", 10.0, 100),
+ Seq(3, "a3", 20.0, 100),
+ Seq(2, "a2", 20.0, 200)
+ )
+
+ val errorMessage = if (HoodieSparkUtils.gteqSpark3_1) {
+ "Only simple conditions of the form `t.id = s.id` using primary key or
partition path " +
+ "columns are allowed on tables with primary key. (illegal column(s)
used: `price`"
+ } else {
+ "Only simple conditions of the form `t.id = s.id` using primary key or
partition path " +
+ "columns are allowed on tables with primary key. (illegal column(s)
used: `price`;"
+ }
+
+ checkException(
+ s"""
+ |merge into $tableName as oldData
+ |using $tableName2
+ |on oldData.id = $tableName2.id and oldData.price =
$tableName2.price
+ |when matched then update set oldData.name = $tableName2.name
+ |when not matched then insert *
+ |""".stripMargin)(errorMessage)
+
+ //test with multiple pks
+ val tableName3 = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName3 (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName3'
+ | tblproperties (
+ | primaryKey ='id,name',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+
+ val errorMessage2 = if (HoodieSparkUtils.gteqSpark3_1) {
+ "Hudi tables with primary key are required to match on all primary key columns. Column: 'name' not found"
+ } else {
+ "Hudi tables with primary key are required to match on all primary key columns. Column: 'name' not found;"
+ }
+
+ checkException(
+ s"""
+ |merge into $tableName3 as oldData
+ |using $tableName2
+ |on oldData.id = $tableName2.id
+ |when matched then update set oldData.name = $tableName2.name
+ |when not matched then insert *
+ |""".stripMargin)(errorMessage2)
+ }
+ }
+
+ test("Test pkless complex merge cond") {
+ withRecordType()(withTempDir { tmp =>
+ spark.sql("set hoodie.payload.combined.schema.validate = true")
+ val tableName = generateTableName
+ // Create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+
+ spark.sql(
+ s"""
+ |insert into $tableName values
+ | (1, 'a1', 10, 100),
+ | (2, 'a2', 20, 100),
+ | (3, 'a3', 30, 100),
+ | (4, 'a4', 40, 100),
+ | (5, 'a5', 50, 100),
+ | (6, 'a6', 60, 100)
+ |""".stripMargin)
+
+ // First merge using a non-equality condition (updates every matching target row's ts)
+ spark.sql(
+ s"""
+ | merge into $tableName
+ | using (
+ | select 2 as id, 'a1' as name, 50 as price, 999 as ts
+ | ) s0
+ | on $tableName.price < s0.price and $tableName.id >= s0.id
+ | when matched then update set ts = s0.ts
+ | when not matched then insert *
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 10.0, 100),
+ Seq(2, "a2", 20.0, 999),
+ Seq(3, "a3", 30.0, 999),
+ Seq(4, "a4", 40.0, 999),
+ Seq(5, "a5", 50.0, 100),
+ Seq(6, "a6", 60.0, 100)
+ )
+
+ spark.sql(
+ s"""
+ | merge into $tableName
+ | using (
+ | select 1 as id, 'a1' as name, 50 as price, 999 as ts
+ | ) s0
+ | on $tableName.id < s0.id
+ | when matched then update set ts = s0.ts
+ | when not matched then insert *
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 10.0, 100),
+ Seq(2, "a2", 20.0, 999),
+ Seq(3, "a3", 30.0, 999),
+ Seq(4, "a4", 40.0, 999),
+ Seq(5, "a5", 50.0, 100),
+ Seq(6, "a6", 60.0, 100),
+ Seq(1, "a1", 50.0, 999)
+ )
+
+ })
+ }
+
+ test("Test pkless multiple source match") {
+ for (withPrecombine <- Seq(true, false)) {
+ withRecordType()(withTempDir { tmp =>
+ spark.sql("set hoodie.payload.combined.schema.validate = true")
+ val tableName = generateTableName
+
+ val prekstr = if (withPrecombine) "tblproperties (preCombineField =
'ts')" else ""
+ // Create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | $prekstr
+ """.stripMargin)
+
+ spark.sql(
+ s"""
+ |insert into $tableName values
+ | (1, 'a1', 10, 100)
+ |""".stripMargin)
+
+ // Merge where two source rows match the same target row on 'name'
+ spark.sql(
+ s"""
+ | merge into $tableName
+ | using (
+ | select 1 as id, 'a1' as name, 20 as price, 200 as ts
+ | union all
+ | select 2 as id, 'a1' as name, 30 as price, 100 as ts
+ | ) s0
+ | on $tableName.name = s0.name
+ | when matched then update set price = s0.price
+ | when not matched then insert *
+ """.stripMargin)
+ if (withPrecombine) {
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 20.0, 100)
+ )
+ } else {
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 30.0, 100)
+ )
+ }
+ })
+ }
+
+ }
+
+ test("Test MergeInto Basic pkless") {
+ withRecordType()(withTempDir { tmp =>
+ spark.sql("set hoodie.payload.combined.schema.validate = true")
+ val tableName = generateTableName
+ // Create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+
+ // First merge with an extra input field 'flag' (insert a new record)
+ spark.sql(
+ s"""
+ | merge into $tableName
+ | using (
+ | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '1' as
flag
+ | ) s0
+ | on s0.id = $tableName.id
+ | when matched and flag = '1' then update set
+ | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+ | when not matched and flag = '1' then insert *
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 10.0, 1000)
+ )
+
+ // Second merge (update the record)
+ spark.sql(
+ s"""
+ | merge into $tableName
+ | using (
+ | select 1 as id, 'a1' as name, 10 as price, 1001 as ts
+ | ) s0
+ | on s0.id = $tableName.id
+ | when matched then update set
+ | id = s0.id, name = s0.name, price = s0.price + $tableName.price,
ts = s0.ts
+ | when not matched then insert *
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 20.0, 1001)
+ )
+
+ // the third time merge (update & insert the record)
+ spark.sql(
+ s"""
+ | merge into $tableName
+ | using (
+ | select * from (
+ | select 1 as id, 'a1' as name, 10 as price, 1002 as ts
+ | union all
+ | select 2 as id, 'a2' as name, 12 as price, 1001 as ts
+ | )
+ | ) s0
+ | on s0.id = $tableName.id
+ | when matched then update set
+ | id = s0.id, name = s0.name, price = s0.price + $tableName.price,
ts = s0.ts
+ | when not matched and s0.id % 2 = 0 then insert *
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 30.0, 1002),
+ Seq(2, "a2", 12.0, 1001)
+ )
+
+ // the fourth merge (delete the record)
+ spark.sql(
+ s"""
+ | merge into $tableName
+ | using (
+ | select 1 as id, 'a1' as name, 12 as price, 1003 as ts
+ | ) s0
+ | on s0.id = $tableName.id
+ | when matched and s0.id != 1 then update set
+ | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+ | when matched and s0.id = 1 then delete
+ | when not matched then insert *
+ """.stripMargin)
+ val cnt = spark.sql(s"select * from $tableName where id = 1").count()
+ assertResult(0)(cnt)
+ })
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index c760bf3442b..d1c40728d54 100644
---
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -29,7 +29,8 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable}
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable,
Join, LogicalPlan}
import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
Spark24HoodieParquetFileFormat}
@@ -186,4 +187,8 @@ class Spark2Adapter extends SparkAdapter {
case OFF_HEAP => "OFF_HEAP"
case _ => throw new IllegalArgumentException(s"Invalid StorageLevel:
$level")
}
+
+ override def createMITJoin(left: LogicalPlan, right: LogicalPlan, joinType:
JoinType, condition: Option[Expression], hint: String): LogicalPlan = {
+ Join(left, right, joinType, condition)
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark2Analysis.scala
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark2Analysis.scala
index 4401615e90b..1faceed10b4 100644
---
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark2Analysis.scala
+++
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark2Analysis.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias,
CurrentDate, CurrentTim
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment,
DeleteAction, InsertAction, LogicalPlan, MergeIntoTable, Project, UpdateAction,
Window}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.toPrettySQL
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
/**
* NOTE: This code is borrowed from Spark 3.1.3
@@ -76,7 +77,7 @@ object HoodieSpark2Analysis {
mergeInto: MergeIntoTable,
resolveValuesWithSourceOnly: Boolean):
Seq[Assignment] = {
if (assignments.isEmpty) {
- val expandedColumns = mergeInto.targetTable.output
+ val expandedColumns =
HoodieSqlCommonUtils.removeMetaFields(mergeInto.targetTable.output)
val expandedValues = mergeInto.sourceTable.output
expandedColumns.zip(expandedValues).map(kv => Assignment(kv._1, kv._2))
} else {
diff --git
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index d5a35980526..2e635e4677a 100644
---
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -28,6 +28,8 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters,
HoodieSparkAvroSchemaConverters}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression,
InterpretedPredicate, Predicate}
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.logical.{Join, JoinHint,
LogicalPlan}
import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hudi.SparkAdapter
@@ -92,4 +94,8 @@ abstract class BaseSpark3Adapter extends SparkAdapter with
Logging {
}
override def convertStorageLevelToString(level: StorageLevel): String
+
+ override def createMITJoin(left: LogicalPlan, right: LogicalPlan, joinType:
JoinType, condition: Option[Expression], hint: String): LogicalPlan = {
+ Join(left, right, joinType, condition, JoinHint.NONE)
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark2Analysis.scala
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark30Analysis.scala
similarity index 88%
copy from
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark2Analysis.scala
copy to
hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark30Analysis.scala
index 4401615e90b..96d95b4a903 100644
---
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark2Analysis.scala
+++
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark30Analysis.scala
@@ -18,28 +18,35 @@
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases,
ResolveLambdaVariables, UnresolvedAttribute, UnresolvedExtractValue,
caseInsensitiveResolution, withPosition}
import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate,
CurrentTimestamp, Expression, ExtractValue, GetStructField, LambdaFunction}
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment,
DeleteAction, InsertAction, LogicalPlan, MergeIntoTable, Project, UpdateAction,
Window}
+import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.toPrettySQL
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
/**
- * NOTE: This code is borrowed from Spark 3.1.3
- * This code is borrowed, so that we can have some advanced Spark SQL
functionality (like Merge Into, for ex)
- * in Spark 2.x
+ * NOTE: Taken from HoodieSpark2Analysis and modified to resolve source and
target tables if not already resolved
*
* PLEASE REFRAIN MAKING ANY CHANGES TO THIS CODE UNLESS ABSOLUTELY
NECESSARY
*/
-object HoodieSpark2Analysis {
+object HoodieSpark30Analysis {
case class ResolveReferences(spark: SparkSession) extends Rule[LogicalPlan] {
private val resolver = spark.sessionState.conf.resolver
override def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveOperatorsUp {
- case m @ MergeIntoTable(targetTable, sourceTable, _, _, _)
- if (!m.resolved || containsUnresolvedStarAssignments(m)) &&
targetTable.resolved && sourceTable.resolved =>
-
+ case mO @ MergeIntoTable(targetTableO, sourceTableO, _, _, _)
+ //// Hudi change: avoid falling through to Spark's own MERGE INTO
resolution by resolving the source and target tables here if not yet resolved
+ //
+ if !mO.resolved || containsUnresolvedStarAssignments(mO) =>
+ lazy val analyzer = spark.sessionState.analyzer
+ val targetTable = if (targetTableO.resolved) targetTableO else
analyzer.execute(targetTableO)
+ val sourceTable = if (sourceTableO.resolved) sourceTableO else
analyzer.execute(sourceTableO)
+ val m = mO.copy(targetTable = targetTable, sourceTable = sourceTable)
+ //
+ ////
EliminateSubqueryAliases(targetTable) match {
case _ =>
val newMatchedActions = m.matchedActions.map {
@@ -76,7 +83,11 @@ object HoodieSpark2Analysis {
mergeInto: MergeIntoTable,
resolveValuesWithSourceOnly: Boolean):
Seq[Assignment] = {
if (assignments.isEmpty) {
- val expandedColumns = mergeInto.targetTable.output
+ //// Hudi change: filter out meta fields
+ //
+ val expandedColumns =
HoodieSqlCommonUtils.removeMetaFields(mergeInto.targetTable.output)
+ //
+ ////
val expandedValues = mergeInto.sourceTable.output
expandedColumns.zip(expandedValues).map(kv => Assignment(kv._1, kv._2))
} else {
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark2Analysis.scala
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark31Analysis.scala
similarity index 88%
copy from
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark2Analysis.scala
copy to
hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark31Analysis.scala
index 4401615e90b..1dc57acbc68 100644
---
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark2Analysis.scala
+++
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark31Analysis.scala
@@ -18,28 +18,35 @@
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases,
ResolveLambdaVariables, UnresolvedAttribute, UnresolvedExtractValue,
caseInsensitiveResolution, withPosition}
import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate,
CurrentTimestamp, Expression, ExtractValue, GetStructField, LambdaFunction}
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment,
DeleteAction, InsertAction, LogicalPlan, MergeIntoTable, Project, UpdateAction,
Window}
+import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.toPrettySQL
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
/**
- * NOTE: This code is borrowed from Spark 3.1.3
- * This code is borrowed, so that we can have some advanced Spark SQL
functionality (like Merge Into, for ex)
- * in Spark 2.x
+ * NOTE: Taken from HoodieSpark2Analysis and modified to resolve source and
target tables if not already resolved
*
* PLEASE REFRAIN MAKING ANY CHANGES TO THIS CODE UNLESS ABSOLUTELY
NECESSARY
*/
-object HoodieSpark2Analysis {
+object HoodieSpark31Analysis {
case class ResolveReferences(spark: SparkSession) extends Rule[LogicalPlan] {
private val resolver = spark.sessionState.conf.resolver
override def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveOperatorsUp {
- case m @ MergeIntoTable(targetTable, sourceTable, _, _, _)
- if (!m.resolved || containsUnresolvedStarAssignments(m)) &&
targetTable.resolved && sourceTable.resolved =>
-
+ case mO @ MergeIntoTable(targetTableO, sourceTableO, _, _, _)
+ //// Hudi change: avoid falling through to Spark's own MERGE INTO
resolution by resolving the source and target tables here if not yet resolved
+ //
+ if !mO.resolved || containsUnresolvedStarAssignments(mO) =>
+ lazy val analyzer = spark.sessionState.analyzer
+ val targetTable = if (targetTableO.resolved) targetTableO else
analyzer.execute(targetTableO)
+ val sourceTable = if (sourceTableO.resolved) sourceTableO else
analyzer.execute(sourceTableO)
+ val m = mO.copy(targetTable = targetTable, sourceTable = sourceTable)
+ //
+ ////
EliminateSubqueryAliases(targetTable) match {
case _ =>
val newMatchedActions = m.matchedActions.map {
@@ -76,7 +83,11 @@ object HoodieSpark2Analysis {
mergeInto: MergeIntoTable,
resolveValuesWithSourceOnly: Boolean):
Seq[Assignment] = {
if (assignments.isEmpty) {
- val expandedColumns = mergeInto.targetTable.output
+ //// Hudi change: filter out meta fields
+ //
+ val expandedColumns =
HoodieSqlCommonUtils.removeMetaFields(mergeInto.targetTable.output)
+ //
+ ////
val expandedValues = mergeInto.sourceTable.output
expandedColumns.zip(expandedValues).map(kv => Assignment(kv._1, kv._2))
} else {
diff --git
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
index f07d0ccdc63..91e24979717 100644
---
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
@@ -22,9 +22,9 @@ import org.apache.hudi.Spark32HoodieFileScanRDD
import org.apache.spark.sql._
import org.apache.spark.sql.avro._
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt,
EliminateSubqueryAliases}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable,
LogicalPlan}
@@ -123,4 +123,8 @@ class Spark3_2Adapter extends BaseSpark3Adapter {
case OFF_HEAP => "OFF_HEAP"
case _ => throw new IllegalArgumentException(s"Invalid StorageLevel:
$level")
}
+
+ override def failAnalysisForMIT(a: Attribute, cols: String): Unit = {
+ a.failAnalysis(s"cannot resolve ${a.sql} in MERGE command given columns
[$cols]")
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
index fc4d5d18ff2..cd02ded580a 100644
---
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
+++
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
@@ -20,15 +20,18 @@ package org.apache.spark.sql.hudi.analysis
import org.apache.hadoop.fs.Path
import org.apache.hudi.{DataSourceReadOptions, DefaultSource,
SparkAdapterSupport}
import org.apache.spark.sql.HoodieSpark3CatalystPlanUtils.MatchResolvedTable
-import org.apache.spark.sql.catalyst.analysis.UnresolvedPartitionSpec
+import
org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolveExpressionByPlanChildren
+import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt,
EliminateSubqueryAliases, NamedRelation, UnresolvedAttribute,
UnresolvedPartitionSpec}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
-import org.apache.spark.sql.catalyst.plans.logcal.{HoodieQuery,
HoodieTableChanges, HoodieTableChangesByPath, HoodieTableChangesOptionsParser}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logcal.{HoodieQuery,
HoodieTableChanges, HoodieTableChangesOptionsParser}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
import org.apache.spark.sql.connector.catalog.{Table, V1Table}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
import
org.apache.spark.sql.hudi.analysis.HoodieSpark32PlusAnalysis.{HoodieV1OrV2Table,
ResolvesToHudiTable}
import org.apache.spark.sql.hudi.catalog.HoodieInternalV2Table
@@ -128,9 +131,135 @@ case class HoodieSpark32PlusResolveReferences(spark:
SparkSession) extends Rule[
catalogTable.location.toString))
LogicalRelation(relation, catalogTable)
}
+ case mO@MatchMergeIntoTable(targetTableO, sourceTableO, _)
+ //// Hudi change: avoid falling through to Spark's own MERGE INTO
resolution by resolving the source and target tables here if not yet resolved
+ //
+ if !mO.resolved =>
+ lazy val analyzer = spark.sessionState.analyzer
+ val targetTable = if (targetTableO.resolved) targetTableO else
analyzer.execute(targetTableO)
+ val sourceTable = if (sourceTableO.resolved) sourceTableO else
analyzer.execute(sourceTableO)
+ val m = mO.asInstanceOf[MergeIntoTable].copy(targetTable = targetTable,
sourceTable = sourceTable)
+ //
+ ////
+ EliminateSubqueryAliases(targetTable) match {
+ case r: NamedRelation if r.skipSchemaResolution =>
+ // Do not resolve the expression if the target table accepts any
schema.
+ // This allows data sources to customize their own resolution logic
using
+ // custom resolution rules.
+ m
+
+ case _ =>
+ val newMatchedActions = m.matchedActions.map {
+ case DeleteAction(deleteCondition) =>
+ val resolvedDeleteCondition = deleteCondition.map(
+ resolveExpressionByPlanChildren(_, m))
+ DeleteAction(resolvedDeleteCondition)
+ case UpdateAction(updateCondition, assignments) =>
+ val resolvedUpdateCondition = updateCondition.map(
+ resolveExpressionByPlanChildren(_, m))
+ UpdateAction(
+ resolvedUpdateCondition,
+ // The update value can access columns from both target and
source tables.
+ resolveAssignments(assignments, m, resolveValuesWithSourceOnly
= false))
+ case UpdateStarAction(updateCondition) =>
+ //// Hudi change: filter out meta fields
+ //
+ val assignments = targetTable.output.filter(a =>
!isMetaField(a.name)).map { attr =>
+ Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
+ }
+ //
+ ////
+ UpdateAction(
+ updateCondition.map(resolveExpressionByPlanChildren(_, m)),
+ // For UPDATE *, the value must from source table.
+ resolveAssignments(assignments, m, resolveValuesWithSourceOnly
= true))
+ case o => o
+ }
+ val newNotMatchedActions = m.notMatchedActions.map {
+ case InsertAction(insertCondition, assignments) =>
+ // The insert action is used when not matched, so its condition
and value can only
+ // access columns from the source table.
+ val resolvedInsertCondition = insertCondition.map(
+ resolveExpressionByPlanChildren(_, Project(Nil,
m.sourceTable)))
+ InsertAction(
+ resolvedInsertCondition,
+ resolveAssignments(assignments, m, resolveValuesWithSourceOnly
= true))
+ case InsertStarAction(insertCondition) =>
+ // The insert action is used when not matched, so its condition
and value can only
+ // access columns from the source table.
+ val resolvedInsertCondition = insertCondition.map(
+ resolveExpressionByPlanChildren(_, Project(Nil,
m.sourceTable)))
+ //// Hudi change: filter out meta fields
+ //
+ val assignments = targetTable.output.filter(a =>
!isMetaField(a.name)).map { attr =>
+ Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
+ }
+ //
+ ////
+ InsertAction(
+ resolvedInsertCondition,
+ resolveAssignments(assignments, m, resolveValuesWithSourceOnly
= true))
+ case o => o
+ }
+ val resolvedMergeCondition =
resolveExpressionByPlanChildren(m.mergeCondition, m)
+ m.copy(mergeCondition = resolvedMergeCondition,
+ matchedActions = newMatchedActions,
+ notMatchedActions = newNotMatchedActions)
+ }
+ }
+
+ def resolveAssignments(
+ assignments: Seq[Assignment],
+ mergeInto: MergeIntoTable,
+ resolveValuesWithSourceOnly: Boolean):
Seq[Assignment] = {
+ assignments.map { assign =>
+ val resolvedKey = assign.key match {
+ case c if !c.resolved =>
+ resolveMergeExprOrFail(c, Project(Nil, mergeInto.targetTable))
+ case o => o
+ }
+ val resolvedValue = assign.value match {
+ // The update values may contain target and/or source references.
+ case c if !c.resolved =>
+ if (resolveValuesWithSourceOnly) {
+ resolveMergeExprOrFail(c, Project(Nil, mergeInto.sourceTable))
+ } else {
+ resolveMergeExprOrFail(c, mergeInto)
+ }
+ case o => o
+ }
+ Assignment(resolvedKey, resolvedValue)
+ }
+ }
+
+ private def resolveMergeExprOrFail(e: Expression, p: LogicalPlan):
Expression = {
+ try {
+ val resolved = resolveExpressionByPlanChildren(e, p)
+ resolved.references.filter(!_.resolved).foreach { a =>
+ // Note: This will throw error only on unresolved attribute issues,
+ // not other resolution errors like mismatched data types.
+ val cols = p.inputSet.toSeq.map(_.sql).mkString(", ")
+ //// Change from Spark: the Spark 3.4 failAnalysis signature differs, so
delegate to the version-specific adapter
+ //
+ sparkAdapter.failAnalysisForMIT(a, cols)
+ //
+ ////
+ }
+ resolved
+ } catch {
+ case x: AnalysisException => throw x
+ }
+ }
+
+ private[sql] object MatchMergeIntoTable {
+ def unapply(plan: LogicalPlan): Option[(LogicalPlan, LogicalPlan,
Expression)] =
+ sparkAdapter.getCatalystPlanUtils.unapplyMergeIntoTable(plan)
}
+
}
+
+
/**
* Rule replacing resolved Spark's commands (not working for Hudi tables
out-of-the-box) with
* corresponding Hudi implementations
diff --git
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
index 651027f932d..e481c9302f9 100644
---
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
@@ -22,9 +22,9 @@ import org.apache.hudi.Spark33HoodieFileScanRDD
import org.apache.spark.sql._
import org.apache.spark.sql.avro._
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt,
EliminateSubqueryAliases}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
@@ -124,4 +124,8 @@ class Spark3_3Adapter extends BaseSpark3Adapter {
case OFF_HEAP => "OFF_HEAP"
case _ => throw new IllegalArgumentException(s"Invalid StorageLevel:
$level")
}
+
+ override def failAnalysisForMIT(a: Attribute, cols: String): Unit = {
+ a.failAnalysis(s"cannot resolve ${a.sql} in MERGE command given columns
[$cols]")
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
index 3bd8256c3aa..cfeacabce42 100644
---
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
@@ -21,9 +21,9 @@ import org.apache.avro.Schema
import org.apache.hudi.Spark34HoodieFileScanRDD
import org.apache.spark.sql.avro._
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt,
EliminateSubqueryAliases}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
@@ -124,4 +124,12 @@ class Spark3_4Adapter extends BaseSpark3Adapter {
case OFF_HEAP => "OFF_HEAP"
case _ => throw new IllegalArgumentException(s"Invalid StorageLevel:
$level")
}
+
+ override def failAnalysisForMIT(a: Attribute, cols: String): Unit = {
+ a.failAnalysis(
+ errorClass = "_LEGACY_ERROR_TEMP_2309",
+ messageParameters = Map(
+ "sqlExpr" -> a.sql,
+ "cols" -> cols))
+ }
}