leesf commented on a change in pull request #3230:
URL: https://github.com/apache/hudi/pull/3230#discussion_r669751094
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
##########
@@ -126,48 +140,62 @@ class ExpressionPayload(record: GenericRecord,
}
}
+ /**
+ * Process the not-matched record. Test if the record matched any of
insert-conditions,
+ * if matched then return the result of insert-assignment. Or else return a
+ * {@link HoodieWriteHandle.IGNORE_RECORD} which will be ignored by
HoodieWriteHandle.
+ *
+ * @param inputRecord The input record to process.
+ * @param properties The properties.
+ * @return The result of the record to insert.
+ */
+ private def processNotMatchedRecord(inputRecord: SqlTypedRecord, properties:
Properties): HOption[IndexedRecord] = {
+ val insertConditionAndAssignmentsText =
+
properties.get(ExpressionPayload.PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS)
+ // Get the evaluator for each condition and insert assignment.
+ initWriteSchemaIfNeed(properties)
+ val insertConditionAndAssignments =
+
ExpressionPayload.getEvaluator(insertConditionAndAssignmentsText.toString,
writeSchema)
+ var resultRecordOpt: HOption[IndexedRecord] = null
+ for ((conditionEvaluator, assignmentEvaluator) <-
insertConditionAndAssignments
+ if resultRecordOpt == null) {
+ val conditionVal = evaluate(conditionEvaluator,
inputRecord).head.asInstanceOf[Boolean]
+ // If matched the insert condition then execute the assignment
expressions to compute the
+ // result record. We will return the first matched record.
+ if (conditionVal) {
+ val results = evaluate(assignmentEvaluator, inputRecord)
+ resultRecordOpt = HOption.of(convertToRecord(results, writeSchema))
+ }
+ }
+ if (resultRecordOpt != null) {
+ resultRecordOpt
+ } else {
+ // If there is no condition matched, just filter this record.
+ // Here we return a IGNORE_RECORD, HoodieCreateHandle will not handle it.
+ HOption.of(HoodieWriteHandle.IGNORE_RECORD)
+ }
+ }
+
override def getInsertValue(schema: Schema, properties: Properties):
HOption[IndexedRecord] = {
val incomingRecord = bytesToAvro(recordBytes, schema)
if (isDeleteRecord(incomingRecord)) {
HOption.empty[IndexedRecord]()
} else {
- val insertConditionAndAssignmentsText =
-
properties.get(ExpressionPayload.PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS)
- // Process insert
val sqlTypedRecord = new SqlTypedRecord(incomingRecord)
- // Get the evaluator for each condition and insert assignment.
- initWriteSchemaIfNeed(properties)
- val insertConditionAndAssignments =
-
ExpressionPayload.getEvaluator(insertConditionAndAssignmentsText.toString,
writeSchema)
- var resultRecordOpt: HOption[IndexedRecord] = null
- for ((conditionEvaluator, assignmentEvaluator) <-
insertConditionAndAssignments
- if resultRecordOpt == null) {
- val conditionVal = evaluate(conditionEvaluator,
sqlTypedRecord).head.asInstanceOf[Boolean]
- // If matched the insert condition then execute the assignment
expressions to compute the
- // result record. We will return the first matched record.
- if (conditionVal) {
- val results = evaluate(assignmentEvaluator, sqlTypedRecord)
- resultRecordOpt = HOption.of(convertToRecord(results, writeSchema))
- }
- }
-
- // Process delete for MOR
- if (resultRecordOpt == null && isMORTable(properties)) {
- val deleteConditionText =
properties.get(ExpressionPayload.PAYLOAD_DELETE_CONDITION)
- if (deleteConditionText != null) {
- val deleteCondition = getEvaluator(deleteConditionText.toString,
writeSchema).head._1
- val deleteConditionVal = evaluate(deleteCondition,
sqlTypedRecord).head.asInstanceOf[Boolean]
- if (deleteConditionVal) {
- resultRecordOpt = HOption.empty()
- }
+ if (isMORTable(properties)) {
Review comment:
no need to handle `resultRecordOpt = null` any longer?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]