This is an automated email from the ASF dual-hosted git repository.

satish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e83085  [HUDI-1518] Remove the logic that deletes replaced files when archiving (#3310)
9e83085 is described below

commit 9e8308527a3a19bf58397ededfbf91a70568060e
Author: zhangyue19921010 <[email protected]>
AuthorDate: Thu Aug 12 01:54:44 2021 +0800

    [HUDI-1518] Remove the logic that deletes replaced files when archiving (#3310)
    
    * remove deletion of replaced files when archiving
    
    * done
    
    * remove unused import
    
    * remove UT related to deleting replaced files when archiving
    
    * code reviewed
    
    Co-authored-by: yuezhang <[email protected]>
---
 .../apache/hudi/client/ReplaceArchivalHelper.java  | 52 --------------
 .../hudi/table/HoodieTimelineArchiveLog.java       | 31 ---------
 .../hudi/io/TestHoodieTimelineArchiveLog.java      | 79 ----------------------
 3 files changed, 162 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/ReplaceArchivalHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/ReplaceArchivalHelper.java
index 5144f2b..40eff71 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/ReplaceArchivalHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/ReplaceArchivalHelper.java
@@ -20,24 +20,13 @@ package org.apache.hudi.client;
 
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieRollingStatMetadata;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.view.TableFileSystemView;
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import java.io.IOException;
 import java.io.Serializable;
-import java.util.List;
-import java.util.stream.Stream;
 
 /**
 * Operates on marker files for a given write action (commit, delta commit, compaction).
@@ -61,45 +50,4 @@ public class ReplaceArchivalHelper implements Serializable {
     avroMetaData.getExtraMetadata().put(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY, "");
     return avroMetaData;
   }
-
-  /**
-   * Delete all files represented by FileSlices in parallel. Return true if all files are deleted successfully.
-   */
-  public static boolean deleteReplacedFileGroups(HoodieEngineContext context, HoodieTableMetaClient metaClient,
-                                                 TableFileSystemView fileSystemView,
-                                                 HoodieInstant instant, List<String> replacedPartitions) {
-    // There is no file id to be replaced in the very first replace commit file for insert overwrite operation
-    if (replacedPartitions.isEmpty()) {
-      LOG.warn("Found no partition files to replace");
-      return true;
-    }
-    context.setJobStatus(ReplaceArchivalHelper.class.getSimpleName(), "Delete replaced file groups");
-    List<Boolean> f = context.map(replacedPartitions, partition -> {
-      Stream<FileSlice> fileSlices = fileSystemView.getReplacedFileGroupsBeforeOrOn(instant.getTimestamp(), partition)
-          .flatMap(HoodieFileGroup::getAllRawFileSlices);
-      return fileSlices.allMatch(slice -> deleteFileSlice(slice, metaClient, instant));
-    }, replacedPartitions.size());
-
-    return f.stream().reduce((x, y) -> x & y).orElse(true);
-  }
-
-  private static boolean deleteFileSlice(FileSlice fileSlice, HoodieTableMetaClient metaClient, HoodieInstant instant) {
-    boolean baseFileDeleteSuccess = fileSlice.getBaseFile().map(baseFile ->
-        deletePath(new Path(baseFile.getPath()), metaClient, instant)).orElse(true);
-
-    boolean logFileSuccess = fileSlice.getLogFiles().map(logFile ->
-        deletePath(logFile.getPath(), metaClient, instant)).allMatch(x -> x);
-    return baseFileDeleteSuccess & logFileSuccess;
-  }
-
-  private static boolean deletePath(Path path, HoodieTableMetaClient metaClient, HoodieInstant instant) {
-    try {
-      LOG.info("Deleting " + path + " before archiving " + instant);
-      metaClient.getFs().delete(path);
-      return true;
-    } catch (IOException e) {
-      LOG.error("unable to delete file groups that are replaced", e);
-      return false;
-    }
-  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
index 43698f0..11c288b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
@@ -22,13 +22,11 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
-import org.apache.hudi.client.ReplaceArchivalHelper;
 import org.apache.hudi.client.utils.MetadataConversionUtils;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieArchivedLogFile;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
-import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
@@ -40,7 +38,6 @@ import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
-import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -303,11 +300,6 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
       LOG.info("Wrapper schema " + wrapperSchema.toString());
       List<IndexedRecord> records = new ArrayList<>();
       for (HoodieInstant hoodieInstant : instants) {
-        // TODO HUDI-1518 Cleaner now takes care of removing replaced file groups. This call to deleteReplacedFileGroups can be removed.
-        boolean deleteSuccess = deleteReplacedFileGroups(context, hoodieInstant);
-        if (!deleteSuccess) {
-          LOG.warn("Unable to delete file(s) for " + hoodieInstant.getFileName() + ", replaced files possibly deleted by cleaner");
-        }
         try {
           deleteAnyLeftOverMarkers(context, hoodieInstant);
           records.add(convertToAvroRecord(hoodieInstant));
@@ -334,29 +326,6 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
     }
   }
 
-  private boolean deleteReplacedFileGroups(HoodieEngineContext context, HoodieInstant instant) {
-    if (!instant.isCompleted() || !HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
-      // only delete files for completed replace instants
-      return true;
-    }
-
-    TableFileSystemView fileSystemView = this.table.getFileSystemView();
-    List<String> replacedPartitions = getReplacedPartitions(instant);
-    return ReplaceArchivalHelper.deleteReplacedFileGroups(context, metaClient, fileSystemView, instant, replacedPartitions);
-  }
-
-  private List<String> getReplacedPartitions(HoodieInstant instant) {
-    try {
-      HoodieReplaceCommitMetadata metadata = HoodieReplaceCommitMetadata.fromBytes(
-          metaClient.getActiveTimeline().getInstantDetails(instant).get(),
-          HoodieReplaceCommitMetadata.class);
-
-      return new ArrayList<>(metadata.getPartitionToReplaceFileIds().keySet());
-    } catch (IOException e) {
-      throw new HoodieCommitException("Failed to archive because cannot delete replace files", e);
-    }
-  }
-
   private void writeToFile(Schema wrapperSchema, List<IndexedRecord> records) throws Exception {
     if (records.size() > 0) {
       Map<HeaderMetadataType, String> header = new HashMap<>();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
index ea80b08..cdd3fa5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
@@ -21,15 +21,12 @@ package org.apache.hudi.io;
 import org.apache.hudi.avro.model.HoodieActionInstant;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
-import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.utils.MetadataConversionUtils;
 import org.apache.hudi.common.HoodieCleanStat;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -50,7 +47,6 @@ import org.apache.hudi.table.HoodieTimelineArchiveLog;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -234,45 +230,6 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
   }
 
   @Test
-  public void testArchiveTableWithReplacedFiles() throws Exception {
-    HoodieTestUtils.init(hadoopConf, basePath);
-    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
-        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table")
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
-        .build();
-
-    // when using insert_overwrite or insert_overwrite_table
-    // first commit may without replaceFileIds
-    createReplaceMetadataWithoutReplaceFileId("000");
-
-    int numCommits = 4;
-    int commitInstant = 100;
-    for (int i = 0; i < numCommits; i++) {
-      createReplaceMetadata(String.valueOf(commitInstant));
-      commitInstant += 100;
-    }
-
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
-    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
-    assertEquals(5, timeline.countInstants(), "Loaded 5 commits and the count should match");
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
-    boolean result = archiveLog.archiveIfRequired(context);
-    assertTrue(result);
-
-    FileStatus[] allFiles = metaClient.getFs().listStatus(new Path(basePath + "/" + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH));
-    Set<String> allFileIds = Arrays.stream(allFiles).map(fs -> FSUtils.getFileIdFromFilePath(fs.getPath())).collect(Collectors.toSet());
-
-    // verify 100-1,200-1 are deleted by archival
-    assertFalse(allFileIds.contains("file-100-1"));
-    assertFalse(allFileIds.contains("file-200-1"));
-    assertTrue(allFileIds.contains("file-100-2"));
-    assertTrue(allFileIds.contains("file-200-2"));
-    assertTrue(allFileIds.contains("file-300-1"));
-    assertTrue(allFileIds.contains("file-400-1"));
-  }
-
-  @Test
   public void testArchiveTableWithNoArchival() throws IOException {
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table")
@@ -679,42 +636,6 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
     assertEquals(notArchivedInstants, Arrays.asList(notArchivedInstant1, notArchivedInstant2, notArchivedInstant3), "");
   }
 
-  private void createReplaceMetadataWithoutReplaceFileId(String instantTime) throws Exception {
-
-    // create replace instant without a previous replace commit
-    HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
-        .setOperationType(WriteOperationType.INSERT_OVERWRITE_TABLE.toString())
-        .setVersion(1)
-        .setExtraMetadata(Collections.emptyMap())
-        .build();
-    HoodieReplaceCommitMetadata completeReplaceMetadata = new HoodieReplaceCommitMetadata();
-    HoodieCommitMetadata inflightReplaceMetadata = new HoodieCommitMetadata();
-    completeReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE_TABLE);
-    inflightReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE_TABLE);
-    HoodieTestTable.of(metaClient)
-        .addReplaceCommit(instantTime, Option.of(requestedReplaceMetadata), Option.of(inflightReplaceMetadata), completeReplaceMetadata);
-  }
-
-  private void createReplaceMetadata(String instantTime) throws Exception {
-    String fileId1 = "file-" + instantTime + "-1";
-    String fileId2 = "file-" + instantTime + "-2";
-
-    // create replace instant to mark fileId1 as deleted
-    HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
-        .setOperationType(WriteOperationType.INSERT_OVERWRITE.toString())
-        .setVersion(1)
-        .setExtraMetadata(Collections.emptyMap())
-        .build();
-    HoodieReplaceCommitMetadata completeReplaceMetadata = new HoodieReplaceCommitMetadata();
-    HoodieCommitMetadata inflightReplaceMetadata = new HoodieCommitMetadata();
-    completeReplaceMetadata.addReplaceFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1);
-    completeReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE);
-    inflightReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE);
-    HoodieTestTable.of(metaClient)
-        .addReplaceCommit(instantTime, Option.of(requestedReplaceMetadata), Option.of(inflightReplaceMetadata), completeReplaceMetadata)
-        .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
-  }
-
   private HoodieInstant createCleanMetadata(String instantTime, boolean inflightOnly) throws IOException {
     HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(),
         CleanPlanV2MigrationHandler.VERSION, new HashMap<>());
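
For context on HUDI-1518: the deleted ReplaceArchivalHelper.deleteReplacedFileGroups physically removed the file slices of replaced file groups at archival time; after this patch, that physical cleanup is left to the cleaner. Below is a minimal, self-contained sketch of the deletion pattern the removed helper implemented (per-partition deletes in parallel, AND-reduced into one success flag). It uses only the JDK; the class name, the filesByPartition input, and the use of parallelStream in place of Hudi's HoodieEngineContext.map are illustrative assumptions, not Hudi APIs.

// Standalone illustration (plain JDK, no Hudi dependencies) of the pattern the
// removed ReplaceArchivalHelper.deleteReplacedFileGroups used: delete every file
// belonging to each replaced partition in parallel, then AND-reduce the
// per-partition results so the caller learns whether all deletes succeeded.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;

public class ReplacedFileDeletionSketch {

  /** Returns true only if every file in every partition was deleted. */
  static boolean deleteReplacedFiles(Map<String, List<Path>> filesByPartition) {
    if (filesByPartition.isEmpty()) {
      // Mirrors the removed code: the first replace commit of an
      // insert_overwrite operation may carry no replaced file ids at all.
      return true;
    }
    return filesByPartition.entrySet().parallelStream()  // one task per partition
        .map(e -> e.getValue().stream().allMatch(ReplacedFileDeletionSketch::deleteQuietly))
        .reduce(Boolean::logicalAnd)                     // the removed code used (x, y) -> x & y
        .orElse(true);
  }

  /** Deletes one path, reporting failure instead of throwing (as the removed deletePath did). */
  static boolean deleteQuietly(Path path) {
    try {
      Files.deleteIfExists(path);
      return true;
    } catch (IOException e) {
      System.err.println("unable to delete replaced file " + path + ": " + e);
      return false;
    }
  }

  public static void main(String[] args) throws IOException {
    Path tmp = Files.createTempFile("replaced-base-file", ".parquet");
    boolean ok = deleteReplacedFiles(Map.of("2021/08/12", List.of(tmp)));
    System.out.println("all replaced files deleted: " + ok);  // prints: true
  }
}

The AND-reduction mirrors the removed f.stream().reduce((x, y) -> x & y).orElse(true): one failed delete flips the overall result to false, which the archiver only logged as a warning rather than failing the archive.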
