This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 83f8874f7c1 [HUDI-6648] Support building table views from existing
files (#9367)
83f8874f7c1 is described below
commit 83f8874f7c1a647fa43acdbed54579f87d6e9b3c
Author: Tim Brown <[email protected]>
AuthorDate: Mon Aug 7 08:49:56 2023 -0700
[HUDI-6648] Support building table views from existing files (#9367)
This change allows us to support creating a table from a set of initial
files
that were not created by Hudi. This will allow us to bootstrap tables from
an existing set of files instead of requiring users to rewrite tables with
filenames in a format that Hudi expects (outside of the bootstrap feature).
This feature only works if the metadata table is enabled.
---
.../metadata/HoodieBackedTableMetadataWriter.java | 15 +-
.../functional/TestExternalPathHandling.java | 296 +++++++++++++++++++++
.../hudi/common/config/HoodieMetadataConfig.java | 12 +
.../java/org/apache/hudi/common/fs/FSUtils.java | 2 +-
.../apache/hudi/common/model/HoodieBaseFile.java | 71 ++++-
.../common/table/timeline/dto/BaseFileDTO.java | 14 +-
.../common/table/timeline/dto/FilePathDTO.java | 4 +-
.../view/HoodieTablePreCommitFileSystemView.java | 2 +-
.../hudi/common/util/ExternalFilePathUtil.java | 46 ++++
.../hudi/metadata/HoodieMetadataPayload.java | 33 +--
.../hudi/metadata/HoodieTableMetadataUtil.java | 44 ++-
.../hudi/common/model/TestHoodieBaseFile.java | 16 ++
.../hudi/metadata/TestHoodieMetadataPayload.java | 18 +-
13 files changed, 502 insertions(+), 71 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 53ad933197e..8e213183472 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -376,7 +376,18 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
// Get a complete list of files and partitions from the file system or
from already initialized FILES partition of MDT
- List<DirectoryInfo> partitionInfoList = filesPartitionAvailable ?
listAllPartitionsFromMDT(initializationTime) :
listAllPartitionsFromFilesystem(initializationTime);
+ List<DirectoryInfo> partitionInfoList;
+ if (filesPartitionAvailable) {
+ partitionInfoList = listAllPartitionsFromMDT(initializationTime);
+ } else {
+ // if auto initialization is enabled, then we need to list all
partitions from the file system
+ if (dataWriteConfig.getMetadataConfig().shouldAutoInitialize()) {
+ partitionInfoList =
listAllPartitionsFromFilesystem(initializationTime);
+ } else {
+ // if auto initialization is disabled, we can return an empty list
+ partitionInfoList = Collections.emptyList();
+ }
+ }
Map<String, Map<String, Long>> partitionToFilesMap =
partitionInfoList.stream()
.map(p -> {
String partitionName =
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
@@ -571,7 +582,7 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
HoodieData<HoodieRecord> fileListRecords =
engineContext.parallelize(partitionInfoList,
partitionInfoList.size()).map(partitionInfo -> {
Map<String, Long> fileNameToSizeMap =
partitionInfo.getFileNameToSizeMap();
return HoodieMetadataPayload.createPartitionFilesRecord(
-
HoodieTableMetadataUtil.getPartitionIdentifier(partitionInfo.getRelativePath()),
Option.of(fileNameToSizeMap), Option.empty());
+
HoodieTableMetadataUtil.getPartitionIdentifier(partitionInfo.getRelativePath()),
fileNameToSizeMap, Collections.emptyList());
});
ValidationUtils.checkState(fileListRecords.count() == partitions.size());
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
new file mode 100644
index 00000000000..9f28290f1ad
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanFileInfo;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
+import org.apache.hudi.avro.model.IntWrapper;
+import org.apache.hudi.avro.model.StringWrapper;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieCleanStat;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
+import org.apache.hudi.common.model.HoodieDeltaWriteStat;
+import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.ExternalFilePathUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
+import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.action.clean.CleanPlanner;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY;
+
+/**
+ * Asserts that tables initialized from file paths created outside Hudi can
properly be loaded.
+ */
+public class TestExternalPathHandling extends HoodieClientTestBase {
+
+ private static final String FIELD_1 = "field1";
+ private static final String FIELD_2 = "field2";
+ private HoodieWriteConfig writeConfig;
+
+ @ParameterizedTest
+ @MethodSource("getArgs")
+ public void testFlow(FileIdAndNameGenerator fileIdAndNameGenerator,
List<String> partitions) throws Exception {
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ Properties properties = new Properties();
+ properties.setProperty(HoodieMetadataConfig.AUTO_INITIALIZE.key(),
"false");
+ writeConfig = HoodieWriteConfig.newBuilder()
+
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(INMEMORY).build())
+ .withPath(metaClient.getBasePathV2().toString())
+ .withEmbeddedTimelineServerEnabled(false)
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(2)
+ .enable(true)
+ .withMetadataIndexColumnStats(true)
+ .withColumnStatsIndexForColumns(FIELD_1 + "," + FIELD_2)
+ .withProperties(properties)
+ .build())
+
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(1,
2).build())
+ .withTableServicesEnabled(true)
+ .build();
+
+ writeClient = getHoodieWriteClient(writeConfig);
+ String instantTime1 =
writeClient.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION, metaClient);
+ String partitionPath1 = partitions.get(0);
+ Pair<String, String> fileIdAndName1 = fileIdAndNameGenerator.generate(1,
instantTime1);
+ String fileId1 = fileIdAndName1.getLeft();
+ String fileName1 = fileIdAndName1.getRight();
+ String filePath1 = getPath(partitionPath1, fileName1);
+ WriteStatus writeStatus1 = createWriteStatus(instantTime1, partitionPath1,
filePath1, fileId1);
+ JavaRDD<WriteStatus> rdd1 =
createRdd(Collections.singletonList(writeStatus1));
+ metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(new
HoodieInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime1), Option.empty());
+ writeClient.commit(instantTime1, rdd1, Option.empty(),
HoodieTimeline.REPLACE_COMMIT_ACTION, Collections.emptyMap());
+
+ assertFileGroupCorrectness(instantTime1, partitionPath1, filePath1,
fileId1, 1);
+
+ // add a new file and remove the old one
+ String instantTime2 =
writeClient.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION, metaClient);
+ Pair<String, String> fileIdAndName2 = fileIdAndNameGenerator.generate(2,
instantTime2);
+ String fileId2 = fileIdAndName2.getLeft();
+ String fileName2 = fileIdAndName2.getRight();
+ String filePath2 = getPath(partitionPath1, fileName2);
+ WriteStatus newWriteStatus = createWriteStatus(instantTime2,
partitionPath1, filePath2, fileId2);
+ JavaRDD<WriteStatus> rdd2 =
createRdd(Collections.singletonList(newWriteStatus));
+ Map<String, List<String>> partitionToReplacedFileIds = new HashMap<>();
+ partitionToReplacedFileIds.put(partitionPath1,
Collections.singletonList(fileId1));
+ metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(new
HoodieInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime2), Option.empty());
+ writeClient.commit(instantTime2, rdd2, Option.empty(),
HoodieTimeline.REPLACE_COMMIT_ACTION, partitionToReplacedFileIds);
+
+ assertFileGroupCorrectness(instantTime2, partitionPath1, filePath2,
fileId2, 1);
+
+ // Add file to a new partition
+ String partitionPath2 = partitions.get(1);
+ String instantTime3 =
writeClient.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION, metaClient);
+ Pair<String, String> fileIdAndName3 = fileIdAndNameGenerator.generate(3,
instantTime3);
+ String fileId3 = fileIdAndName3.getLeft();
+ String fileName3 = fileIdAndName3.getRight();
+ String filePath3 = getPath(partitionPath2, fileName3);
+ WriteStatus writeStatus3 = createWriteStatus(instantTime3, partitionPath2,
filePath3, fileId3);
+ JavaRDD<WriteStatus> rdd3 =
createRdd(Collections.singletonList(writeStatus3));
+ metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(new
HoodieInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime3), Option.empty());
+ writeClient.commit(instantTime3, rdd3, Option.empty(),
HoodieTimeline.REPLACE_COMMIT_ACTION, Collections.emptyMap());
+
+ assertFileGroupCorrectness(instantTime3, partitionPath2, filePath3,
fileId3, partitionPath2.isEmpty() ? 2 : 1);
+
+ // clean first commit
+ String cleanTime = HoodieActiveTimeline.createNewInstantTime();
+ HoodieCleanerPlan cleanerPlan = cleanerPlan(new
HoodieActionInstant(instantTime2, HoodieTimeline.REPLACE_COMMIT_ACTION,
HoodieInstant.State.COMPLETED.name()), instantTime3,
+ Collections.singletonMap(partitionPath1, Collections.singletonList(new
HoodieCleanFileInfo(filePath1, false))));
+ metaClient.getActiveTimeline().saveToCleanRequested(new
HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION,
cleanTime),
+ TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
+ HoodieInstant inflightClean =
metaClient.getActiveTimeline().transitionCleanRequestedToInflight(
+ new HoodieInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.CLEAN_ACTION, cleanTime), Option.empty());
+ List<HoodieCleanStat> cleanStats =
Collections.singletonList(createCleanStat(partitionPath1,
Arrays.asList(filePath1), instantTime2, instantTime3));
+ HoodieCleanMetadata cleanMetadata = CleanerUtils.convertCleanMetadata(
+ cleanTime,
+ Option.empty(),
+ cleanStats);
+ HoodieTableMetadataWriter hoodieTableMetadataWriter =
(HoodieTableMetadataWriter) writeClient.initTable(WriteOperationType.UPSERT,
Option.of(cleanTime)).getMetadataWriter(cleanTime).get();
+ hoodieTableMetadataWriter.update(cleanMetadata, cleanTime);
+
metaClient.getActiveTimeline().transitionCleanInflightToComplete(inflightClean,
+ TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
+ // make sure we still get the same results as before
+ assertFileGroupCorrectness(instantTime2, partitionPath1, filePath2,
fileId2, partitionPath2.isEmpty() ? 2 : 1);
+ assertFileGroupCorrectness(instantTime3, partitionPath2, filePath3,
fileId3, partitionPath2.isEmpty() ? 2 : 1);
+
+ // trigger archiver manually
+ writeClient.archive();
+ // assert commit was archived
+ Assertions.assertEquals(1,
metaClient.getArchivedTimeline().reload().filterCompletedInstants().countInstants());
+ // make sure we still get the same results as before
+ assertFileGroupCorrectness(instantTime2, partitionPath1, filePath2,
fileId2, partitionPath2.isEmpty() ? 2 : 1);
+ assertFileGroupCorrectness(instantTime3, partitionPath2, filePath3,
fileId3, partitionPath2.isEmpty() ? 2 : 1);
+
+ // assert that column stats are correct
+ HoodieBackedTableMetadata hoodieBackedTableMetadata = new
HoodieBackedTableMetadata(context, writeConfig.getMetadataConfig(),
writeConfig.getBasePath(), true);
+ assertEmptyColStats(hoodieBackedTableMetadata, partitionPath1, fileName1);
+ assertColStats(hoodieBackedTableMetadata, partitionPath1, fileName2);
+ assertColStats(hoodieBackedTableMetadata, partitionPath2, fileName3);
+ }
+
+ static Stream<Arguments> getArgs() {
+ FileIdAndNameGenerator external = (index, instantTime) -> {
+ String fileName = String.format("file_%d.parquet", index);
+ String fileId = fileName;
+ return Pair.of(fileId, fileName);
+ };
+ List<String> partitionedTable = Arrays.asList("americas/brazil",
"americas/argentina");
+ List<String> unpartitionedTable = Arrays.asList("", "");
+ return Stream.of(Arguments.of(external, partitionedTable),
Arguments.of(external, unpartitionedTable));
+ }
+
+ private String getPath(String partitionPath, String fileName) {
+ if (partitionPath.isEmpty()) {
+ return fileName;
+ }
+ return String.format("%s/%s", partitionPath, fileName);
+ }
+
+ @FunctionalInterface
+ private interface FileIdAndNameGenerator {
+ Pair<String, String> generate(int iteration, String instantTime);
+ }
+
+ private void assertFileGroupCorrectness(String instantTime, String
partitionPath, String filePath, String fileId, int expectedSize) {
+ HoodieTableFileSystemView fsView = new
HoodieMetadataFileSystemView(context, metaClient,
metaClient.reloadActiveTimeline(), writeConfig.getMetadataConfig());
+ List<HoodieFileGroup> fileGroups =
fsView.getAllFileGroups(partitionPath).collect(Collectors.toList());
+ Assertions.assertEquals(expectedSize, fileGroups.size());
+ Option<HoodieFileGroup> fileGroupOption =
Option.fromJavaOptional(fileGroups.stream().filter(fg ->
fg.getFileGroupId().getFileId().equals(fileId)).findFirst());
+ Assertions.assertTrue(fileGroupOption.isPresent());
+ HoodieFileGroup fileGroup = fileGroupOption.get();
+ Assertions.assertEquals(fileId, fileGroup.getFileGroupId().getFileId());
+ Assertions.assertEquals(partitionPath, fileGroup.getPartitionPath());
+ HoodieBaseFile baseFile = fileGroup.getAllBaseFiles().findFirst().get();
+ Assertions.assertEquals(instantTime, baseFile.getCommitTime());
+ Assertions.assertEquals(metaClient.getBasePathV2().toString() + "/" +
filePath, baseFile.getPath());
+ }
+
+ private void assertEmptyColStats(HoodieBackedTableMetadata
hoodieBackedTableMetadata, String partitionPath, String fileName) {
+
Assertions.assertTrue(hoodieBackedTableMetadata.getColumnStats(Collections.singletonList(Pair.of(partitionPath,
fileName)), FIELD_1).isEmpty());
+
Assertions.assertTrue(hoodieBackedTableMetadata.getColumnStats(Collections.singletonList(Pair.of(partitionPath,
fileName)), FIELD_2).isEmpty());
+ }
+
+ private void assertColStats(HoodieBackedTableMetadata
hoodieBackedTableMetadata, String partitionPath, String fileName) {
+ Map<Pair<String, String>, HoodieMetadataColumnStats> field1ColStats =
hoodieBackedTableMetadata.getColumnStats(Collections.singletonList(Pair.of(partitionPath,
fileName)), FIELD_1);
+ Assertions.assertEquals(1, field1ColStats.size());
+ HoodieMetadataColumnStats column1stats =
field1ColStats.get(Pair.of(partitionPath, fileName));
+ Assertions.assertEquals(FIELD_1, column1stats.getColumnName());
+ Assertions.assertEquals(fileName, column1stats.getFileName());
+ Assertions.assertEquals(new IntWrapper(1), column1stats.getMinValue());
+ Assertions.assertEquals(new IntWrapper(2), column1stats.getMaxValue());
+ Assertions.assertEquals(2, column1stats.getValueCount());
+ Assertions.assertEquals(0, column1stats.getNullCount());
+ Assertions.assertEquals(5, column1stats.getTotalSize());
+ Assertions.assertEquals(10, column1stats.getTotalUncompressedSize());
+
+
+ Map<Pair<String, String>, HoodieMetadataColumnStats> field2ColStats =
hoodieBackedTableMetadata.getColumnStats(Collections.singletonList(Pair.of(partitionPath,
fileName)), FIELD_2);
+ Assertions.assertEquals(1, field2ColStats.size());
+ HoodieMetadataColumnStats column2stats =
field2ColStats.get(Pair.of(partitionPath, fileName));
+ Assertions.assertEquals(FIELD_2, column2stats.getColumnName());
+ Assertions.assertEquals(fileName, column2stats.getFileName());
+ Assertions.assertEquals(new StringWrapper("a"),
column2stats.getMinValue());
+ Assertions.assertEquals(new StringWrapper("b"),
column2stats.getMaxValue());
+ Assertions.assertEquals(3, column2stats.getValueCount());
+ Assertions.assertEquals(1, column2stats.getNullCount());
+ Assertions.assertEquals(10, column2stats.getTotalSize());
+ Assertions.assertEquals(20, column2stats.getTotalUncompressedSize());
+ }
+
+ private JavaRDD<WriteStatus> createRdd(List<WriteStatus> writeStatuses) {
+ return jsc.parallelize(writeStatuses, 1);
+ }
+
+ private WriteStatus createWriteStatus(String commitTime, String
partitionPath, String filePath, String fileId) {
+ WriteStatus writeStatus = new WriteStatus();
+ writeStatus.setFileId(fileId);
+ writeStatus.setPartitionPath(partitionPath);
+ HoodieDeltaWriteStat writeStat = new HoodieDeltaWriteStat();
+ writeStat.setFileId(fileId);
+
writeStat.setPath(ExternalFilePathUtil.appendCommitTimeAndExternalFileMarker(filePath,
commitTime));
+ writeStat.setPartitionPath(partitionPath);
+ writeStat.setNumWrites(3);
+ writeStat.setNumDeletes(0);
+ writeStat.setNumUpdateWrites(0);
+ writeStat.setNumInserts(3);
+ writeStat.setTotalWriteBytes(400);
+ writeStat.setTotalWriteErrors(0);
+ writeStat.setFileSizeInBytes(400);
+ writeStat.setTotalLogBlocks(0);
+ writeStat.setTotalLogRecords(0);
+ writeStat.setTotalLogFilesCompacted(0);
+ writeStat.setTotalLogSizeCompacted(0);
+ writeStat.setTotalUpdatedRecordsCompacted(0);
+ writeStat.setTotalCorruptLogBlock(0);
+ writeStat.setTotalRollbackBlocks(0);
+ Map<String, HoodieColumnRangeMetadata<Comparable>> stats = new HashMap<>();
+ stats.put(FIELD_1, HoodieColumnRangeMetadata.<Comparable>create(filePath,
FIELD_1, 1, 2, 0, 2, 5, 10));
+ stats.put(FIELD_2, HoodieColumnRangeMetadata.<Comparable>create(filePath,
FIELD_2, "a", "b", 1, 3, 10, 20));
+ writeStat.putRecordsStats(stats);
+ writeStatus.setStat(writeStat);
+ return writeStatus;
+ }
+
+ private HoodieCleanStat createCleanStat(String partitionPath, List<String>
deletePaths, String earliestCommitToRetain, String
lastCompletedCommitTimestamp) {
+ return new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_COMMITS,
partitionPath, deletePaths, deletePaths, Collections.emptyList(),
+ earliestCommitToRetain, lastCompletedCommitTimestamp);
+ }
+
+ private HoodieCleanerPlan cleanerPlan(HoodieActionInstant
earliestInstantToRetain, String latestCommit, Map<String,
List<HoodieCleanFileInfo>> filePathsToBeDeletedPerPartition) {
+ return new HoodieCleanerPlan(earliestInstantToRetain,
+ latestCommit,
+ writeConfig.getCleanerPolicy().name(), Collections.emptyMap(),
+ CleanPlanner.LATEST_CLEAN_PLAN_VERSION,
filePathsToBeDeletedPerPartition, Collections.emptyList());
+ }
+}
\ No newline at end of file
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 614c97e3ad5..6d72130f770 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -312,6 +312,14 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
.withDocumentation("Maximum size in bytes of a single log file. Larger
log files can contain larger log blocks "
+ "thereby reducing the number of blocks to search for keys");
+ public static final ConfigProperty<Boolean> AUTO_INITIALIZE = ConfigProperty
+ .key(METADATA_PREFIX + ".auto.initialize")
+ .defaultValue(true)
+ .sinceVersion("0.14.0")
+ .markAdvanced()
+ .withDocumentation("Initializes the metadata table by reading from the
file system when the table is first created. Enabled by default. "
+ + "Warning: This should only be disabled when manually constructing
the metadata table outside of typical Hudi writer flows.");
+
public long getMaxLogFileSize() {
return getLong(MAX_LOG_FILE_SIZE_BYTES_PROP);
}
@@ -440,6 +448,10 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
return getInt(RECORD_INDEX_MAX_PARALLELISM);
}
+ public boolean shouldAutoInitialize() {
+ return getBoolean(AUTO_INITIALIZE);
+ }
+
public static class Builder {
private EngineType engineType = EngineType.SPARK;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 455f80af497..71825a2fd34 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -473,7 +473,7 @@ public class FSUtils {
public static boolean isLogFile(String fileName) {
Matcher matcher = LOG_FILE_PATTERN.matcher(fileName);
- return matcher.find() && fileName.contains(".log");
+ return fileName.contains(".log") && matcher.find();
}
/**
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java
index ed1c32698eb..1fddf02711a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java
@@ -18,9 +18,14 @@
package org.apache.hudi.common.model;
+import org.apache.hudi.common.util.ExternalFilePathUtil;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.CachingPath;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hudi.hadoop.CachingPath.createRelativePathUnsafe;
/**
* Hoodie base file - Represents metadata about Hudi file in DFS.
@@ -47,11 +52,7 @@ public class HoodieBaseFile extends BaseFile {
}
public HoodieBaseFile(FileStatus fileStatus, BaseFile bootstrapBaseFile) {
- super(fileStatus);
- this.bootstrapBaseFile = Option.ofNullable(bootstrapBaseFile);
- String[] fileIdAndCommitTime = getFileIdAndCommitTimeFromFileName();
- this.fileId = fileIdAndCommitTime[0];
- this.commitTime = fileIdAndCommitTime[1];
+ this(fileStatus,
getFileIdAndCommitTimeFromFileName(fileStatus.getPath().getName()),
bootstrapBaseFile);
}
public HoodieBaseFile(String filePath) {
@@ -61,16 +62,39 @@ public class HoodieBaseFile extends BaseFile {
public HoodieBaseFile(String filePath, BaseFile bootstrapBaseFile) {
super(filePath);
this.bootstrapBaseFile = Option.ofNullable(bootstrapBaseFile);
- String[] fileIdAndCommitTime = getFileIdAndCommitTimeFromFileName();
+ String[] fileIdAndCommitTime =
getFileIdAndCommitTimeFromFileName(getFileName());
this.fileId = fileIdAndCommitTime[0];
this.commitTime = fileIdAndCommitTime[1];
}
+ public HoodieBaseFile(String filePath, String fileId, String commitTime,
BaseFile bootstrapBaseFile) {
+ super(filePath);
+ this.bootstrapBaseFile = Option.ofNullable(bootstrapBaseFile);
+ this.fileId = fileId;
+ this.commitTime = commitTime;
+ }
+
+ private HoodieBaseFile(FileStatus fileStatus, String[] fileIdAndCommitTime,
BaseFile bootstrapBaseFile) {
+ this(fileStatus, fileIdAndCommitTime[0], fileIdAndCommitTime[1],
bootstrapBaseFile);
+ }
+
+ public HoodieBaseFile(FileStatus fileStatus, String fileId, String
commitTime, BaseFile bootstrapBaseFile) {
+ super(maybeHandleExternallyGeneratedFileName(fileStatus, fileId));
+ this.bootstrapBaseFile = Option.ofNullable(bootstrapBaseFile);
+ this.fileId = fileId;
+ this.commitTime = commitTime;
+ }
+
/**
* Parses the file ID and commit time from the fileName.
+ * @param fileName Name of the file
* @return String array of size 2 with fileId as the first and commitTime as
the second element.
*/
- private String[] getFileIdAndCommitTimeFromFileName() {
+ private static String[] getFileIdAndCommitTimeFromFileName(String fileName) {
+ return ExternalFilePathUtil.isExternallyCreatedFile(fileName) ?
handleExternallyGeneratedFile(fileName) : handleHudiGeneratedFile(fileName);
+ }
+
+ private static String[] handleHudiGeneratedFile(String fileName) {
String[] values = new String[2];
short underscoreCount = 0;
short lastUnderscoreIndex = 0;
@@ -94,6 +118,39 @@ public class HoodieBaseFile extends BaseFile {
return values;
}
+ private static String[] handleExternallyGeneratedFile(String fileName) {
+ String[] values = new String[2];
+ // file name has format <originalFileName>_<commitTime>_hudiext and
originalFileName is used as fileId
+ int lastUnderscore = fileName.lastIndexOf(UNDERSCORE);
+ int secondToLastUnderscore = fileName.lastIndexOf(UNDERSCORE,
lastUnderscore - 1);
+ values[0] = fileName.substring(0, secondToLastUnderscore);
+ values[1] = fileName.substring(secondToLastUnderscore + 1, lastUnderscore);
+ return values;
+ }
+
+ /**
+ * If the file was created externally, the original file path will have a
'_[commitTime]_hudiext' suffix when stored in the metadata table. That suffix
needs to be removed from the FileStatus so
+ * that the actual file can be found and read.
+ * @param fileStatus an input file status that may require updating
+ * @param fileId the fileId for the file
+ * @return the original file status if it was not externally created, or a
new FileStatus with the original file name if it was externally created
+ */
+ private static FileStatus maybeHandleExternallyGeneratedFileName(FileStatus
fileStatus, String fileId) {
+ if (fileStatus == null) {
+ return null;
+ }
+ if
(ExternalFilePathUtil.isExternallyCreatedFile(fileStatus.getPath().getName())) {
+ // fileId is the same as the original file name for externally created
files
+ Path parent = fileStatus.getPath().getParent();
+ return new FileStatus(fileStatus.getLen(), fileStatus.isDirectory(),
fileStatus.getReplication(),
+ fileStatus.getBlockSize(), fileStatus.getModificationTime(),
fileStatus.getAccessTime(),
+ fileStatus.getPermission(), fileStatus.getOwner(),
fileStatus.getGroup(),
+ new CachingPath(parent, createRelativePathUnsafe(fileId)));
+ } else {
+ return fileStatus;
+ }
+ }
+
public String getFileId() {
return fileId;
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/BaseFileDTO.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/BaseFileDTO.java
index 3776c983552..deb5352bbcf 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/BaseFileDTO.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/BaseFileDTO.java
@@ -36,6 +36,10 @@ public class BaseFileDTO {
private String fullPath;
@JsonProperty("fileLen")
private long fileLen;
+ @JsonProperty("commitTime")
+ private String commitTime;
+ @JsonProperty("fileId")
+ private String fileId;
@JsonProperty("bootstrapBaseFile")
private BaseFileDTO bootstrapBaseFile;
@@ -46,13 +50,12 @@ public class BaseFileDTO {
HoodieBaseFile baseFile;
if (null != dto.fileStatus) {
- baseFile = new
HoodieBaseFile(FileStatusDTO.toFileStatus(dto.fileStatus));
+ baseFile = new
HoodieBaseFile(FileStatusDTO.toFileStatus(dto.fileStatus), dto.fileId,
dto.commitTime, toBaseFile(dto.bootstrapBaseFile));
} else {
- baseFile = new HoodieBaseFile(dto.fullPath);
+ baseFile = new HoodieBaseFile(dto.fullPath, dto.fileId, dto.commitTime,
toBaseFile(dto.bootstrapBaseFile));
baseFile.setFileLen(dto.fileLen);
}
- baseFile.setBootstrapBaseFile(toBaseFile(dto.bootstrapBaseFile));
return baseFile;
}
@@ -81,8 +84,11 @@ public class BaseFileDTO {
dto.fullPath = baseFile.getPath();
dto.fileLen = baseFile.getFileLen();
if (baseFile instanceof HoodieBaseFile) {
- dto.bootstrapBaseFile = ((HoodieBaseFile)baseFile).getBootstrapBaseFile()
+ HoodieBaseFile hoodieBaseFile = (HoodieBaseFile) baseFile;
+ dto.bootstrapBaseFile = hoodieBaseFile.getBootstrapBaseFile()
.map(BaseFileDTO::fromHoodieBaseFile).orElse(null);
+ dto.fileId = hoodieBaseFile.getFileId();
+ dto.commitTime = hoodieBaseFile.getCommitTime();
}
return dto;
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/FilePathDTO.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/FilePathDTO.java
index 1714301d495..55dc3ef4410 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/FilePathDTO.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/FilePathDTO.java
@@ -18,6 +18,8 @@
package org.apache.hudi.common.table.timeline.dto;
+import org.apache.hudi.hadoop.CachingPath;
+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.hadoop.fs.Path;
@@ -49,7 +51,7 @@ public class FilePathDTO {
}
try {
- return new Path(new URI(dto.uri));
+ return new CachingPath(new URI(dto.uri));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
index 1ff93327f7f..f25737228e6 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
@@ -71,7 +71,7 @@ public class HoodieTablePreCommitFileSystemView {
Map<String, HoodieBaseFile> newFilesWrittenForPartition =
filesWritten.stream()
.filter(file -> partitionStr.equals(file.getPartitionPath()))
.collect(Collectors.toMap(HoodieWriteStat::getFileId, writeStat ->
- new HoodieBaseFile(new CachingPath(tableMetaClient.getBasePath(),
writeStat.getPath()).toString())));
+ new HoodieBaseFile(new CachingPath(tableMetaClient.getBasePath(),
writeStat.getPath()).toString(), writeStat.getFileId(), preCommitInstantTime,
null)));
Stream<HoodieBaseFile> committedBaseFiles =
this.completedCommitsFileSystemView.getLatestBaseFiles(partitionStr);
Map<String, HoodieBaseFile> allFileIds = committedBaseFiles
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/ExternalFilePathUtil.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/ExternalFilePathUtil.java
new file mode 100644
index 00000000000..223ae8abc42
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/ExternalFilePathUtil.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+/**
+ * Utility methods for handling externally created files.
+ */
+public class ExternalFilePathUtil {
+ // Suffix appended to a file path to mark that the path was
created by an external system and not a Hudi writer.
+ private static final String EXTERNAL_FILE_SUFFIX = "_hudiext";
+
+ /**
+ * Appends the commit time and external file marker to the file path. Hudi
relies on the commit time in the file name for properly generating views of the
files in a table.
+ * @param filePath The original file path
+ * @param commitTime The time of the commit that added this file to the table
+ * @return The file path with this additional information appended
+ */
+ public static String appendCommitTimeAndExternalFileMarker(String filePath,
String commitTime) {
+ return filePath + "_" + commitTime + EXTERNAL_FILE_SUFFIX;
+ }
+
+ /**
+ * Checks whether the file was created by an external system by looking for
the external file marker at the end of the file name.
+ * @return True if the file was created by an external system, false
otherwise
+ */
+ public static boolean isExternallyCreatedFile(String fileName) {
+ return fileName.endsWith(EXTERNAL_FILE_SUFFIX);
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index f1b9249940a..8d5114a76bc 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -62,7 +62,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
-import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -177,6 +176,7 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
* You can find more details in HUDI-3834.
*/
private static final Lazy<HoodieMetadataColumnStats.Builder>
METADATA_COLUMN_STATS_BUILDER_STUB =
Lazy.lazily(HoodieMetadataColumnStats::newBuilder);
+ private static final HoodieMetadataFileInfo DELETE_FILE_METADATA = new
HoodieMetadataFileInfo(0L, true);
private String key = null;
private int type = 0;
private Map<String, HoodieMetadataFileInfo> filesystemMetadata = null;
@@ -326,25 +326,18 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
* @param filesDeleted List of files which have been deleted from this
partition
*/
public static HoodieRecord<HoodieMetadataPayload>
createPartitionFilesRecord(String partition,
-
Option<Map<String, Long>> filesAdded,
-
Option<List<String>> filesDeleted) {
- Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
- filesAdded.ifPresent(filesMap ->
- fileInfo.putAll(
- filesMap.entrySet().stream().collect(
- Collectors.toMap(Map.Entry::getKey, (entry) -> {
- long fileSize = entry.getValue();
- // Assert that the file-size of the file being added is
positive, since Hudi
- // should not be creating empty files
- checkState(fileSize > 0);
- return new HoodieMetadataFileInfo(fileSize, false);
- })))
- );
- filesDeleted.ifPresent(filesList ->
- fileInfo.putAll(
- filesList.stream().collect(
- Collectors.toMap(Function.identity(), (ignored) -> new
HoodieMetadataFileInfo(0L, true))))
- );
+
Map<String, Long> filesAdded,
+
List<String> filesDeleted) {
+ int size = filesAdded.size() + filesDeleted.size();
+ Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>(size, 1);
+ filesAdded.forEach((fileName, fileSize) -> {
+ // Assert that the file-size of the file being added is positive, since
Hudi
+ // should not be creating empty files
+ checkState(fileSize > 0);
+ fileInfo.put(fileName, new HoodieMetadataFileInfo(fileSize, false));
+ });
+
+ filesDeleted.forEach(fileName -> fileInfo.put(fileName,
DELETE_FILE_METADATA));
HoodieKey key = new HoodieKey(partition,
MetadataPartitionType.FILES.getPartitionPath());
HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 201e12d312f..6be175ffa96 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -378,8 +378,8 @@ public class HoodieTableMetadataUtil {
CollectionUtils::combine);
newFileCount.add(updatedFilesToSizesMapping.size());
- return
HoodieMetadataPayload.createPartitionFilesRecord(partition,
Option.of(updatedFilesToSizesMapping),
- Option.empty());
+ return
HoodieMetadataPayload.createPartitionFilesRecord(partition,
updatedFilesToSizesMapping,
+ Collections.emptyList());
})
.collect(Collectors.toList());
@@ -507,8 +507,8 @@ public class HoodieTableMetadataUtil {
final String partition = getPartitionIdentifier(partitionName);
// Files deleted from a partition
List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
- HoodieRecord record =
HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
- Option.of(new ArrayList<>(deletedFiles)));
+ HoodieRecord record =
HoodieMetadataPayload.createPartitionFilesRecord(partition,
Collections.emptyMap(),
+ deletedFiles);
records.add(record);
fileDeleteCount[0] += deletedFiles.size();
boolean isPartitionDeleted = partitionMetadata.getIsPartitionDeleted();
@@ -533,25 +533,19 @@ public class HoodieTableMetadataUtil {
int[] fileDeleteCount = {0};
int[] filesAddedCount = {0};
- filesAdded.forEach((k,v) -> {
- String partition = k;
- Map<String, Long> filestoAdd = v;
- filesAddedCount[0] += filestoAdd.size();
- Option<List<String>> filesToDelete = filesDeleted.containsKey(k) ?
Option.of(filesDeleted.get(k)) : Option.empty();
- if (filesToDelete.isPresent()) {
- fileDeleteCount[0] += filesToDelete.get().size();
- }
- HoodieRecord record =
HoodieMetadataPayload.createPartitionFilesRecord(partition,
Option.of(filestoAdd), filesToDelete);
+ filesAdded.forEach((partition, filesToAdd) -> {
+ filesAddedCount[0] += filesToAdd.size();
+ List<String> filesToDelete = filesDeleted.getOrDefault(partition,
Collections.emptyList());
+ fileDeleteCount[0] += filesToDelete.size();
+ HoodieRecord record =
HoodieMetadataPayload.createPartitionFilesRecord(partition, filesToAdd,
filesToDelete);
records.add(record);
});
// there could be partitions which only has missing deleted files.
- filesDeleted.forEach((k,v) -> {
- String partition = k;
- Option<List<String>> filesToDelete = Option.of(v);
+ filesDeleted.forEach((partition, filesToDelete) -> {
if (!filesAdded.containsKey(partition)) {
- fileDeleteCount[0] += filesToDelete.get().size();
- HoodieRecord record =
HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
filesToDelete);
+ fileDeleteCount[0] += filesToDelete.size();
+ HoodieRecord record =
HoodieMetadataPayload.createPartitionFilesRecord(partition,
Collections.emptyMap(), filesToDelete);
records.add(record);
}
});
@@ -694,7 +688,7 @@ public class HoodieTableMetadataUtil {
Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
processRollbackMetadata(rollbackMetadata, partitionToAppendedFiles);
reAddLogFilesFromRollbackPlan(dataTableMetaClient, instantTime,
partitionToAppendedFiles);
- return convertFilesToFilesPartitionRecords(Collections.EMPTY_MAP,
partitionToAppendedFiles, instantTime, "Rollback");
+ return convertFilesToFilesPartitionRecords(Collections.emptyMap(),
partitionToAppendedFiles, instantTime, "Rollback");
}
/**
@@ -740,20 +734,20 @@ public class HoodieTableMetadataUtil {
protected static List<HoodieRecord>
convertFilesToFilesPartitionRecords(Map<String, List<String>>
partitionToDeletedFiles,
Map<String, Map<String, Long>> partitionToAppendedFiles,
String instantTime, String operation) {
- List<HoodieRecord> records = new LinkedList<>();
+ List<HoodieRecord> records = new
ArrayList<>(partitionToDeletedFiles.size() + partitionToAppendedFiles.size());
int[] fileChangeCount = {0, 0}; // deletes, appends
partitionToDeletedFiles.forEach((partitionName, deletedFiles) -> {
fileChangeCount[0] += deletedFiles.size();
final String partition = getPartitionIdentifier(partitionName);
- Option<Map<String, Long>> filesAdded = Option.empty();
+ Map<String, Long> filesAdded = Collections.emptyMap();
if (partitionToAppendedFiles.containsKey(partitionName)) {
- filesAdded = Option.of(partitionToAppendedFiles.remove(partitionName));
+ filesAdded = partitionToAppendedFiles.remove(partitionName);
}
HoodieRecord record =
HoodieMetadataPayload.createPartitionFilesRecord(partition, filesAdded,
- Option.of(new ArrayList<>(deletedFiles)));
+ deletedFiles);
records.add(record);
});
@@ -767,8 +761,8 @@ public class HoodieTableMetadataUtil {
"Rollback file cannot both be appended and deleted");
// New files added to a partition
- HoodieRecord record =
HoodieMetadataPayload.createPartitionFilesRecord(partition,
Option.of(appendedFileMap),
- Option.empty());
+ HoodieRecord record =
HoodieMetadataPayload.createPartitionFilesRecord(partition, appendedFileMap,
+ Collections.emptyList());
records.add(record);
});
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieBaseFile.java
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieBaseFile.java
index 15a93cd232b..0623088a9f4 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieBaseFile.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieBaseFile.java
@@ -68,7 +68,23 @@ public class TestHoodieBaseFile {
assertFileGetters(null, hoodieBaseFile, -1, Option.of(bootstrapBaseFile));
}
+ @Test
+ void createFromExternalFileStatus() {
+ String fileName = "parquet_file_1.parquet";
+ String storedPathString = "file:/tmp/hoodie/2021/01/01/" + fileName + "_"
+ baseCommitTime + "_hudiext";
+ String expectedPathString = "file:/tmp/hoodie/2021/01/01/" + fileName;
+ FileStatus inputFileStatus = new FileStatus(length, false, 0, 0, 0, 0,
null, null, null, new Path(storedPathString));
+ FileStatus expectedFileStatus = new FileStatus(length, false, 0, 0, 0, 0,
null, null, null, new Path(expectedPathString));
+ HoodieBaseFile hoodieBaseFile = new HoodieBaseFile(inputFileStatus);
+
+ assertFileGetters(expectedFileStatus, hoodieBaseFile, length,
Option.empty(), fileName, expectedPathString, fileName);
+ }
+
private void assertFileGetters(FileStatus fileStatus, HoodieBaseFile
hoodieBaseFile, long fileLength, Option<HoodieBaseFile> bootstrapBaseFile) {
+ assertFileGetters(fileStatus, hoodieBaseFile, fileLength,
bootstrapBaseFile, fileId, pathStr, fileName);
+ }
+
+ private void assertFileGetters(FileStatus fileStatus, HoodieBaseFile
hoodieBaseFile, long fileLength, Option<HoodieBaseFile> bootstrapBaseFile,
String fileId, String pathStr, String fileName) {
assertEquals(fileId, hoodieBaseFile.getFileId());
assertEquals(baseCommitTime, hoodieBaseFile.getCommitTime());
assertEquals(bootstrapBaseFile, hoodieBaseFile.getBootstrapBaseFile());
diff --git
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
index 12baa02d864..cde9341f5cd 100644
---
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
+++
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
@@ -51,7 +51,7 @@ public class TestHoodieMetadataPayload extends
HoodieCommonTestHarness {
);
HoodieRecord<HoodieMetadataPayload> firstPartitionFilesRecord =
- HoodieMetadataPayload.createPartitionFilesRecord(partitionName,
Option.of(firstCommitAddedFiles), Option.empty());
+ HoodieMetadataPayload.createPartitionFilesRecord(partitionName,
firstCommitAddedFiles, Collections.emptyList());
Map<String, Long> secondCommitAddedFiles = createImmutableMap(
// NOTE: This is an append
@@ -63,22 +63,20 @@ public class TestHoodieMetadataPayload extends
HoodieCommonTestHarness {
List<String> secondCommitDeletedFiles =
Collections.singletonList("file1.parquet");
HoodieRecord<HoodieMetadataPayload> secondPartitionFilesRecord =
- HoodieMetadataPayload.createPartitionFilesRecord(partitionName,
Option.of(secondCommitAddedFiles), Option.of(secondCommitDeletedFiles));
+ HoodieMetadataPayload.createPartitionFilesRecord(partitionName,
secondCommitAddedFiles, secondCommitDeletedFiles);
HoodieMetadataPayload combinedPartitionFilesRecordPayload =
secondPartitionFilesRecord.getData().preCombine(firstPartitionFilesRecord.getData());
HoodieMetadataPayload expectedCombinedPartitionedFilesRecordPayload =
HoodieMetadataPayload.createPartitionFilesRecord(partitionName,
- Option.of(
- createImmutableMap(
- Pair.of("file2.parquet", 2000L),
- Pair.of("file3.parquet", 3333L),
- Pair.of("file4.parquet", 4000L),
- Pair.of("file5.parquet", 5000L)
- )
+ createImmutableMap(
+ Pair.of("file2.parquet", 2000L),
+ Pair.of("file3.parquet", 3333L),
+ Pair.of("file4.parquet", 4000L),
+ Pair.of("file5.parquet", 5000L)
),
- Option.empty()
+ Collections.emptyList()
).getData();
assertEquals(expectedCombinedPartitionedFilesRecordPayload,
combinedPartitionFilesRecordPayload);