This is an automated email from the ASF dual-hosted git repository.
satish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9e83085 [HUDI-1518] Remove the logic that delete replaced file when
archive (#3310)
9e83085 is described below
commit 9e8308527a3a19bf58397ededfbf91a70568060e
Author: zhangyue19921010 <[email protected]>
AuthorDate: Thu Aug 12 01:54:44 2021 +0800
[HUDI-1518] Remove the logic that delete replaced file when archive (#3310)
* remove delete replaced file when archive
* done
* remove unused import
* remove delete replaced files when archive related UT
* code reviewed
Co-authored-by: yuezhang <[email protected]>
---
.../apache/hudi/client/ReplaceArchivalHelper.java | 52 --------------
.../hudi/table/HoodieTimelineArchiveLog.java | 31 ---------
.../hudi/io/TestHoodieTimelineArchiveLog.java | 79 ----------------------
3 files changed, 162 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/ReplaceArchivalHelper.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/ReplaceArchivalHelper.java
index 5144f2b..40eff71 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/ReplaceArchivalHelper.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/ReplaceArchivalHelper.java
@@ -20,24 +20,13 @@ package org.apache.hudi.client;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieRollingStatMetadata;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import java.io.IOException;
import java.io.Serializable;
-import java.util.List;
-import java.util.stream.Stream;
/**
* Operates on marker files for a given write action (commit, delta commit,
compaction).
@@ -61,45 +50,4 @@ public class ReplaceArchivalHelper implements Serializable {
avroMetaData.getExtraMetadata().put(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY,
"");
return avroMetaData;
}
-
- /**
- * Delete all files represented by FileSlices in parallel. Return true if
all files are deleted successfully.
- */
- public static boolean deleteReplacedFileGroups(HoodieEngineContext context,
HoodieTableMetaClient metaClient,
- TableFileSystemView
fileSystemView,
- HoodieInstant instant,
List<String> replacedPartitions) {
- // There is no file id to be replaced in the very first replace commit
file for insert overwrite operation
- if (replacedPartitions.isEmpty()) {
- LOG.warn("Found no partition files to replace");
- return true;
- }
- context.setJobStatus(ReplaceArchivalHelper.class.getSimpleName(), "Delete
replaced file groups");
- List<Boolean> f = context.map(replacedPartitions, partition -> {
- Stream<FileSlice> fileSlices =
fileSystemView.getReplacedFileGroupsBeforeOrOn(instant.getTimestamp(),
partition)
- .flatMap(HoodieFileGroup::getAllRawFileSlices);
- return fileSlices.allMatch(slice -> deleteFileSlice(slice, metaClient,
instant));
- }, replacedPartitions.size());
-
- return f.stream().reduce((x, y) -> x & y).orElse(true);
- }
-
- private static boolean deleteFileSlice(FileSlice fileSlice,
HoodieTableMetaClient metaClient, HoodieInstant instant) {
- boolean baseFileDeleteSuccess = fileSlice.getBaseFile().map(baseFile ->
- deletePath(new Path(baseFile.getPath()), metaClient,
instant)).orElse(true);
-
- boolean logFileSuccess = fileSlice.getLogFiles().map(logFile ->
- deletePath(logFile.getPath(), metaClient, instant)).allMatch(x -> x);
- return baseFileDeleteSuccess & logFileSuccess;
- }
-
- private static boolean deletePath(Path path, HoodieTableMetaClient
metaClient, HoodieInstant instant) {
- try {
- LOG.info("Deleting " + path + " before archiving " + instant);
- metaClient.getFs().delete(path);
- return true;
- } catch (IOException e) {
- LOG.error("unable to delete file groups that are replaced", e);
- return false;
- }
- }
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
index 43698f0..11c288b 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
@@ -22,13 +22,11 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
-import org.apache.hudi.client.ReplaceArchivalHelper;
import org.apache.hudi.client.utils.MetadataConversionUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieArchivedLogFile;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
-import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
@@ -40,7 +38,6 @@ import
org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
-import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
@@ -303,11 +300,6 @@ public class HoodieTimelineArchiveLog<T extends
HoodieAvroPayload, I, K, O> {
LOG.info("Wrapper schema " + wrapperSchema.toString());
List<IndexedRecord> records = new ArrayList<>();
for (HoodieInstant hoodieInstant : instants) {
- // TODO HUDI-1518 Cleaner now takes care of removing replaced file
groups. This call to deleteReplacedFileGroups can be removed.
- boolean deleteSuccess = deleteReplacedFileGroups(context,
hoodieInstant);
- if (!deleteSuccess) {
- LOG.warn("Unable to delete file(s) for " +
hoodieInstant.getFileName() + ", replaced files possibly deleted by cleaner");
- }
try {
deleteAnyLeftOverMarkers(context, hoodieInstant);
records.add(convertToAvroRecord(hoodieInstant));
@@ -334,29 +326,6 @@ public class HoodieTimelineArchiveLog<T extends
HoodieAvroPayload, I, K, O> {
}
}
- private boolean deleteReplacedFileGroups(HoodieEngineContext context,
HoodieInstant instant) {
- if (!instant.isCompleted() ||
!HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
- // only delete files for completed replace instants
- return true;
- }
-
- TableFileSystemView fileSystemView = this.table.getFileSystemView();
- List<String> replacedPartitions = getReplacedPartitions(instant);
- return ReplaceArchivalHelper.deleteReplacedFileGroups(context, metaClient,
fileSystemView, instant, replacedPartitions);
- }
-
- private List<String> getReplacedPartitions(HoodieInstant instant) {
- try {
- HoodieReplaceCommitMetadata metadata =
HoodieReplaceCommitMetadata.fromBytes(
- metaClient.getActiveTimeline().getInstantDetails(instant).get(),
- HoodieReplaceCommitMetadata.class);
-
- return new ArrayList<>(metadata.getPartitionToReplaceFileIds().keySet());
- } catch (IOException e) {
- throw new HoodieCommitException("Failed to archive because cannot delete
replace files", e);
- }
- }
-
private void writeToFile(Schema wrapperSchema, List<IndexedRecord> records)
throws Exception {
if (records.size() > 0) {
Map<HeaderMetadataType, String> header = new HashMap<>();
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
index ea80b08..cdd3fa5 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
@@ -21,15 +21,12 @@ package org.apache.hudi.io;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
-import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.utils.MetadataConversionUtils;
import org.apache.hudi.common.HoodieCleanStat;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -50,7 +47,6 @@ import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -234,45 +230,6 @@ public class TestHoodieTimelineArchiveLog extends
HoodieClientTestHarness {
}
@Test
- public void testArchiveTableWithReplacedFiles() throws Exception {
- HoodieTestUtils.init(hadoopConf, basePath);
- HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
-
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2,
2).forTable("test-trip-table")
-
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2,
3).build())
- .build();
-
- // when using insert_overwrite or insert_overwrite_table
- // first commit may without replaceFileIds
- createReplaceMetadataWithoutReplaceFileId("000");
-
- int numCommits = 4;
- int commitInstant = 100;
- for (int i = 0; i < numCommits; i++) {
- createReplaceMetadata(String.valueOf(commitInstant));
- commitInstant += 100;
- }
-
- metaClient = HoodieTableMetaClient.reload(metaClient);
- HoodieTimeline timeline =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
- HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
- assertEquals(5, timeline.countInstants(), "Loaded 5 commits and the count
should match");
- HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg,
table);
- boolean result = archiveLog.archiveIfRequired(context);
- assertTrue(result);
-
- FileStatus[] allFiles = metaClient.getFs().listStatus(new Path(basePath +
"/" + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH));
- Set<String> allFileIds = Arrays.stream(allFiles).map(fs ->
FSUtils.getFileIdFromFilePath(fs.getPath())).collect(Collectors.toSet());
-
- // verify 100-1,200-1 are deleted by archival
- assertFalse(allFileIds.contains("file-100-1"));
- assertFalse(allFileIds.contains("file-200-1"));
- assertTrue(allFileIds.contains("file-100-2"));
- assertTrue(allFileIds.contains("file-200-2"));
- assertTrue(allFileIds.contains("file-300-1"));
- assertTrue(allFileIds.contains("file-400-1"));
- }
-
- @Test
public void testArchiveTableWithNoArchival() throws IOException {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2,
2).forTable("test-trip-table")
@@ -679,42 +636,6 @@ public class TestHoodieTimelineArchiveLog extends
HoodieClientTestHarness {
assertEquals(notArchivedInstants, Arrays.asList(notArchivedInstant1,
notArchivedInstant2, notArchivedInstant3), "");
}
- private void createReplaceMetadataWithoutReplaceFileId(String instantTime)
throws Exception {
-
- // create replace instant without a previous replace commit
- HoodieRequestedReplaceMetadata requestedReplaceMetadata =
HoodieRequestedReplaceMetadata.newBuilder()
- .setOperationType(WriteOperationType.INSERT_OVERWRITE_TABLE.toString())
- .setVersion(1)
- .setExtraMetadata(Collections.emptyMap())
- .build();
- HoodieReplaceCommitMetadata completeReplaceMetadata = new
HoodieReplaceCommitMetadata();
- HoodieCommitMetadata inflightReplaceMetadata = new HoodieCommitMetadata();
-
completeReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE_TABLE);
-
inflightReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE_TABLE);
- HoodieTestTable.of(metaClient)
- .addReplaceCommit(instantTime, Option.of(requestedReplaceMetadata),
Option.of(inflightReplaceMetadata), completeReplaceMetadata);
- }
-
- private void createReplaceMetadata(String instantTime) throws Exception {
- String fileId1 = "file-" + instantTime + "-1";
- String fileId2 = "file-" + instantTime + "-2";
-
- // create replace instant to mark fileId1 as deleted
- HoodieRequestedReplaceMetadata requestedReplaceMetadata =
HoodieRequestedReplaceMetadata.newBuilder()
- .setOperationType(WriteOperationType.INSERT_OVERWRITE.toString())
- .setVersion(1)
- .setExtraMetadata(Collections.emptyMap())
- .build();
- HoodieReplaceCommitMetadata completeReplaceMetadata = new
HoodieReplaceCommitMetadata();
- HoodieCommitMetadata inflightReplaceMetadata = new HoodieCommitMetadata();
-
completeReplaceMetadata.addReplaceFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
fileId1);
-
completeReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE);
-
inflightReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE);
- HoodieTestTable.of(metaClient)
- .addReplaceCommit(instantTime, Option.of(requestedReplaceMetadata),
Option.of(inflightReplaceMetadata), completeReplaceMetadata)
-
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
fileId1, fileId2);
- }
-
private HoodieInstant createCleanMetadata(String instantTime, boolean
inflightOnly) throws IOException {
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new
HoodieActionInstant("", "", ""), "", new HashMap<>(),
CleanPlanV2MigrationHandler.VERSION, new HashMap<>());