This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 65cb3874d29 [HUDI-6702][RFC-46] Support customized logic (#9809)
65cb3874d29 is described below
commit 65cb3874d2976e057b724585175f876ab86df239
Author: Lin Liu <[email protected]>
AuthorDate: Thu Oct 5 07:09:01 2023 -0700
[HUDI-6702][RFC-46] Support customized logic (#9809)
For the Spark workflow, the current HoodieRecordMerger interface does not support
custom delete logic the way the getInsertValue method does in the
HoodieRecordPayload interface.
This change fills that gap by adding a shouldFlush hook to the merger interface.
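For illustration, a custom merger can override the new hook. The sketch below is
modeled on the CustomMerger added in the new test; the class name and the
sentinel commit time "001" are illustrative only, not part of this change.

    import org.apache.avro.Schema;
    import org.apache.hudi.HoodieSparkRecordMerger;
    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieSparkRecord;

    public class CommitTimeAwareMerger extends HoodieSparkRecordMerger {
      @Override
      public boolean shouldFlush(HoodieRecord record, Schema schema, TypedProperties props) {
        // Returning false makes HoodieMergeHandle count the merged record as a
        // delete instead of writing it to the new base file.
        String commitTime = ((HoodieSparkRecord) record).getData().getString(0);
        return !"001".equals(commitTime);
      }
    }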
---
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 11 +-
.../hudi/testutils/SparkDatasetTestUtils.java | 59 +++-
.../hudi/common/model/HoodieRecordMerger.java | 16 +
.../hudi/TestHoodieMergeHandleWithSparkMerger.java | 345 +++++++++++++++++++++
4 files changed, 426 insertions(+), 5 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 21c0059474e..79c80e1a5af 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -310,8 +310,15 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
}
try {
if (combineRecord.isPresent() && !combineRecord.get().isDelete(schema, config.getProps()) && !isDelete) {
- writeToFile(newRecord.getKey(), combineRecord.get(), schema, prop, preserveMetadata && useWriterSchemaForCompaction);
- recordsWritten++;
+ // Last-minute check.
+ boolean decision = recordMerger.shouldFlush(combineRecord.get(), schema, config.getProps());
+
+ if (decision) { // CASE (1): Flush the merged record.
+ writeToFile(newRecord.getKey(), combineRecord.get(), schema, prop, preserveMetadata && useWriterSchemaForCompaction);
+ recordsWritten++;
+ } else { // CASE (2): A delete operation.
+ recordsDeleted++;
+ }
} else {
recordsDeleted++;
// Clear the new location as the record was deleted
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java
index 09e6bd699bc..1e07378f9af 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java
@@ -18,6 +18,7 @@
package org.apache.hudi.testutils;
+import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -112,7 +113,30 @@ public class SparkDatasetTestUtils {
public static Dataset<Row> getRandomRows(SQLContext sqlContext, int count, String partitionPath, boolean isError) {
List<Row> records = new ArrayList<>();
for (long recordNum = 0; recordNum < count; recordNum++) {
- records.add(getRandomValue(partitionPath, isError));
+ records.add(getRandomValue(partitionPath, isError, ""));
+ }
+ return sqlContext.createDataFrame(records, isError ? ERROR_STRUCT_TYPE : STRUCT_TYPE);
+ }
+
+ public static Dataset<Row> getRandomRowsWithCommitTime(SQLContext sqlContext,
+ int count,
+ String partitionPath,
+ boolean isError,
+ String commitTime) {
+ List<Row> records = new ArrayList<>();
+ for (long recordNum = 0; recordNum < count; recordNum++) {
+ records.add(getRandomValue(partitionPath, isError, commitTime));
+ }
+ return sqlContext.createDataFrame(records, isError ? ERROR_STRUCT_TYPE : STRUCT_TYPE);
+ }
+
+ public static Dataset<Row> getRandomRowsWithKeys(SQLContext sqlContext,
+ List<HoodieKey> keys,
+ boolean isError,
+ String commitTime) {
+ List<Row> records = new ArrayList<>();
+ for (HoodieKey key : keys) {
+ records.add(getRandomValue(key, isError, commitTime));
}
return sqlContext.createDataFrame(records, isError ? ERROR_STRUCT_TYPE : STRUCT_TYPE);
}
@@ -123,10 +147,10 @@ public class SparkDatasetTestUtils {
* @param partitionPath partition path to be set in the Row.
* @return the Row thus generated.
*/
- public static Row getRandomValue(String partitionPath, boolean isError) {
+ public static Row getRandomValue(String partitionPath, boolean isError, String commitTime) {
// order commit time, seq no, record key, partition path, file name
Object[] values = new Object[9];
- values[0] = ""; //commit time
+ values[0] = commitTime; //commit time
if (!isError) {
values[1] = ""; // commit seq no
} else {
@@ -146,6 +170,35 @@ public class SparkDatasetTestUtils {
return new GenericRow(values);
}
+ /**
+ * Generate random Row with a given key.
+ *
+ * @param key a {@link HoodieKey} key.
+ * @return the Row thus generated.
+ */
+ public static Row getRandomValue(HoodieKey key, boolean isError, String commitTime) {
+ // order commit time, seq no, record key, partition path, file name
+ Object[] values = new Object[9];
+ values[0] = commitTime; //commit time
+ if (!isError) {
+ values[1] = ""; // commit seq no
+ } else {
+ values[1] = RANDOM.nextLong();
+ }
+ values[2] = key.getRecordKey();
+ values[3] = key.getPartitionPath();
+ values[4] = ""; // filename
+ values[5] = UUID.randomUUID().toString();
+ values[6] = key.getPartitionPath();
+ values[7] = RANDOM.nextInt();
+ if (!isError) {
+ values[8] = RANDOM.nextLong();
+ } else {
+ values[8] = UUID.randomUUID().toString();
+ }
+ return new GenericRow(values);
+ }
+
/**
* Convert Dataset<Row>s to List of {@link InternalRow}s.
*
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
index da413592abc..66a2ad77f2e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
@@ -46,6 +46,22 @@ public interface HoodieRecordMerger extends Serializable {
*/
Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException;
+
+ /**
+ * In some cases business logic needs to run a check before flushing a merged record to disk.
+ * This method performs that check; when it returns false, the merged record should not be flushed.
+ *
+ * @param record the merged record.
+ * @param schema the schema of the merged record.
+ * @return a boolean indicating whether the merged record should be flushed.
+ *
+ * <p> This interface is experimental and might evolve in the future.
+ **/
+ default boolean shouldFlush(HoodieRecord record, Schema schema, TypedProperties props) throws IOException {
+ return true;
+ }
+
/**
* The record type handled by the current merger.
* SPARK, AVRO, FLINK
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
new file mode 100644
index 00000000000..36f0f3ca351
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
@@ -0,0 +1,345 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.hudi;
+
+import org.apache.avro.Schema;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.testutils.SparkDatasetTestUtils;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodieMergeHandleWithSparkMerger extends SparkClientFunctionalTestHarness {
+ private static final Schema SCHEMA = getAvroSchema("AvroSchema", "AvroSchemaNS");
+ private HoodieTableMetaClient metaClient;
+
+ public static String getPartitionPath() {
+ return "2023-10-01";
+ }
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ Properties properties = new Properties();
+ properties.setProperty(
+ HoodieTableConfig.BASE_FILE_FORMAT.key(),
+ HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
+ properties.setProperty(
+ "hoodie.payload.ordering.field",
+ "_hoodie_record_key");
+ metaClient = getHoodieMetaClient(hadoopConf(), basePath(), HoodieTableType.MERGE_ON_READ, properties);
+ }
+
+ @Test
+ public void testDefaultMerger() throws Exception {
+ HoodieWriteConfig writeConfig = buildDefaultWriteConfig(SCHEMA);
+ HoodieRecordMerger merger = writeConfig.getRecordMerger();
+ assertTrue(merger instanceof DefaultMerger);
+ insertAndUpdate(writeConfig, 114);
+ }
+
+ @Test
+ public void testNoFlushMerger() throws Exception {
+ HoodieWriteConfig writeConfig = buildNoFlushWriteConfig(SCHEMA);
+ HoodieRecordMerger merger = writeConfig.getRecordMerger();
+ assertTrue(merger instanceof NoFlushMerger);
+ insertAndUpdate(writeConfig, 64);
+ }
+
+ @Test
+ public void testCustomMerger() throws Exception {
+ HoodieWriteConfig writeConfig = buildCustomWriteConfig(SCHEMA);
+ HoodieRecordMerger merger = writeConfig.getRecordMerger();
+ assertTrue(merger instanceof CustomMerger);
+ insertAndUpdate(writeConfig, 95);
+ }
+
+ public List<HoodieRecord> generateRecords(int numOfRecords, String commitTime) throws Exception {
+ Dataset<Row> rows = SparkDatasetTestUtils.getRandomRowsWithCommitTime(
+ new SQLContext(jsc()), numOfRecords, getPartitionPath(), false, commitTime);
+ List<InternalRow> internalRows = SparkDatasetTestUtils.toInternalRows(rows, SparkDatasetTestUtils.ENCODER);
+ return internalRows.stream()
+ .map(r -> new HoodieSparkRecord(new HoodieKey(r.getString(2), r.getString(3)),
+ r,
+ SparkDatasetTestUtils.STRUCT_TYPE,
+ false)).collect(Collectors.toList());
+ }
+
+ public List<HoodieRecord> generateRecordUpdates(List<HoodieKey> keys, String commitTime) throws Exception {
+ Dataset<Row> rows = SparkDatasetTestUtils.getRandomRowsWithKeys(
+ new SQLContext(jsc()), keys, false, commitTime);
+ List<InternalRow> internalRows = SparkDatasetTestUtils.toInternalRows(rows, SparkDatasetTestUtils.ENCODER);
+ return internalRows.stream()
+ .map(r -> new HoodieSparkRecord(new HoodieKey(r.getString(2), r.getString(3)),
+ r,
+ SparkDatasetTestUtils.STRUCT_TYPE,
+ false)).collect(Collectors.toList());
+ }
+
+ public List<HoodieRecord> generateEmptyRecords(List<HoodieKey> keys) {
+ List<HoodieRecord> records = new ArrayList<>();
+ for (HoodieKey key : keys) {
+ records.add(new HoodieEmptyRecord(key, HoodieOperation.DELETE, 1, HoodieRecord.HoodieRecordType.SPARK));
+ }
+ return records;
+ }
+
+ public static List<HoodieKey> getKeys(List<HoodieRecord> records) {
+ return records.stream().map(r -> r.getKey()).collect(Collectors.toList());
+ }
+
+ private static Schema getAvroSchema(String schemaName, String schemaNameSpace) {
+ return AvroConversionUtils.convertStructTypeToAvroSchema(SparkDatasetTestUtils.STRUCT_TYPE, schemaName, schemaNameSpace);
+ }
+
+ public HoodieWriteConfig getWriteConfig(Schema avroSchema) {
+ Properties extraProperties = new Properties();
+ extraProperties.setProperty(
+ "hoodie.datasource.write.record.merger.impls",
+ "org.apache.hudi.HoodieSparkRecordMerger");
+ extraProperties.setProperty(
+ "hoodie.logfile.data.block.format",
+ "parquet");
+ extraProperties.setProperty(
+ "hoodie.payload.ordering.field",
+ "_hoodie_record_key");
+
+ return getConfigBuilder(true)
+ .withPath(basePath())
+ .withSchema(avroSchema.toString())
+ .withProperties(extraProperties)
+ .build();
+ }
+
+ public DefaultWriteConfig buildDefaultWriteConfig(Schema avroSchema) {
+ HoodieWriteConfig config = getWriteConfig(avroSchema);
+ return new DefaultWriteConfig(config);
+ }
+
+ public NoFlushWriteConfig buildNoFlushWriteConfig(Schema avroSchema) {
+ HoodieWriteConfig config = getWriteConfig(avroSchema);
+ return new NoFlushWriteConfig(config);
+ }
+
+ public CustomWriteConfig buildCustomWriteConfig(Schema avroSchema) {
+ HoodieWriteConfig config = getWriteConfig(avroSchema);
+ return new CustomWriteConfig(config);
+ }
+
+ public HoodieTableFileSystemView getFileSystemView() {
+ return new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
+ }
+
+ public List<FileSlice> getLatestFileSlices(String partitionPath) {
+ return getFileSystemView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
+ }
+
+ public Option<FileSlice> getLatestFileSlice(String partitionPath, String fileId) {
+ return getFileSystemView().getLatestFileSlice(partitionPath, fileId);
+ }
+
+ public Option<HoodieBaseFile> getLatestBaseFile(String partitionPath, String fileId) {
+ return getLatestFileSlice(partitionPath, fileId).map(fs -> fs.getBaseFile().get());
+ }
+
+ public List<HoodieLogFile> getLatestLogFiles(String partitionPath, String fileId) {
+ Option<FileSlice> fileSliceOpt = getLatestFileSlice(partitionPath, fileId);
+ if (fileSliceOpt.isPresent()) {
+ return fileSliceOpt.get().getLogFiles().collect(Collectors.toList());
+ }
+ return Collections.emptyList();
+ }
+
+ public List<String> getFileIds(String partitionPath) {
+ List<FileSlice> fileSlices = getLatestFileSlices(partitionPath);
+ return fileSlices.stream().map(fs -> fs.getFileId()).collect(Collectors.toList());
+ }
+
+ public void checkDataEquality(int numRecords) {
+ List<Row> rows = spark()
+ .read()
+ .format("org.apache.hudi")
+ .load(basePath() + "/" + getPartitionPath()).collectAsList();
+ assertEquals(numRecords, rows.size());
+ }
+
+ public void insertAndUpdate(HoodieWriteConfig writeConfig, int expectedRecordNum) throws Exception {
+ // Check if the table type is correct.
+ HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable hoodieTable = HoodieSparkTable.create(writeConfig, context(), reloadedMetaClient);
+ assertEquals(hoodieTable.getMetaClient().getTableType(), HoodieTableType.MERGE_ON_READ);
+
+ // Write and read.
+ try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) {
+
+ // (1) Write: insert.
+ String instantTime = "001";
+ writeClient.startCommitWithTime(instantTime);
+ List<HoodieRecord> records = generateRecords(100, instantTime);
+ Stream<HoodieBaseFile> baseFileStream = insertRecordsToMORTable(reloadedMetaClient, records, writeClient, writeConfig, instantTime);
+ assertTrue(baseFileStream.findAny().isPresent());
+
+ // Check metadata files.
+ Option<HoodieInstant> deltaCommit = reloadedMetaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
+ assertTrue(deltaCommit.isPresent());
+ assertEquals(instantTime, deltaCommit.get().getTimestamp(), "Delta commit should be specified value");
+
+ // Check data files.
+ List<String> fileIds = getFileIds(getPartitionPath());
+ assertEquals(1, fileIds.size());
+
+ Option<HoodieBaseFile> baseFileOption = getLatestBaseFile(getPartitionPath(), fileIds.get(0));
+ assertTrue(baseFileOption.isPresent());
+
+ List<HoodieLogFile> logFiles = getLatestLogFiles(getPartitionPath(), fileIds.get(0));
+ assertTrue(logFiles.isEmpty());
+ checkDataEquality(100);
+
+ // (2) Write: append.
+ instantTime = "002";
+ writeClient.startCommitWithTime(instantTime);
+
+ List<HoodieRecord> records2 = generateEmptyRecords(getKeys(records).subList(0, 17)); // 17 records with old keys.
+ List<HoodieRecord> records3 = generateRecordUpdates(getKeys(records).subList(17, 36), "001"); // 19 update records.
+ List<HoodieRecord> records4 = generateRecords(31, instantTime); // 31 new records.
+ records2.addAll(records3);
+ records2.addAll(records4);
+ assertEquals(67, records2.size());
+ updateRecordsInMORTable(reloadedMetaClient, records2, writeClient, writeConfig, instantTime, false);
+
+ // Check metadata files.
+ deltaCommit = reloadedMetaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
+ assertTrue(deltaCommit.isPresent());
+
+ // Check data files.
+ List<String> fileIds2 = getFileIds(getPartitionPath());
+ assertFalse(fileIds2.isEmpty());
+ // One partition one file group.
+ assertEquals(1, fileIds2.size());
+
+ baseFileOption = getLatestBaseFile(getPartitionPath(), fileIds2.get(0));
+ assertTrue(baseFileOption.isPresent());
+
+ // Check data after the update.
+ checkDataEquality(expectedRecordNum);
+ }
+ }
+
+ public static class TestHoodieWriteConfig extends HoodieWriteConfig {
+ TestHoodieWriteConfig(HoodieWriteConfig writeConfig) {
+ super(writeConfig.getEngineType(), writeConfig.getProps());
+ }
+ }
+
+ public static class DefaultWriteConfig extends TestHoodieWriteConfig {
+ DefaultWriteConfig(HoodieWriteConfig writeConfig) {
+ super(writeConfig);
+ }
+
+ @Override
+ public HoodieRecordMerger getRecordMerger() {
+ return new DefaultMerger();
+ }
+ }
+
+ public static class NoFlushWriteConfig extends TestHoodieWriteConfig {
+ NoFlushWriteConfig(HoodieWriteConfig writeConfig) {
+ super(writeConfig);
+ }
+
+ @Override
+ public HoodieRecordMerger getRecordMerger() {
+ return new NoFlushMerger();
+ }
+ }
+
+ public static class CustomWriteConfig extends TestHoodieWriteConfig {
+ CustomWriteConfig(HoodieWriteConfig writeConfig) {
+ super(writeConfig);
+ }
+
+ @Override
+ public HoodieRecordMerger getRecordMerger() {
+ return new CustomMerger();
+ }
+ }
+
+ public static class DefaultMerger extends HoodieSparkRecordMerger {
+ @Override
+ public boolean shouldFlush(HoodieRecord record, Schema schema, TypedProperties props) {
+ return true;
+ }
+ }
+
+ public static class NoFlushMerger extends HoodieSparkRecordMerger {
+ @Override
+ public boolean shouldFlush(HoodieRecord record, Schema schema, TypedProperties props) {
+ return false;
+ }
+ }
+
+ public static class CustomMerger extends HoodieSparkRecordMerger {
+ @Override
+ public boolean shouldFlush(HoodieRecord record, Schema schema, TypedProperties props) throws IOException {
+ if (((HoodieSparkRecord)record).getData().getString(0).equals("001")) {
+ return false;
+ } else {
+ return true;
+ }
+ }
+ }
+}