yihua commented on code in PR #9083:
URL: https://github.com/apache/hudi/pull/9083#discussion_r1251290919


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieInternalProxyIndex.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+
+public class HoodieInternalProxyIndex extends HoodieIndex<Object, Object> {
+
+  /**
+   * Index that does not do tagging. Its purpose is to be used for Spark sql 
Merge into command
+   * Merge into does not need to use index lookup because we get the location 
from the meta columns
+   * from the join
+   */

Review Comment:
   nit: could you also add clarification in the docs on why we still need to 
implement a dummy index instead of using prepped write action?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoKeyGenerator.scala:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model.HoodieRecord.{RECORD_KEY_META_FIELD_ORD, 
PARTITION_PATH_META_FIELD_ORD}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+
+/**
+ * NOTE TO USERS: YOU SHOULD NOT SET THIS AS YOUR KEYGENERATOR
+ *
+ * Keygenerator that is meant to be used internally for the spark sql merge 
into command
+ * It will attempt to get the partition path and recordkey from the 
metafields, but will
+ * fallback to the sql keygenerator if the meta field is not populated
+ *
+ */
+class MergeIntoKeyGenerator(props: TypedProperties) extends 
SqlKeyGenerator(props) {
+
+  override def getRecordKey(record: GenericRecord): String = {
+    val recordKey = record.get(RECORD_KEY_META_FIELD_ORD)
+    if (recordKey != null) {
+      recordKey.toString
+    } else {
+      super.getRecordKey(record)
+    }
+  }
+
+  override def getRecordKey(row: Row): String = {
+    val recordKey = row.get(RECORD_KEY_META_FIELD_ORD)
+    if (recordKey != null) {
+      recordKey.toString
+    } else {
+      super.getRecordKey(row)
+    }
+  }
+
+  override def getRecordKey(internalRow: InternalRow, schema: StructType): 
UTF8String = {
+    val recordKey = internalRow.getUTF8String(RECORD_KEY_META_FIELD_ORD)
+    if (recordKey != null) {
+      recordKey
+    } else {
+      super.getRecordKey(internalRow, schema)
+    }
+  }
+
+  override def getPartitionPath(record: GenericRecord): String = {
+    val partitionPath = record.get(PARTITION_PATH_META_FIELD_ORD)
+    if (partitionPath != null) {
+      partitionPath.toString
+    } else {
+      super.getPartitionPath(record)
+    }
+  }
+
+  override def getPartitionPath(row: Row): String = {
+    val partitionPath = row.get(PARTITION_PATH_META_FIELD_ORD)
+    if (partitionPath != null) {
+      partitionPath.toString

Review Comment:
   nit: similarly, this can be simplified with `row.getString()`



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -43,23 +43,29 @@ object HoodieAnalysis extends SparkAdapterSupport {
     val rules: ListBuffer[RuleBuilder] = ListBuffer()
 
     // NOTE: This rule adjusts [[LogicalRelation]]s resolving into Hudi tables 
such that
-    //       meta-fields are not affecting the resolution of the target 
columns to be updated by Spark.
+    //       meta-fields are not affecting the resolution of the target 
columns to be updated by Spark (Except in the
+    //       case of MergeInto. We leave the meta columns on the target table, 
and use other means to ensure resolution)

Review Comment:
   Minor: my understanding is that for a pkless table, we need the record key, 
partition path, and filename from the meta columns.  Other meta columns can be 
pruned.  I assume if we only keep the three, Spark does not read the others, 
which is a small improvement we can make.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java:
##########
@@ -43,6 +44,10 @@
  */
 public final class SparkHoodieIndexFactory {
   public static HoodieIndex createIndex(HoodieWriteConfig config) {
+    boolean mergeIntoWrites = 
config.getProps().getBoolean(HoodieInternalConfig.SQL_MERGE_INTO_WRITES.key(), 
false);

Review Comment:
   nit: `HoodieInternalConfig.SQL_MERGE_INTO_WRITES.defaultValue` instead of 
`false`



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoKeyGenerator.scala:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model.HoodieRecord.{RECORD_KEY_META_FIELD_ORD, 
PARTITION_PATH_META_FIELD_ORD}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+
+/**
+ * NOTE TO USERS: YOU SHOULD NOT SET THIS AS YOUR KEYGENERATOR
+ *
+ * Keygenerator that is meant to be used internally for the spark sql merge 
into command
+ * It will attempt to get the partition path and recordkey from the 
metafields, but will
+ * fallback to the sql keygenerator if the meta field is not populated
+ *
+ */
+class MergeIntoKeyGenerator(props: TypedProperties) extends 
SqlKeyGenerator(props) {
+
+  override def getRecordKey(record: GenericRecord): String = {
+    val recordKey = record.get(RECORD_KEY_META_FIELD_ORD)
+    if (recordKey != null) {
+      recordKey.toString
+    } else {
+      super.getRecordKey(record)
+    }
+  }
+
+  override def getRecordKey(row: Row): String = {
+    val recordKey = row.get(RECORD_KEY_META_FIELD_ORD)
+    if (recordKey != null) {
+      recordKey.toString

Review Comment:
   nit: can be simplified as `row.getString()`



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala:
##########
@@ -206,27 +222,29 @@ object HoodieCreateRecordUtils {
   }
 
   def getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator: 
Option[BaseKeyGenerator], avroRec: GenericRecord,
-                                                 isPrepped: Boolean): 
(HoodieKey, Option[HoodieRecordLocation]) = {
+                                                 isPrepped: Boolean, 
mergeIntoWrites: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = {
+    //use keygen for mergeIntoWrites recordKey and partitionPath because the 
keygenerator handles
+    //fetching from the meta fields if they are populated and otherwise doing 
keygen
     val recordKey = if (isPrepped) {
       avroRec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
     } else {
       keyGenerator.get.getRecordKey(avroRec)
-    };
+    }
 
     val partitionPath = if (isPrepped) {
       avroRec.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString
     } else {
       keyGenerator.get.getPartitionPath(avroRec)
-    };
+    }
 
     val hoodieKey = new HoodieKey(recordKey, partitionPath)
-    val instantTime: Option[String] = if (isPrepped) {
+    val instantTime: Option[String] = if (isPrepped || mergeIntoWrites) {

Review Comment:
   Is the commit time pre-populated based on the current commit (not the commit 
time from the meta columns in the existing files) for MIT records?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoKeyGenerator.scala:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model.HoodieRecord.{RECORD_KEY_META_FIELD_ORD, 
PARTITION_PATH_META_FIELD_ORD}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+
+/**
+ * NOTE TO USERS: YOU SHOULD NOT SET THIS AS YOUR KEYGENERATOR

Review Comment:
   Should we add validation in a follow-up PR on the key generator so that 
`SqlKeyGenerator` and `MergeIntoKeyGenerator` cannot be set by the user?



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -125,18 +127,68 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
    * expression involving [[source]] column(s), we will have to add "phony" 
column matching the
    * primary-key one of the target table.
    */
-  private lazy val primaryKeyAttributeToConditionExpression: Seq[(Attribute, 
Expression)] = {
+  private lazy val recordKeyAttributeToConditionExpression: Seq[(Attribute, 
Expression)] = {
+    val primaryKeyFields = hoodieCatalogTable.tableConfig.getRecordKeyFields
     val conditions = splitConjunctivePredicates(mergeInto.mergeCondition)
-    if (!conditions.forall(p => p.isInstanceOf[EqualTo])) {
-      throw new AnalysisException(s"Currently only equality predicates are 
supported in MERGE INTO statement " +
-        s"(provided ${mergeInto.mergeCondition.sql}")
+    if (primaryKeyFields.isPresent) {
+      //pkless tables can have more complex conditions
+      if (!conditions.forall(p => p.isInstanceOf[EqualTo])) {
+        throw new AnalysisException(s"Currently only equality predicates are 
supported in MERGE INTO statement on primary key table" +
+          s"(provided ${mergeInto.mergeCondition.sql}")
+      }
     }
-
     val resolver = sparkSession.sessionState.analyzer.resolver
-    val primaryKeyField = hoodieCatalogTable.tableConfig.getRecordKeyFieldProp
+    val partitionPathFields = hoodieCatalogTable.tableConfig.getPartitionFields
+    //ensure all primary key fields are part of the merge condition
+    //allow partition path to be part of the merge condition but not required
+    val targetAttr2ConditionExpressions = doCasting(conditions, 
primaryKeyFields.isPresent)
+    val expressionSet = scala.collection.mutable.Set[(Attribute, 
Expression)](targetAttr2ConditionExpressions:_*)
+    var recordkeyFields: Seq[(String,String)] = Seq.empty

Review Comment:
   nit: `partitionAndKeyFields`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to