yihua commented on code in PR #13444:
URL: https://github.com/apache/hudi/pull/13444#discussion_r2217078762
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala:
##########
@@ -57,7 +57,8 @@ abstract class HoodieCDCTestBase extends HoodieSparkClientTestBase {
PRECOMBINE_FIELD.key -> "timestamp",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1",
- HoodieCleanConfig.AUTO_CLEAN.key -> "false"
+ HoodieCleanConfig.AUTO_CLEAN.key -> "false",
+    HoodieWriteConfig.MARKERS_TIMELINE_SERVER_BASED_BATCH_INTERVAL_MS.key -> "10"
Review Comment:
Is the test flaky without this config?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+import java.util.function.UnaryOperator;
+
+/**
+ * Interface used within the {@link HoodieFileGroupReader<T>} for processing updates to records in Merge-on-Read tables.
+ * Note that the updates are always relative to the base file's current state.
+ * @param <T> the engine specific record type
+ */
+public interface UpdateProcessor<T> {
+ /**
+   * Processes the update to the record. If the update should not be returned to the caller, the method should return null.
+ * @param recordKey the key of the record being updated
+   * @param previousRecord the previous version of the record, or null if there is no previous value
+   * @param currentRecord the current version of the record after merging with the existing record, if any exists
+   * @param isDelete a flag indicating whether the merge resulted in a delete operation
+   * @return the processed record, or null if the record should not be returned to the caller
+ */
+  T processUpdate(String recordKey, T previousRecord, T currentRecord, boolean isDelete);
Review Comment:
Should this be named `mergedRecord` to avoid confusion? Similarly for classes that implement this interface.
```suggestion
  T processUpdate(String recordKey, T previousRecord, T mergedRecord, boolean isDelete);
```
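For illustration only, here is a minimal standalone sketch of how an implementor of this interface might look with the suggested `mergedRecord` naming. This is not Hudi's actual implementation; the simplified interface below just mirrors the shape of the one in the diff, and the `dropDeletes` processor is a hypothetical example that filters out delete results:

```java
// Hypothetical sketch, not Hudi code: a simplified copy of the
// UpdateProcessor interface using the suggested `mergedRecord` name.
interface UpdateProcessor<T> {
    // Returns the processed record, or null if it should be dropped.
    T processUpdate(String recordKey, T previousRecord, T mergedRecord, boolean isDelete);
}

public class UpdateProcessorSketch {
    public static void main(String[] args) {
        // Example processor: drop deletes, pass merged records through unchanged.
        UpdateProcessor<String> dropDeletes =
            (key, previous, merged, isDelete) -> isDelete ? null : merged;

        System.out.println(dropDeletes.processUpdate("k1", "old", "new", false)); // prints "new"
        System.out.println(dropDeletes.processUpdate("k2", "old", null, true));   // prints "null"
    }
}
```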
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]