This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c4a6632  [HUDI-1651] Fix archival of requested replacecommit (#2622)
c4a6632 is described below

commit c4a66324cdd3e289e0bf18bdd150b95ee6e4c66c
Author: satishkotha <[email protected]>
AuthorDate: Tue Mar 9 15:56:44 2021 -0800

    [HUDI-1651] Fix archival of requested replacecommit (#2622)
---
 .../hudi/table/HoodieTimelineArchiveLog.java       | 11 +++++--
 .../hudi/io/TestHoodieTimelineArchiveLog.java      |  8 ++++-
 .../java/org/apache/hudi/table/TestCleaner.java    | 37 ++++++++++++++++++----
 hudi-common/pom.xml                                |  2 +-
 .../src/main/avro/HoodieArchivedMetaEntry.avsc     |  8 +++++
 .../hudi/common/testutils/FileCreateUtils.java     |  7 ++--
 .../hudi/common/testutils/HoodieTestTable.java     |  5 +--
 7 files changed, 63 insertions(+), 15 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
index cae5dbb..8efd3a2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
@@ -400,9 +400,14 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
         break;
       }
       case HoodieTimeline.REPLACE_COMMIT_ACTION: {
-        HoodieReplaceCommitMetadata replaceCommitMetadata = 
HoodieReplaceCommitMetadata
-            .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), 
HoodieReplaceCommitMetadata.class);
-        
archivedMetaWrapper.setHoodieReplaceCommitMetadata(ReplaceArchivalHelper.convertReplaceCommitMetadata(replaceCommitMetadata));
+        if (hoodieInstant.isRequested()) {
+          archivedMetaWrapper.setHoodieRequestedReplaceMetadata(
+              
TimelineMetadataUtils.deserializeRequestedReplaceMetadata(commitTimeline.getInstantDetails(hoodieInstant).get()));
+        } else if (hoodieInstant.isCompleted()) {
+          HoodieReplaceCommitMetadata replaceCommitMetadata = 
HoodieReplaceCommitMetadata
+              
.fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), 
HoodieReplaceCommitMetadata.class);
+          
archivedMetaWrapper.setHoodieReplaceCommitMetadata(ReplaceArchivalHelper.convertReplaceCommitMetadata(replaceCommitMetadata));
+        }
         archivedMetaWrapper.setActionType(ActionType.replacecommit.name());
         break;
       }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
index 60f605c..042013d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
@@ -21,6 +21,7 @@ package org.apache.hudi.io;
 import org.apache.hudi.avro.model.HoodieActionInstant;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
@@ -495,11 +496,16 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
     String fileId2 = "file-" + instantTime + "-2";
 
     // create replace instant to mark fileId1 as deleted
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata = 
HoodieRequestedReplaceMetadata.newBuilder()
+        .setOperationType(WriteOperationType.INSERT_OVERWRITE.toString())
+        .setVersion(1)
+        .setExtraMetadata(Collections.emptyMap())
+        .build();
     HoodieReplaceCommitMetadata replaceMetadata = new 
HoodieReplaceCommitMetadata();
     
replaceMetadata.addReplaceFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
 fileId1);
     replaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE);
     HoodieTestTable.of(metaClient)
-        .addReplaceCommit(instantTime, replaceMetadata)
+        .addReplaceCommit(instantTime, requestedReplaceMetadata, 
replaceMetadata)
         
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
fileId1, fileId2);
   }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 1119f26..fd578bd 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -24,9 +24,14 @@ import org.apache.hudi.avro.model.HoodieActionInstant;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieClusteringStrategy;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieSliceInfo;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.HoodieCleanStat;
@@ -845,7 +850,8 @@ public class TestCleaner extends HoodieClientTestBase {
     // make next replacecommit, with 1 clustering operation. logically delete 
p0. No change to p1
     Map<String, String> partitionAndFileId002 = 
testTable.forReplaceCommit("00000000000002").getFileIdsWithBaseFilesInPartitions(p0);
     String file2P0C1 = partitionAndFileId002.get(p0);
-    testTable.addReplaceCommit("00000000000002", 
generateReplaceCommitMetadata(p0, file1P0C0, file2P0C1));
+    Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> 
replaceMetadata = generateReplaceCommitMetadata(p0, file1P0C0, file2P0C1);
+    testTable.addReplaceCommit("00000000000002", replaceMetadata.getKey(), 
replaceMetadata.getValue());
 
     // run cleaner
     List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config);
