This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3a5bddaca8 [HUDI-4808] Fix HoodieSimpleBucketIndex not consider bucket num in lo… (#6630)
3a5bddaca8 is described below
commit 3a5bddaca89e24ae815c16c4377a980e2c631327
Author: FocusComputing <[email protected]>
AuthorDate: Mon Sep 19 14:16:24 2022 +0800
[HUDI-4808] Fix HoodieSimpleBucketIndex not consider bucket num in lo… (#6630)
* [HUDI-4808] Fix HoodieSimpleBucketIndex not consider bucket num in log file issue
Co-authored-by: xiaoxingstack <[email protected]>
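
For context on the change below: before this patch, HoodieSimpleBucketIndex built its bucket-to-file mapping from the latest base files only, so a merge-on-read file group whose data so far lived only in log files (no base file yet) was invisible to the index, and new records for that bucket could be routed to a brand-new file group. The patch switches the lookup to file slices, which cover base and log files alike. A minimal sketch of the corrected mapping logic, assembled from classes and calls that appear in this diff (the wrapper class and method name are illustrative, not the committed code, and the BucketIdentifier import path is assumed from its usage alongside HoodieSimpleBucketIndex):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hudi.common.model.FileSlice;
    import org.apache.hudi.common.model.HoodieRecordLocation;
    import org.apache.hudi.index.HoodieIndexUtils;
    import org.apache.hudi.index.bucket.BucketIdentifier;
    import org.apache.hudi.table.HoodieTable;

    class BucketMappingSketch {
      // Build bucketId -> latest record location for one partition. Iterating
      // file slices (base file and/or log files) instead of base files alone
      // is what makes log-only file groups visible to the bucket index.
      static Map<Integer, HoodieRecordLocation> loadMapping(String partition, HoodieTable hoodieTable) {
        Map<Integer, HoodieRecordLocation> mapping = new HashMap<>();
        for (FileSlice fileSlice : HoodieIndexUtils.getLatestFileSlicesForPartition(partition, hoodieTable)) {
          // The bucket id is encoded as a prefix of the file id.
          int bucketId = BucketIdentifier.bucketIdFromFileId(fileSlice.getFileId());
          // Keep the first slice seen per bucket; a second file group for the
          // same bucket would indicate a corrupted bucket layout.
          mapping.putIfAbsent(bucketId, new HoodieRecordLocation(fileSlice.getBaseInstantTime(), fileSlice.getFileId()));
        }
        return mapping;
      }
    }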
---
.../org/apache/hudi/index/HoodieIndexUtils.java | 21 ++++++++++++++++++
.../hudi/index/bucket/HoodieSimpleBucketIndex.java | 10 ++++-----
.../hudi/testutils/HoodieWriteableTestTable.java | 25 ++++++++--------------
.../index/bucket/TestHoodieSimpleBucketIndex.java | 21 +++++++++++++-----
.../testutils/HoodieSparkWriteableTestTable.java | 11 ++++++++++
5 files changed, 62 insertions(+), 26 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 9b3dc8df00..61be856d36 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -72,6 +73,26 @@ public class HoodieIndexUtils {
return Collections.emptyList();
}
+ /**
+ * Fetches the latest {@link FileSlice}s for the partition of interest.
+ *
+ * @param partition Partition of interest
+ * @param hoodieTable Instance of {@link HoodieTable} of interest
+ * @return the list of latest {@link FileSlice}s
+ */
+ public static List<FileSlice> getLatestFileSlicesForPartition(
+ final String partition,
+ final HoodieTable hoodieTable) {
+ Option<HoodieInstant> latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline()
+ .filterCompletedInstants().lastInstant();
+ if (latestCommitTime.isPresent()) {
+ return hoodieTable.getHoodieView()
+ .getLatestFileSlicesBeforeOrOn(partition, latestCommitTime.get().getTimestamp(), true)
+ .collect(toList());
+ }
+ return Collections.emptyList();
+ }
+
/**
* Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions.
*
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
index 2ccebb472f..aae50e1f95 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
@@ -25,7 +25,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.table.HoodieTable;
-
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -52,10 +51,11 @@ public class HoodieSimpleBucketIndex extends HoodieBucketIndex {
Map<Integer, HoodieRecordLocation> bucketIdToFileIdMapping = new HashMap<>();
hoodieTable.getMetaClient().reloadActiveTimeline();
HoodieIndexUtils
- .getLatestBaseFilesForPartition(partition, hoodieTable)
- .forEach(file -> {
- String fileId = file.getFileId();
- String commitTime = file.getCommitTime();
+ .getLatestFileSlicesForPartition(partition, hoodieTable)
+ .forEach(fileSlice -> {
+ String fileId = fileSlice.getFileId();
+ String commitTime = fileSlice.getBaseInstantTime();
+
int bucketId = BucketIdentifier.bucketIdFromFileId(fileId);
if (!bucketIdToFileIdMapping.containsKey(bucketId)) {
bucketIdToFileIdMapping.put(bucketId, new HoodieRecordLocation(commitTime, fileId));
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
index 2f00b82772..8e7df833cc 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
@@ -32,7 +32,6 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -45,9 +44,9 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
-import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.io.storage.HoodieOrcConfig;
import org.apache.hudi.io.storage.HoodieOrcWriter;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -152,27 +151,21 @@ public class HoodieWriteableTestTable extends HoodieMetadataTestTable {
return baseFilePath;
}
- public Map<String, List<HoodieLogFile>> withLogAppends(List<HoodieRecord> records) throws Exception {
+ public Map<String, List<HoodieLogFile>> withLogAppends(String partition, String fileId, List<HoodieRecord> records) throws Exception {
Map<String, List<HoodieLogFile>> partitionToLogfilesMap = new HashMap<>();
- for (List<HoodieRecord> groupedRecords : records.stream()
- .collect(Collectors.groupingBy(HoodieRecord::getCurrentLocation)).values()) {
- final Pair<String, HoodieLogFile> appendedLogFile = appendRecordsToLogFile(groupedRecords);
- partitionToLogfilesMap.computeIfAbsent(
- appendedLogFile.getKey(), k -> new ArrayList<>()).add(appendedLogFile.getValue());
- }
+ final Pair<String, HoodieLogFile> appendedLogFile = appendRecordsToLogFile(partition, fileId, records);
+ partitionToLogfilesMap.computeIfAbsent(appendedLogFile.getKey(), k -> new ArrayList<>()).add(appendedLogFile.getValue());
return partitionToLogfilesMap;
}
- private Pair<String, HoodieLogFile> appendRecordsToLogFile(List<HoodieRecord> groupedRecords) throws Exception {
- String partitionPath = groupedRecords.get(0).getPartitionPath();
- HoodieRecordLocation location = groupedRecords.get(0).getCurrentLocation();
+ private Pair<String, HoodieLogFile> appendRecordsToLogFile(String partitionPath, String fileId, List<HoodieRecord> records) throws Exception {
try (HoodieLogFormat.Writer logWriter =
HoodieLogFormat.newWriterBuilder().onParentPath(new Path(basePath, partitionPath))
- .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId())
- .overBaseCommit(location.getInstantTime()).withFs(fs).build()) {
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId)
+ .overBaseCommit(currentInstantTime).withFs(fs).build()) {
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
- header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, location.getInstantTime());
+ header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, currentInstantTime);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- logWriter.appendBlock(new HoodieAvroDataBlock(groupedRecords.stream().map(r -> {
+ logWriter.appendBlock(new HoodieAvroDataBlock(records.stream().map(r -> {
try {
GenericRecord val = (GenericRecord) ((HoodieRecordPayload) r.getData()).getInsertValue(schema).get();
HoodieAvroUtils.addHoodieKeyToRecord(val, r.getRecordKey(), r.getPartitionPath(), "");
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java
index a96ce04077..ea6418696c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java
@@ -42,6 +42,8 @@ import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.Arrays;
import java.util.Properties;
@@ -89,8 +91,9 @@ public class TestHoodieSimpleBucketIndex extends HoodieClientTestHarness {
.withBucketNum("8").build();
}
- @Test
- public void testTagLocation() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testTagLocation(boolean isInsert) throws Exception {
String rowKey1 = UUID.randomUUID().toString();
String rowKey2 = UUID.randomUUID().toString();
String rowKey3 = UUID.randomUUID().toString();
@@ -119,9 +122,17 @@ public class TestHoodieSimpleBucketIndex extends HoodieClientTestHarness {
assertFalse(taggedRecordRDD.collectAsList().stream().anyMatch(r -> r.isCurrentLocationKnown()));
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(table, SCHEMA);
- testTable.addCommit("001").withInserts("2016/01/31",
getRecordFileId(record1), record1);
- testTable.addCommit("002").withInserts("2016/01/31",
getRecordFileId(record2), record2);
- testTable.addCommit("003").withInserts("2016/01/31",
getRecordFileId(record3), record3);
+
+ if (isInsert) {
+ testTable.addCommit("001").withInserts("2016/01/31",
getRecordFileId(record1), record1);
+ testTable.addCommit("002").withInserts("2016/01/31",
getRecordFileId(record2), record2);
+ testTable.addCommit("003").withInserts("2016/01/31",
getRecordFileId(record3), record3);
+ } else {
+ testTable.addCommit("001").withLogAppends("2016/01/31",
getRecordFileId(record1), record1);
+ testTable.addCommit("002").withLogAppends("2016/01/31",
getRecordFileId(record2), record2);
+ testTable.addCommit("003").withLogAppends("2016/01/31",
getRecordFileId(record3), record3);
+ }
+
taggedRecordRDD = bucketIndex.tagLocation(HoodieJavaRDD.of(recordRDD), context,
HoodieSparkTable.create(config, context, metaClient));
assertFalse(taggedRecordRDD.collectAsList().stream().filter(r -> r.isCurrentLocationKnown())
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkWriteableTestTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkWriteableTestTable.java
index 8940223926..3b50d1b29b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkWriteableTestTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkWriteableTestTable.java
@@ -23,6 +23,7 @@ import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
@@ -36,6 +37,7 @@ import org.apache.log4j.Logger;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
public class HoodieSparkWriteableTestTable extends HoodieWriteableTestTable {
@@ -116,4 +118,13 @@ public class HoodieSparkWriteableTestTable extends HoodieWriteableTestTable {
public Path withInserts(String partition, String fileId, List<HoodieRecord> records) throws Exception {
return super.withInserts(partition, fileId, records, new SparkTaskContextSupplier());
}
+
+ public HoodieSparkWriteableTestTable withLogAppends(String partition, String fileId, HoodieRecord... records) throws Exception {
+ withLogAppends(partition, fileId, Arrays.asList(records));
+ return this;
+ }
+
+ public Map<String, List<HoodieLogFile>> withLogAppends(String partition, String fileId, List<HoodieRecord> records) throws Exception {
+ return super.withLogAppends(partition, fileId, records);
+ }
}
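
For test authors, a hedged usage note on the new helpers above: the varargs overload of withLogAppends returns the test table itself, so log appends chain off addCommit the same way withInserts does in TestHoodieSimpleBucketIndex (testTable, record1, and getRecordFileId below come from that test):

    // Append record1 to a log file of its bucket's file group in partition
    // "2016/01/31" under commit "001"; the file id (and thus the bucket id)
    // comes from getRecordFileId.
    testTable.addCommit("001").withLogAppends("2016/01/31", getRecordFileId(record1), record1);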