leesf commented on a change in pull request #3230:
URL: https://github.com/apache/hudi/pull/3230#discussion_r669751094



##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
##########
@@ -126,48 +140,62 @@ class ExpressionPayload(record: GenericRecord,
     }
   }
 
+  /**
+   * Process a not-matched record. Test the record against each insert condition in order;
+   * for the first condition that evaluates to true, return the result of that condition's
+   * insert assignments. If no insert condition matches, return
+   * [[HoodieWriteHandle.IGNORE_RECORD]], which will be ignored by HoodieWriteHandle.
+   *
+   * @param inputRecord The input record to process.
+   * @param properties  The properties; the serialized insert conditions/assignments are read
+   *                    from the `ExpressionPayload.PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS` key.
+   * @return The record to insert, or `HOption.of(HoodieWriteHandle.IGNORE_RECORD)` when no
+   *         insert condition matched. This method never returns `null`.
+   */
+  private def processNotMatchedRecord(inputRecord: SqlTypedRecord, properties: 
Properties): HOption[IndexedRecord] = {
+    val insertConditionAndAssignmentsText =
+      
properties.get(ExpressionPayload.PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS)
+    // Get the evaluator for each condition and insert assignment.
+    initWriteSchemaIfNeed(properties)
+    val insertConditionAndAssignments =
+      
ExpressionPayload.getEvaluator(insertConditionAndAssignmentsText.toString, 
writeSchema)
+    // `null` is used only locally as a "no insert condition matched yet" marker; the loop
+    // guard below stops evaluating further conditions once a result has been produced.
+    var resultRecordOpt: HOption[IndexedRecord] = null
+    for ((conditionEvaluator, assignmentEvaluator) <- 
insertConditionAndAssignments
+         if resultRecordOpt == null) {
+      val conditionVal = evaluate(conditionEvaluator, 
inputRecord).head.asInstanceOf[Boolean]
+      // If the record matches this insert condition, evaluate the assignment expressions to
+      // compute the result record. Only the first matching condition is used.
+      if (conditionVal) {
+        val results = evaluate(assignmentEvaluator, inputRecord)
+        resultRecordOpt = HOption.of(convertToRecord(results, writeSchema))
+      }
+    }
+    // Both branches below return a non-null HOption, so callers never see the `null` marker.
+    if (resultRecordOpt != null) {
+      resultRecordOpt
+    } else {
+      // If no condition matched, filter this record out.
+      // Return an IGNORE_RECORD here, which HoodieCreateHandle will not handle.
+      HOption.of(HoodieWriteHandle.IGNORE_RECORD)
+    }
+  }
+
   override def getInsertValue(schema: Schema, properties: Properties): 
HOption[IndexedRecord] = {
     val incomingRecord = bytesToAvro(recordBytes, schema)
     if (isDeleteRecord(incomingRecord)) {
       HOption.empty[IndexedRecord]()
     } else {
-      val insertConditionAndAssignmentsText =
-        
properties.get(ExpressionPayload.PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS)
-      // Process insert
       val sqlTypedRecord = new SqlTypedRecord(incomingRecord)
-      // Get the evaluator for each condition and insert assignment.
-      initWriteSchemaIfNeed(properties)
-      val insertConditionAndAssignments =
-        
ExpressionPayload.getEvaluator(insertConditionAndAssignmentsText.toString, 
writeSchema)
-      var resultRecordOpt: HOption[IndexedRecord] = null
-      for ((conditionEvaluator, assignmentEvaluator) <- 
insertConditionAndAssignments
-           if resultRecordOpt == null) {
-        val conditionVal = evaluate(conditionEvaluator, 
sqlTypedRecord).head.asInstanceOf[Boolean]
-        // If matched the insert condition then execute the assignment 
expressions to compute the
-        // result record. We will return the first matched record.
-        if (conditionVal) {
-          val results = evaluate(assignmentEvaluator, sqlTypedRecord)
-          resultRecordOpt = HOption.of(convertToRecord(results, writeSchema))
-        }
-      }
-
-      // Process delete for MOR
-      if (resultRecordOpt == null && isMORTable(properties)) {
-        val deleteConditionText = 
properties.get(ExpressionPayload.PAYLOAD_DELETE_CONDITION)
-        if (deleteConditionText != null) {
-          val deleteCondition = getEvaluator(deleteConditionText.toString, 
writeSchema).head._1
-          val deleteConditionVal = evaluate(deleteCondition, 
sqlTypedRecord).head.asInstanceOf[Boolean]
-          if (deleteConditionVal) {
-            resultRecordOpt = HOption.empty()
-          }
+      if (isMORTable(properties)) {

Review comment:
       Is there no longer any need to handle the `resultRecordOpt == null` case here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to