@@ -857,7 +863,8 @@ public class TestCleaner extends HoodieClientTestBase {
     // make next replacecommit, with 1 clustering operation. Replace data in 
p1. No change to p0
     Map<String, String> partitionAndFileId003 = 
testTable.forReplaceCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p1);
     String file3P1C2 = partitionAndFileId003.get(p1);
-    testTable.addReplaceCommit("00000000000003", 
generateReplaceCommitMetadata(p1, file1P1C0, file3P1C2));
+    replaceMetadata = generateReplaceCommitMetadata(p1, file1P1C0, file3P1C2);
+    testTable.addReplaceCommit("00000000000003", replaceMetadata.getKey(), 
replaceMetadata.getValue());
 
     // run cleaner
     List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config);
@@ -870,7 +877,8 @@ public class TestCleaner extends HoodieClientTestBase {
     // make next replacecommit, with 1 clustering operation. Replace data in 
p0 again
     Map<String, String> partitionAndFileId004 = 
testTable.forReplaceCommit("00000000000004").getFileIdsWithBaseFilesInPartitions(p0);
     String file4P0C3 = partitionAndFileId004.get(p0);
-    testTable.addReplaceCommit("00000000000004", 
generateReplaceCommitMetadata(p0, file2P0C1, file4P0C3));
+    replaceMetadata = generateReplaceCommitMetadata(p0, file2P0C1, file4P0C3);
+    testTable.addReplaceCommit("00000000000004", replaceMetadata.getKey(), 
replaceMetadata.getValue());
 
     // run cleaner
     List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
@@ -884,7 +892,8 @@ public class TestCleaner extends HoodieClientTestBase {
     // make next replacecommit, with 1 clustering operation. Replace all data 
in p1. no new files created
     Map<String, String> partitionAndFileId005 = 
testTable.forReplaceCommit("00000000000005").getFileIdsWithBaseFilesInPartitions(p1);
     String file4P1C4 = partitionAndFileId005.get(p1);
-    testTable.addReplaceCommit("00000000000005", 
generateReplaceCommitMetadata(p1, file3P1C2, file4P1C4));
+    replaceMetadata = generateReplaceCommitMetadata(p0, file3P1C2, file4P1C4);
+    testTable.addReplaceCommit("00000000000005", replaceMetadata.getKey(), 
replaceMetadata.getValue());
     
     List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, 2);
     assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
@@ -894,7 +903,23 @@ public class TestCleaner extends HoodieClientTestBase {
     assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
   }
   
-  private HoodieReplaceCommitMetadata generateReplaceCommitMetadata(String 
partition, String replacedFileId, String newFileId) {
+  private Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> 
generateReplaceCommitMetadata(String partition,
+                                                                               
                           String replacedFileId,
+                                                                               
                           String newFileId) {
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata = new 
HoodieRequestedReplaceMetadata();
+    
requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.toString());
+    requestedReplaceMetadata.setVersion(1);
+    HoodieSliceInfo sliceInfo = 
HoodieSliceInfo.newBuilder().setFileId(replacedFileId).build();
+    List<HoodieClusteringGroup> clusteringGroups = new ArrayList<>();
+    clusteringGroups.add(HoodieClusteringGroup.newBuilder()
+        
.setVersion(1).setNumOutputFileGroups(1).setMetrics(Collections.emptyMap())
+        .setSlices(Collections.singletonList(sliceInfo)).build());
+    requestedReplaceMetadata.setExtraMetadata(Collections.emptyMap());
+    
requestedReplaceMetadata.setClusteringPlan(HoodieClusteringPlan.newBuilder()
+        .setVersion(1).setExtraMetadata(Collections.emptyMap())
+        
.setStrategy(HoodieClusteringStrategy.newBuilder().setStrategyClassName("").setVersion(1).build())
+        .setInputGroups(clusteringGroups).build());
+    
     HoodieReplaceCommitMetadata replaceMetadata = new 
HoodieReplaceCommitMetadata();
     replaceMetadata.addReplaceFileId(partition, replacedFileId);
     replaceMetadata.setOperationType(WriteOperationType.CLUSTER);
@@ -905,7 +930,7 @@ public class TestCleaner extends HoodieClientTestBase {
       writeStat.setFileId(newFileId);
       replaceMetadata.addWriteStat(partition, writeStat);
     }
-    return replaceMetadata;
+    return Pair.of(requestedReplaceMetadata, replaceMetadata);
   }
 
   @Test
diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml
index 41588ca..af69631 100644
--- a/hudi-common/pom.xml
+++ b/hudi-common/pom.xml
@@ -77,7 +77,6 @@
             
<import>${basedir}/src/main/avro/HoodieRollbackMetadata.avsc</import>
             
<import>${basedir}/src/main/avro/HoodieRestoreMetadata.avsc</import>
             
<import>${basedir}/src/main/avro/HoodieReplaceCommitMetadata.avsc</import>
-            
<import>${basedir}/src/main/avro/HoodieArchivedMetaEntry.avsc</import>
             <import>${basedir}/src/main/avro/HoodiePath.avsc</import>
             <import>${basedir}/src/main/avro/HoodieFSPermission.avsc</import>
             <import>${basedir}/src/main/avro/HoodieFileStatus.avsc</import>
@@ -90,6 +89,7 @@
             <import>${basedir}/src/main/avro/HoodieClusteringPlan.avsc</import>
             
<import>${basedir}/src/main/avro/HoodieRequestedReplaceMetadata.avsc</import>
             <import>${basedir}/src/main/avro/HoodieMetadata.avsc</import>
+            
<import>${basedir}/src/main/avro/HoodieArchivedMetaEntry.avsc</import>
           </imports>
         </configuration>
       </plugin>
diff --git a/hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc b/hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc
index c68ef87..60be522 100644
--- a/hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc
+++ b/hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc
@@ -104,6 +104,14 @@
             "HoodieReplaceCommitMetadata"
          ],
          "default": null
