This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 65cb3874d29 [HUDI-6702][RFC-46] Support customized logic (#9809)
65cb3874d29 is described below

commit 65cb3874d2976e057b724585175f876ab86df239
Author: Lin Liu <[email protected]>
AuthorDate: Thu Oct 5 07:09:01 2023 -0700

    [HUDI-6702][RFC-46] Support customized logic (#9809)
    
    For the Spark workflow, the current HoodieRecordMerger interface does not
    support custom delete logic the way the getInsertValue method of the
    HoodieRecordPayload interface does.
    
    This change closes that gap by adding a shouldFlush hook to the merger interface.
---
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  11 +-
 .../hudi/testutils/SparkDatasetTestUtils.java      |  59 +++-
 .../hudi/common/model/HoodieRecordMerger.java      |  16 +
 .../hudi/TestHoodieMergeHandleWithSparkMerger.java | 345 +++++++++++++++++++++
 4 files changed, 426 insertions(+), 5 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 21c0059474e..79c80e1a5af 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -310,8 +310,15 @@ public class HoodieMergeHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O>
     }
     try {
       if (combineRecord.isPresent() && !combineRecord.get().isDelete(schema, 
config.getProps()) && !isDelete) {
-        writeToFile(newRecord.getKey(), combineRecord.get(), schema, prop, 
preserveMetadata && useWriterSchemaForCompaction);
-        recordsWritten++;
+        // Last-minute check.
+        boolean decision = recordMerger.shouldFlush(combineRecord.get(), 
schema, config.getProps());
+
+        if (decision) { // CASE (1): Flush the merged record.
+          writeToFile(newRecord.getKey(), combineRecord.get(), schema, prop, 
preserveMetadata && useWriterSchemaForCompaction);
+          recordsWritten++;
+        } else {  // CASE (2): A delete operation.
+          recordsDeleted++;
+        }
       } else {
         recordsDeleted++;
         // Clear the new location as the record was deleted
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java
index 09e6bd699bc..1e07378f9af 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.testutils;
 
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -112,7 +113,30 @@ public class SparkDatasetTestUtils {
   public static Dataset<Row> getRandomRows(SQLContext sqlContext, int count, 
String partitionPath, boolean isError) {
     List<Row> records = new ArrayList<>();
     for (long recordNum = 0; recordNum < count; recordNum++) {
-      records.add(getRandomValue(partitionPath, isError));
+      records.add(getRandomValue(partitionPath, isError, ""));
+    }
+    return sqlContext.createDataFrame(records, isError ? ERROR_STRUCT_TYPE : 
STRUCT_TYPE);
+  }
+
+  public static Dataset<Row> getRandomRowsWithCommitTime(SQLContext sqlContext,
+                                                         int count,
+                                                         String partitionPath,
+                                                         boolean isError,
+                                                         String commitTime) {
+    List<Row> records = new ArrayList<>();
+    for (long recordNum = 0; recordNum < count; recordNum++) {
+      records.add(getRandomValue(partitionPath, isError, commitTime));
+    }
+    return sqlContext.createDataFrame(records, isError ? ERROR_STRUCT_TYPE : 
STRUCT_TYPE);
+  }
+
+  public static Dataset<Row> getRandomRowsWithKeys(SQLContext sqlContext,
+                                                   List<HoodieKey> keys,
+                                                   boolean isError,
+                                                   String commitTime) {
+    List<Row> records = new ArrayList<>();
+    for (HoodieKey key : keys) {
+      records.add(getRandomValue(key, isError, commitTime));
     }
     return sqlContext.createDataFrame(records, isError ? ERROR_STRUCT_TYPE : 
STRUCT_TYPE);
   }
@@ -123,10 +147,10 @@ public class SparkDatasetTestUtils {
    * @param partitionPath partition path to be set in the Row.
    * @return the Row thus generated.
    */
-  public static Row getRandomValue(String partitionPath, boolean isError) {
+  public static Row getRandomValue(String partitionPath, boolean isError, 
String commitTime) {
     // order commit time, seq no, record key, partition path, file name
     Object[] values = new Object[9];
-    values[0] = ""; //commit time
+    values[0] = commitTime; //commit time
     if (!isError) {
       values[1] = ""; // commit seq no
     } else {
@@ -146,6 +170,35 @@ public class SparkDatasetTestUtils {
     return new GenericRow(values);
   }
 
+  /**
+   * Generate random Row with a given key.
+   *
+   * @param key a {@link HoodieKey} key.
+   * @return the Row thus generated.
+   */
+  public static Row getRandomValue(HoodieKey key, boolean isError, String 
commitTime) {
+    // order commit time, seq no, record key, partition path, file name
+    Object[] values = new Object[9];
+    values[0] = commitTime; //commit time
+    if (!isError) {
+      values[1] = ""; // commit seq no
+    } else {
+      values[1] = RANDOM.nextLong();
+    }
+    values[2] = key.getRecordKey();
+    values[3] = key.getPartitionPath();
+    values[4] = ""; // filename
+    values[5] = UUID.randomUUID().toString();
+    values[6] = key.getPartitionPath();
+    values[7] = RANDOM.nextInt();
+    if (!isError) {
+      values[8] = RANDOM.nextLong();
+    } else {
+      values[8] = UUID.randomUUID().toString();
+    }
+    return new GenericRow(values);
+  }
+
   /**
    * Convert Dataset<Row>s to List of {@link InternalRow}s.
    *
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
index da413592abc..66a2ad77f2e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
@@ -46,6 +46,22 @@ public interface HoodieRecordMerger extends Serializable {
    */
   Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema 
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws 
IOException;
 
+
+  /**
+   * In some cases business logic must run a check before a merged record is 
flushed to disk.
+   * This method performs that check; returning false means the merged record 
should not be flushed.
+   *
+   * @param record the merged record.
+   * @param schema the schema of the merged record.
+   * @param props the properties supplied by the caller.
+   * @return true if the merged record should be flushed, false otherwise.
+   *
+   * <p> This interface is experimental and may evolve in the future.
+   **/
+  default boolean shouldFlush(HoodieRecord record, Schema schema, 
TypedProperties props) throws IOException {
+    return true;
+  }
+
   /**
    * The record type handled by the current merger.
    * SPARK, AVRO, FLINK
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
new file mode 100644
index 00000000000..36f0f3ca351
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
@@ -0,0 +1,345 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *      http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.hudi;
+
+import org.apache.avro.Schema;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.testutils.SparkDatasetTestUtils;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodieMergeHandleWithSparkMerger extends 
SparkClientFunctionalTestHarness {
+  private static final Schema SCHEMA = getAvroSchema("AvroSchema", 
"AvroSchemaNS");
+  private HoodieTableMetaClient metaClient;
+
+  public static String getPartitionPath() {
+    return "2023-10-01";
+  }
+
+  @BeforeEach
+  public void setUp() throws IOException {
+    Properties properties = new Properties();
+    properties.setProperty(
+        HoodieTableConfig.BASE_FILE_FORMAT.key(),
+        HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
+    properties.setProperty(
+        "hoodie.payload.ordering.field",
+        "_hoodie_record_key");
+    metaClient = getHoodieMetaClient(hadoopConf(), basePath(), 
HoodieTableType.MERGE_ON_READ, properties);
+  }
+
+  @Test
+  public void testDefaultMerger() throws Exception {
+    HoodieWriteConfig writeConfig = buildDefaultWriteConfig(SCHEMA);
+    HoodieRecordMerger merger = writeConfig.getRecordMerger();
+    assertTrue(merger instanceof DefaultMerger);
+    insertAndUpdate(writeConfig, 114);
+  }
+
+  @Test
+  public void testNoFlushMerger() throws Exception {
+    HoodieWriteConfig writeConfig = buildNoFlushWriteConfig(SCHEMA);
+    HoodieRecordMerger merger = writeConfig.getRecordMerger();
+    assertTrue(merger instanceof NoFlushMerger);
+    insertAndUpdate(writeConfig, 64);
+  }
+
+  @Test
+  public void testCustomMerger() throws Exception {
+    HoodieWriteConfig writeConfig = buildCustomWriteConfig(SCHEMA);
+    HoodieRecordMerger merger = writeConfig.getRecordMerger();
+    assertTrue(merger instanceof CustomMerger);
+    insertAndUpdate(writeConfig, 95);
+  }
+
+  public List<HoodieRecord> generateRecords(int numOfRecords, String 
commitTime) throws Exception {
+    Dataset<Row> rows = SparkDatasetTestUtils.getRandomRowsWithCommitTime(
+        new SQLContext(jsc()), numOfRecords, getPartitionPath(), false, 
commitTime);
+    List<InternalRow> internalRows = 
SparkDatasetTestUtils.toInternalRows(rows, SparkDatasetTestUtils.ENCODER);
+    return internalRows.stream()
+        .map(r -> new HoodieSparkRecord(new HoodieKey(r.getString(2), 
r.getString(3)),
+            r,
+            SparkDatasetTestUtils.STRUCT_TYPE,
+            false)).collect(Collectors.toList());
+  }
+
+  public List<HoodieRecord> generateRecordUpdates(List<HoodieKey> keys, String 
commitTime) throws Exception {
+    Dataset<Row> rows = SparkDatasetTestUtils.getRandomRowsWithKeys(
+        new SQLContext(jsc()), keys, false, commitTime);
+    List<InternalRow> internalRows = 
SparkDatasetTestUtils.toInternalRows(rows, SparkDatasetTestUtils.ENCODER);
+    return internalRows.stream()
+        .map(r -> new HoodieSparkRecord(new HoodieKey(r.getString(2), 
r.getString(3)),
+            r,
+            SparkDatasetTestUtils.STRUCT_TYPE,
+            false)).collect(Collectors.toList());
+  }
+
+  public List<HoodieRecord> generateEmptyRecords(List<HoodieKey> keys) {
+    List<HoodieRecord> records = new ArrayList<>();
+    for (HoodieKey key : keys) {
+      records.add(new HoodieEmptyRecord(key, HoodieOperation.DELETE, 1, 
HoodieRecord.HoodieRecordType.SPARK));
+    }
+    return records;
+  }
+
+  public static List<HoodieKey> getKeys(List<HoodieRecord> records) {
+    return records.stream().map(r -> r.getKey()).collect(Collectors.toList());
+  }
+
+  private static Schema getAvroSchema(String schemaName, String 
schemaNameSpace) {
+    return 
AvroConversionUtils.convertStructTypeToAvroSchema(SparkDatasetTestUtils.STRUCT_TYPE,
 schemaName, schemaNameSpace);
+  }
+
+  public HoodieWriteConfig getWriteConfig(Schema avroSchema) {
+    Properties extraProperties = new Properties();
+    extraProperties.setProperty(
+        "hoodie.datasource.write.record.merger.impls",
+        "org.apache.hudi.HoodieSparkRecordMerger");
+    extraProperties.setProperty(
+        "hoodie.logfile.data.block.format",
+        "parquet");
+    extraProperties.setProperty(
+        "hoodie.payload.ordering.field",
+        "_hoodie_record_key");
+
+    return getConfigBuilder(true)
+        .withPath(basePath())
+        .withSchema(avroSchema.toString())
+        .withProperties(extraProperties)
+        .build();
+  }
+
+  public DefaultWriteConfig buildDefaultWriteConfig(Schema avroSchema) {
+    HoodieWriteConfig config = getWriteConfig(avroSchema);
+    return new DefaultWriteConfig(config);
+  }
+
+  public NoFlushWriteConfig buildNoFlushWriteConfig(Schema avroSchema) {
+    HoodieWriteConfig config = getWriteConfig(avroSchema);
+    return new NoFlushWriteConfig(config);
+  }
+
+  public CustomWriteConfig buildCustomWriteConfig(Schema avroSchema) {
+    HoodieWriteConfig config = getWriteConfig(avroSchema);
+    return new CustomWriteConfig(config);
+  }
+
+  public HoodieTableFileSystemView getFileSystemView() {
+    return new HoodieTableFileSystemView(metaClient, 
metaClient.getActiveTimeline());
+  }
+
+  public List<FileSlice> getLatestFileSlices(String partitionPath) {
+    return 
getFileSystemView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
+  }
+
+  public Option<FileSlice> getLatestFileSlice(String partitionPath, String 
fileId) {
+    return getFileSystemView().getLatestFileSlice(partitionPath, fileId);
+  }
+
+  public Option<HoodieBaseFile> getLatestBaseFile(String partitionPath, String 
fileId) {
+    return getLatestFileSlice(partitionPath, fileId).map(fs -> 
fs.getBaseFile().get());
+  }
+
+  public List<HoodieLogFile> getLatestLogFiles(String partitionPath, String 
fileId) {
+    Option<FileSlice> fileSliceOpt = getLatestFileSlice(partitionPath, fileId);
+    if (fileSliceOpt.isPresent()) {
+      return fileSliceOpt.get().getLogFiles().collect(Collectors.toList());
+    }
+    return Collections.emptyList();
+  }
+
+  public List<String> getFileIds(String partitionPath) {
+    List<FileSlice> fileSlices = getLatestFileSlices(partitionPath);
+    return fileSlices.stream().map(fs -> 
fs.getFileId()).collect(Collectors.toList());
+  }
+
+  public void checkDataEquality(int numRecords) {
+    List<Row> rows = spark()
+        .read()
+        .format("org.apache.hudi")
+        .load(basePath() + "/" + getPartitionPath()).collectAsList();
+    assertEquals(numRecords, rows.size());
+  }
+
+  public void insertAndUpdate(HoodieWriteConfig writeConfig, int 
expectedRecordNum) throws Exception {
+    // Check if the table type is correct.
+    HoodieTableMetaClient reloadedMetaClient = 
HoodieTableMetaClient.reload(metaClient);
+    HoodieTable hoodieTable = HoodieSparkTable.create(writeConfig, context(), 
reloadedMetaClient);
+    assertEquals(hoodieTable.getMetaClient().getTableType(), 
HoodieTableType.MERGE_ON_READ);
+
+    // Write and read.
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) {
+
+      // (1) Write: insert.
+      String instantTime = "001";
+      writeClient.startCommitWithTime(instantTime);
+      List<HoodieRecord> records = generateRecords(100, instantTime);
+      Stream<HoodieBaseFile> baseFileStream = 
insertRecordsToMORTable(reloadedMetaClient, records, writeClient, writeConfig, 
instantTime);
+      assertTrue(baseFileStream.findAny().isPresent());
+
+      // Check metadata files.
+      Option<HoodieInstant> deltaCommit = 
reloadedMetaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
+      assertTrue(deltaCommit.isPresent());
+      assertEquals(instantTime, deltaCommit.get().getTimestamp(), "Delta 
commit should be specified value");
+
+      // Check data files.
+      List<String> fileIds = getFileIds(getPartitionPath());
+      assertEquals(1, fileIds.size());
+
+      Option<HoodieBaseFile> baseFileOption = 
getLatestBaseFile(getPartitionPath(), fileIds.get(0));
+      assertTrue(baseFileOption.isPresent());
+
+      List<HoodieLogFile> logFiles = getLatestLogFiles(getPartitionPath(), 
fileIds.get(0));
+      assertTrue(logFiles.isEmpty());
+      checkDataEquality(100);
+
+      // (2) Write: append.
+      instantTime = "002";
+      writeClient.startCommitWithTime(instantTime);
+
+      List<HoodieRecord> records2 = 
generateEmptyRecords(getKeys(records).subList(0, 17)); // 17 records with old 
keys.
+      List<HoodieRecord> records3 = 
generateRecordUpdates(getKeys(records).subList(17, 36), "001"); // 19 update 
records.
+      List<HoodieRecord> records4 = generateRecords(31, instantTime); // 31 
new records.
+      records2.addAll(records3);
+      records2.addAll(records4);
+      assertEquals(67, records2.size());
+      updateRecordsInMORTable(reloadedMetaClient, records2, writeClient, 
writeConfig, instantTime, false);
+
+      // Check metadata files.
+      deltaCommit = 
reloadedMetaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
+      assertTrue(deltaCommit.isPresent());
+
+      // Check data files.
+      List<String> fileIds2 = getFileIds(getPartitionPath());
+      assertFalse(fileIds2.isEmpty());
+      // One partition one file group.
+      assertEquals(1, fileIds2.size());
+
+      baseFileOption = getLatestBaseFile(getPartitionPath(), fileIds2.get(0));
+      assertTrue(baseFileOption.isPresent());
+
+      // Check data after
+      checkDataEquality(expectedRecordNum);
+    }
+  }
+
+  public static class TestHoodieWriteConfig extends HoodieWriteConfig {
+    TestHoodieWriteConfig(HoodieWriteConfig writeConfig) {
+      super(writeConfig.getEngineType(), writeConfig.getProps());
+    }
+  }
+
+  public static class DefaultWriteConfig extends TestHoodieWriteConfig {
+    DefaultWriteConfig(HoodieWriteConfig writeConfig) {
+      super(writeConfig);
+    }
+
+    @Override
+    public HoodieRecordMerger getRecordMerger() {
+      return new DefaultMerger();
+    }
+  }
+
+  public static class NoFlushWriteConfig extends TestHoodieWriteConfig {
+    NoFlushWriteConfig(HoodieWriteConfig writeConfig) {
+      super(writeConfig);
+    }
+
+    @Override
+    public HoodieRecordMerger getRecordMerger() {
+      return new NoFlushMerger();
+    }
+  }
+
+  public static class CustomWriteConfig extends TestHoodieWriteConfig {
+    CustomWriteConfig(HoodieWriteConfig writeConfig) {
+      super(writeConfig);
+    }
+
+    @Override
+    public HoodieRecordMerger getRecordMerger() {
+      return new CustomMerger();
+    }
+  }
+
+  public static class DefaultMerger extends HoodieSparkRecordMerger {
+    @Override
+    public boolean shouldFlush(HoodieRecord record, Schema schema, 
TypedProperties props) {
+      return true;
+    }
+  }
+
+  public static class NoFlushMerger extends HoodieSparkRecordMerger {
+    @Override
+    public boolean shouldFlush(HoodieRecord record, Schema schema, 
TypedProperties props) {
+      return false;
+    }
+  }
+
+  public static class CustomMerger extends HoodieSparkRecordMerger {
+    @Override
+    public boolean shouldFlush(HoodieRecord record, Schema schema, 
TypedProperties props) throws IOException {
+      if (((HoodieSparkRecord)record).getData().getString(0).equals("001")) {
+        return false;
+      } else {
+        return true;
+      }
+    }
+  }
+}

Reply via email to