+      },
+      {
+         "name":"hoodieRequestedReplaceMetadata",
+         "type":[
+            "null",
+            "HoodieRequestedReplaceMetadata"
+         ],
+         "default": null
       }
    ]
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index 4beba35..5f03ef0 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.testutils;
 
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
@@ -28,6 +29,7 @@ import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.exception.HoodieException;
@@ -147,8 +149,9 @@ public class FileCreateUtils {
     createMetaFile(basePath, instantTime, 
HoodieTimeline.REPLACE_COMMIT_EXTENSION, 
metadata.toJsonString().getBytes(StandardCharsets.UTF_8));
   }
 
-  public static void createRequestedReplaceCommit(String basePath, String 
instantTime) throws IOException {
-    createMetaFile(basePath, instantTime, 
HoodieTimeline.REQUESTED_REPLACE_COMMIT_EXTENSION);
+  public static void createRequestedReplaceCommit(String basePath, String 
instantTime, HoodieRequestedReplaceMetadata requestedMetadata) throws 
IOException {
+    createMetaFile(basePath, instantTime, 
HoodieTimeline.REQUESTED_REPLACE_COMMIT_EXTENSION,
+        
TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedMetadata).get());
   }
 
   public static void createInflightReplaceCommit(String basePath, String 
instantTime) throws IOException {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 858e113..46ef14f 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -22,6 +22,7 @@ package org.apache.hudi.common.testutils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
@@ -163,8 +164,8 @@ public class HoodieTestTable {
     return this;
   }
 
-  public HoodieTestTable addReplaceCommit(String instantTime, 
HoodieReplaceCommitMetadata metadata) throws Exception {
-    createRequestedReplaceCommit(basePath, instantTime);
+  public HoodieTestTable addReplaceCommit(String instantTime, 
HoodieRequestedReplaceMetadata requestedReplaceMetadata, 
HoodieReplaceCommitMetadata metadata) throws Exception {
+    createRequestedReplaceCommit(basePath, instantTime, 
requestedReplaceMetadata);
     createInflightReplaceCommit(basePath, instantTime);
     createReplaceCommit(basePath, instantTime, metadata);
     currentInstantTime = instantTime;

Reply via email to